Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,22 @@

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.google.common.io.Files;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.RandomAccessFile;
import java.io.StringWriter;
import java.io.Writer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.nio.file.StandardCopyOption;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.org.slf4j.Logger;
Expand Down Expand Up @@ -183,53 +179,37 @@ public void setDataVersion(DataVersion dataVersion) {

@Override
public synchronized void persist() {
String config = configFilePath();
String temp = config + ".tmp";
String backup = config + ".bak";
BufferedWriter bufferedWriter = null;
try {
File tmpFile = new File(temp);
File parentDirectory = tmpFile.getParentFile();
if (!parentDirectory.exists()) {
if (!parentDirectory.mkdirs()) {
log.error("Failed to create directory: {}", parentDirectory.getCanonicalPath());
return;
}
}
// bak metrics file
String config = configFilePath();
String backup = config + ".bak";
File configFile = new File(config);
File bakFile = new File(backup);

if (!tmpFile.exists()) {
if (!tmpFile.createNewFile()) {
log.error("Failed to create file: {}", tmpFile.getCanonicalPath());
return;
}
if (configFile.exists()) {
// atomic move
Files.move(configFile.toPath(), bakFile.toPath(), StandardCopyOption.ATOMIC_MOVE);

// sync the directory, ensure that the bak file is visible
MixAll.fsyncDirectory(Paths.get(bakFile.getParent()));
}
bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(tmpFile, false),
StandardCharsets.UTF_8));
write0(bufferedWriter);
bufferedWriter.flush();
bufferedWriter.close();
log.debug("Finished writing tmp file: {}", temp);

File configFile = new File(config);
if (configFile.exists()) {
Files.copy(configFile, new File(backup));
Path backupPath = Paths.get(backup);
try (FileChannel channel = FileChannel.open(backupPath, StandardOpenOption.WRITE)) {
channel.force(true); // force flush before deleting original file.
}
configFile.delete();
File dir = new File(configFile.getParent());
if (!dir.exists()) {
Files.createDirectories(dir.toPath());
}

tmpFile.renameTo(configFile);
} catch (IOException e) {
log.error("Failed to persist {}", temp, e);
} finally {
if (null != bufferedWriter) {
try {
bufferedWriter.close();
} catch (IOException ignore) {
}
// persist metrics file
StringWriter stringWriter = new StringWriter();
write0(stringWriter);
try (RandomAccessFile randomAccessFile = new RandomAccessFile(config, "rw")) {
randomAccessFile.write(stringWriter.toString().getBytes(StandardCharsets.UTF_8));
randomAccessFile.getChannel().force(true);
// sync the directory, ensure that the config file is visible
MixAll.fsyncDirectory(Paths.get(configFile.getParent()));
}
} catch (Throwable t) {
log.error("Failed to persist", t);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -59,6 +60,7 @@ public void cleanUp() {

@Before
public void setUp() throws IOException {
Assume.assumeFalse(MixAll.isMac());
BrokerConfig brokerConfig = new BrokerConfig();
Mockito.doReturn(brokerConfig).when(controller).getBrokerConfig();

Expand All @@ -75,6 +77,7 @@ public void setUp() throws IOException {
*/
@Test
public void testCommitOffset_Standard() {
Assume.assumeFalse(MixAll.isMac());
Assert.assertTrue(consumerOffsetManagerV2.load());

String clientHost = "localhost";
Expand All @@ -101,6 +104,7 @@ public void testCommitOffset_Standard() {
*/
@Test
public void testCommitOffset_LMQ() {
Assume.assumeFalse(MixAll.isMac());
Assert.assertTrue(consumerOffsetManagerV2.load());

String clientHost = "localhost";
Expand All @@ -126,6 +130,7 @@ public void testCommitOffset_LMQ() {
*/
@Test
public void testCommitPullOffset_LMQ() {
Assume.assumeFalse(MixAll.isMac());
Assert.assertTrue(consumerOffsetManagerV2.load());

String clientHost = "localhost";
Expand All @@ -150,6 +155,7 @@ public void testCommitPullOffset_LMQ() {
*/
@Test
public void testRemoveByTopicAtGroup() {
Assume.assumeFalse(MixAll.isMac());
Assert.assertTrue(consumerOffsetManagerV2.load());

String clientHost = "localhost";
Expand Down Expand Up @@ -182,6 +188,7 @@ public void testRemoveByTopicAtGroup() {
*/
@Test
public void testRemoveByGroup() {
Assume.assumeFalse(MixAll.isMac());
Assert.assertTrue(consumerOffsetManagerV2.load());

String clientHost = "localhost";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
import java.io.IOException;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.remoting.protocol.subscription.GroupRetryPolicy;
import org.apache.rocketmq.remoting.protocol.subscription.GroupRetryPolicyType;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -64,6 +66,7 @@ public void cleanUp() {

@Before
public void setUp() throws IOException {
Assume.assumeFalse(MixAll.isMac());
BrokerConfig brokerConfig = new BrokerConfig();
brokerConfig.setAutoCreateSubscriptionGroup(false);
Mockito.doReturn(brokerConfig).when(controller).getBrokerConfig();
Expand All @@ -82,6 +85,7 @@ public void setUp() throws IOException {

@Test
public void testUpdateSubscriptionGroupConfig() {
Assume.assumeFalse(MixAll.isMac());
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName("G1");
subscriptionGroupConfig.setConsumeEnable(true);
Expand Down Expand Up @@ -116,6 +120,7 @@ public void testUpdateSubscriptionGroupConfig() {

@Test
public void testDeleteSubscriptionGroupConfig() {
Assume.assumeFalse(MixAll.isMac());
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName("G1");
subscriptionGroupConfig.setConsumeEnable(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@

import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -61,6 +63,7 @@ public void cleanUp() {

@Before
public void setUp() throws IOException {
Assume.assumeFalse(MixAll.isMac());
BrokerConfig brokerConfig = new BrokerConfig();
Mockito.doReturn(brokerConfig).when(controller).getBrokerConfig();

Expand All @@ -77,6 +80,7 @@ public void setUp() throws IOException {

@Test
public void testUpdateTopicConfig() {
Assume.assumeFalse(MixAll.isMac());
TopicConfigManagerV2 topicConfigManagerV2 = new TopicConfigManagerV2(controller, configStorage);
topicConfigManagerV2.load();

Expand Down Expand Up @@ -113,6 +117,7 @@ public void testUpdateTopicConfig() {

@Test
public void testRemoveTopicConfig() {
Assume.assumeFalse(MixAll.isMac());
TopicConfig topicConfig = new TopicConfig();
String topicName = "T1";
topicConfig.setTopicName(topicName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ public class RocksDBLmqConsumerOffsetManagerTest {

@Before
public void setUp() {
if (MixAll.isMac()) {
return;
}
brokerController = Mockito.mock(BrokerController.class);
when(brokerController.getMessageStoreConfig()).thenReturn(Mockito.mock(MessageStoreConfig.class));
when(brokerController.getBrokerConfig()).thenReturn(new BrokerConfig());
Expand All @@ -58,6 +61,9 @@ public void setUp() {

@Test
public void testQueryOffsetForNonLmq() {
if (MixAll.isMac()) {
return;
}
long actualOffset = offsetManager.queryOffset(NON_LMQ_GROUP, NON_LMQ_TOPIC, QUEUE_ID);
// Verify
assertEquals("Offset should not be null.", -1, actualOffset);
Expand All @@ -66,6 +72,9 @@ public void testQueryOffsetForNonLmq() {

@Test
public void testQueryOffsetForLmqGroupWithExistingOffset() {
if (MixAll.isMac()) {
return;
}
offsetManager.commitOffset("127.0.0.1",LMQ_GROUP, LMQ_TOPIC, QUEUE_ID, OFFSET);

// Act
Expand All @@ -79,6 +88,9 @@ public void testQueryOffsetForLmqGroupWithExistingOffset() {

@Test
public void testQueryOffsetForLmqGroupWithoutExistingOffset() {
if (MixAll.isMac()) {
return;
}
// Act
Map<Integer, Long> actualOffsets = offsetManager.queryOffset(LMQ_GROUP, "nonExistingTopic");
// Assert
Expand All @@ -87,6 +99,9 @@ public void testQueryOffsetForLmqGroupWithoutExistingOffset() {

@Test
public void testQueryOffsetForNonLmqGroup() {
if (MixAll.isMac()) {
return;
}
// Arrange
Map<Integer, Long> mockOffsets = new HashMap<>();
mockOffsets.put(QUEUE_ID, OFFSET);
Expand All @@ -103,6 +118,9 @@ public void testQueryOffsetForNonLmqGroup() {

@Test
public void testCommitOffsetForLmq() {
if (MixAll.isMac()) {
return;
}
// Execute
offsetManager.commitOffset("clientHost", LMQ_GROUP, LMQ_TOPIC, QUEUE_ID, OFFSET);
// Verify
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.ConcurrentMap;

import org.apache.rocketmq.broker.config.v1.RocksDBOffsetSerializeWrapper;
import org.apache.rocketmq.common.MixAll;
import org.junit.Before;
import org.junit.Test;

Expand All @@ -34,17 +35,26 @@ public class RocksDBOffsetSerializeWrapperTest {

@Before
public void setUp() {
if (MixAll.isMac()) {
return;
}
wrapper = new RocksDBOffsetSerializeWrapper();
}

@Test
public void testGetOffsetTable_ShouldReturnConcurrentHashMap() {
if (MixAll.isMac()) {
return;
}
ConcurrentMap<Integer, Long> offsetTable = wrapper.getOffsetTable();
assertNotNull("The offsetTable should not be null", offsetTable);
}

@Test
public void testSetOffsetTable_ShouldSetTheOffsetTableCorrectly() {
if (MixAll.isMac()) {
return;
}
ConcurrentMap<Integer, Long> newOffsetTable = new ConcurrentHashMap<>();
wrapper.setOffsetTable(newOffsetTable);
ConcurrentMap<Integer, Long> offsetTable = wrapper.getOffsetTable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.rocketmq.broker.config.v1.RocksDBConsumerOffsetManager;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.CheckRocksdbCqWriteResult;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.DispatchRequest;
Expand Down Expand Up @@ -167,12 +168,12 @@ public void testRocksdbCqWrite() throws RocksDBException {
Assert.assertEquals(CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue(), result.getCheckStatus());
}

/**
* No need to skip macOS platform.
* @return true if some platform is NOT a good fit for this test case.
*/
// /**
// * No need to skip macOS platform.
// * @return true if some platform is NOT a good fit for this test case.
// */
private boolean notToBeExecuted() {
return false;
return MixAll.isMac();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.mockito.Mockito;

Expand All @@ -40,6 +42,7 @@ public class PopConsumerCacheTest {

@Test
public void consumerRecordsTest() {
Assume.assumeFalse(MixAll.isMac());
BrokerConfig brokerConfig = new BrokerConfig();
brokerConfig.setPopConsumerKVServiceLog(true);
PopConsumerCache.ConsumerRecords consumerRecords =
Expand Down Expand Up @@ -71,6 +74,7 @@ public void consumerRecordsTest() {

@Test
public void consumerOffsetTest() throws IllegalAccessException {
Assume.assumeFalse(MixAll.isMac());
BrokerController brokerController = Mockito.mock(BrokerController.class);
PopConsumerKVStore consumerKVStore = Mockito.mock(PopConsumerRocksdbStore.class);
PopConsumerLockService consumerLockService = Mockito.mock(PopConsumerLockService.class);
Expand All @@ -94,6 +98,7 @@ public void consumerOffsetTest() throws IllegalAccessException {

@Test
public void consumerCacheTest() {
Assume.assumeFalse(MixAll.isMac());
BrokerController brokerController = Mockito.mock(BrokerController.class);
PopConsumerKVStore consumerKVStore = Mockito.mock(PopConsumerRocksdbStore.class);
PopConsumerLockService consumerLockService = Mockito.mock(PopConsumerLockService.class);
Expand Down
Loading
Loading