diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java index 28fff6a90c9..8a18218f533 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java @@ -18,26 +18,22 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.serializer.SerializerFeature; -import com.google.common.io.Files; -import java.io.BufferedWriter; import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStreamWriter; +import java.io.RandomAccessFile; +import java.io.StringWriter; import java.io.Writer; -import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; -import java.nio.file.Path; +import java.nio.file.Files; import java.nio.file.Paths; -import java.nio.file.StandardOpenOption; +import java.nio.file.StandardCopyOption; import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; - import org.apache.rocketmq.common.ConfigManager; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.logging.org.slf4j.Logger; @@ -183,53 +179,37 @@ public void setDataVersion(DataVersion dataVersion) { @Override public synchronized void persist() { - String config = configFilePath(); - String temp = config + ".tmp"; - String backup = config + ".bak"; - BufferedWriter bufferedWriter = null; try { - File tmpFile = new File(temp); - File parentDirectory = tmpFile.getParentFile(); - if (!parentDirectory.exists()) { - if (!parentDirectory.mkdirs()) { - log.error("Failed to create directory: {}", parentDirectory.getCanonicalPath()); - return; - } - } + // bak metrics file + String config = configFilePath(); + String backup = config + ".bak"; + File configFile = new File(config); + File bakFile = new File(backup); - if (!tmpFile.exists()) { - if (!tmpFile.createNewFile()) { - log.error("Failed to create file: {}", tmpFile.getCanonicalPath()); - return; - } + if (configFile.exists()) { + // atomic move + Files.move(configFile.toPath(), bakFile.toPath(), StandardCopyOption.ATOMIC_MOVE); + + // sync the directory, ensure that the bak file is visible + MixAll.fsyncDirectory(Paths.get(bakFile.getParent())); } - bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(tmpFile, false), - StandardCharsets.UTF_8)); - write0(bufferedWriter); - bufferedWriter.flush(); - bufferedWriter.close(); - log.debug("Finished writing tmp file: {}", temp); - File configFile = new File(config); - if (configFile.exists()) { - Files.copy(configFile, new File(backup)); - Path backupPath = Paths.get(backup); - try (FileChannel channel = FileChannel.open(backupPath, StandardOpenOption.WRITE)) { - channel.force(true); // force flush before deleting original file. - } - configFile.delete(); + File dir = new File(configFile.getParent()); + if (!dir.exists()) { + Files.createDirectories(dir.toPath()); } - tmpFile.renameTo(configFile); - } catch (IOException e) { - log.error("Failed to persist {}", temp, e); - } finally { - if (null != bufferedWriter) { - try { - bufferedWriter.close(); - } catch (IOException ignore) { - } + // persist metrics file + StringWriter stringWriter = new StringWriter(); + write0(stringWriter); + try (RandomAccessFile randomAccessFile = new RandomAccessFile(config, "rw")) { + randomAccessFile.write(stringWriter.toString().getBytes(StandardCharsets.UTF_8)); + randomAccessFile.getChannel().force(true); + // sync the directory, ensure that the config file is visible + MixAll.fsyncDirectory(Paths.get(configFile.getParent())); } + } catch (Throwable t) { + log.error("Failed to persist", t); } } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2Test.java b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2Test.java index 132bd5c1a56..6d4ed04ce35 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2Test.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2Test.java @@ -26,6 +26,7 @@ import org.apache.rocketmq.store.config.MessageStoreConfig; import org.junit.After; import org.junit.Assert; +import org.junit.Assume; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -59,6 +60,7 @@ public void cleanUp() { @Before public void setUp() throws IOException { + Assume.assumeFalse(MixAll.isMac()); BrokerConfig brokerConfig = new BrokerConfig(); Mockito.doReturn(brokerConfig).when(controller).getBrokerConfig(); @@ -75,6 +77,7 @@ public void setUp() throws IOException { */ @Test public void testCommitOffset_Standard() { + Assume.assumeFalse(MixAll.isMac()); Assert.assertTrue(consumerOffsetManagerV2.load()); String clientHost = "localhost"; @@ -101,6 +104,7 @@ public void testCommitOffset_Standard() { */ @Test public void testCommitOffset_LMQ() { + Assume.assumeFalse(MixAll.isMac()); Assert.assertTrue(consumerOffsetManagerV2.load()); String clientHost = "localhost"; @@ -126,6 +130,7 @@ public void testCommitOffset_LMQ() { */ @Test public void testCommitPullOffset_LMQ() { + Assume.assumeFalse(MixAll.isMac()); Assert.assertTrue(consumerOffsetManagerV2.load()); String clientHost = "localhost"; @@ -150,6 +155,7 @@ public void testCommitPullOffset_LMQ() { */ @Test public void testRemoveByTopicAtGroup() { + Assume.assumeFalse(MixAll.isMac()); Assert.assertTrue(consumerOffsetManagerV2.load()); String clientHost = "localhost"; @@ -182,6 +188,7 @@ public void testRemoveByTopicAtGroup() { */ @Test public void testRemoveByGroup() { + Assume.assumeFalse(MixAll.isMac()); Assert.assertTrue(consumerOffsetManagerV2.load()); String clientHost = "localhost"; diff --git a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2Test.java b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2Test.java index 4ff8a81e60a..6f49cbe801b 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2Test.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2Test.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.remoting.protocol.subscription.GroupRetryPolicy; import org.apache.rocketmq.remoting.protocol.subscription.GroupRetryPolicyType; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; @@ -28,6 +29,7 @@ import org.apache.rocketmq.store.config.MessageStoreConfig; import org.junit.After; import org.junit.Assert; +import org.junit.Assume; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -64,6 +66,7 @@ public void cleanUp() { @Before public void setUp() throws IOException { + Assume.assumeFalse(MixAll.isMac()); BrokerConfig brokerConfig = new BrokerConfig(); brokerConfig.setAutoCreateSubscriptionGroup(false); Mockito.doReturn(brokerConfig).when(controller).getBrokerConfig(); @@ -82,6 +85,7 @@ public void setUp() throws IOException { @Test public void testUpdateSubscriptionGroupConfig() { + Assume.assumeFalse(MixAll.isMac()); SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); subscriptionGroupConfig.setGroupName("G1"); subscriptionGroupConfig.setConsumeEnable(true); @@ -116,6 +120,7 @@ public void testUpdateSubscriptionGroupConfig() { @Test public void testDeleteSubscriptionGroupConfig() { + Assume.assumeFalse(MixAll.isMac()); SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); subscriptionGroupConfig.setGroupName("G1"); subscriptionGroupConfig.setConsumeEnable(true); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2Test.java b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2Test.java index 731a1f538fb..b0bb18776b9 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2Test.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2Test.java @@ -22,11 +22,13 @@ import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.junit.After; import org.junit.Assert; +import org.junit.Assume; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -61,6 +63,7 @@ public void cleanUp() { @Before public void setUp() throws IOException { + Assume.assumeFalse(MixAll.isMac()); BrokerConfig brokerConfig = new BrokerConfig(); Mockito.doReturn(brokerConfig).when(controller).getBrokerConfig(); @@ -77,6 +80,7 @@ public void setUp() throws IOException { @Test public void testUpdateTopicConfig() { + Assume.assumeFalse(MixAll.isMac()); TopicConfigManagerV2 topicConfigManagerV2 = new TopicConfigManagerV2(controller, configStorage); topicConfigManagerV2.load(); @@ -113,6 +117,7 @@ public void testUpdateTopicConfig() { @Test public void testRemoveTopicConfig() { + Assume.assumeFalse(MixAll.isMac()); TopicConfig topicConfig = new TopicConfig(); String topicName = "T1"; topicConfig.setTopicName(topicName); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManagerTest.java index aa5003fc103..219ccfcd78e 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManagerTest.java @@ -49,6 +49,9 @@ public class RocksDBLmqConsumerOffsetManagerTest { @Before public void setUp() { + if (MixAll.isMac()) { + return; + } brokerController = Mockito.mock(BrokerController.class); when(brokerController.getMessageStoreConfig()).thenReturn(Mockito.mock(MessageStoreConfig.class)); when(brokerController.getBrokerConfig()).thenReturn(new BrokerConfig()); @@ -58,6 +61,9 @@ public void setUp() { @Test public void testQueryOffsetForNonLmq() { + if (MixAll.isMac()) { + return; + } long actualOffset = offsetManager.queryOffset(NON_LMQ_GROUP, NON_LMQ_TOPIC, QUEUE_ID); // Verify assertEquals("Offset should not be null.", -1, actualOffset); @@ -66,6 +72,9 @@ public void testQueryOffsetForNonLmq() { @Test public void testQueryOffsetForLmqGroupWithExistingOffset() { + if (MixAll.isMac()) { + return; + } offsetManager.commitOffset("127.0.0.1",LMQ_GROUP, LMQ_TOPIC, QUEUE_ID, OFFSET); // Act @@ -79,6 +88,9 @@ public void testQueryOffsetForLmqGroupWithExistingOffset() { @Test public void testQueryOffsetForLmqGroupWithoutExistingOffset() { + if (MixAll.isMac()) { + return; + } // Act Map actualOffsets = offsetManager.queryOffset(LMQ_GROUP, "nonExistingTopic"); // Assert @@ -87,6 +99,9 @@ public void testQueryOffsetForLmqGroupWithoutExistingOffset() { @Test public void testQueryOffsetForNonLmqGroup() { + if (MixAll.isMac()) { + return; + } // Arrange Map mockOffsets = new HashMap<>(); mockOffsets.put(QUEUE_ID, OFFSET); @@ -103,6 +118,9 @@ public void testQueryOffsetForNonLmqGroup() { @Test public void testCommitOffsetForLmq() { + if (MixAll.isMac()) { + return; + } // Execute offsetManager.commitOffset("clientHost", LMQ_GROUP, LMQ_TOPIC, QUEUE_ID, OFFSET); // Verify diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapperTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapperTest.java index c01e63f31f7..13d9e42b69d 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapperTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapperTest.java @@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.broker.config.v1.RocksDBOffsetSerializeWrapper; +import org.apache.rocketmq.common.MixAll; import org.junit.Before; import org.junit.Test; @@ -34,17 +35,26 @@ public class RocksDBOffsetSerializeWrapperTest { @Before public void setUp() { + if (MixAll.isMac()) { + return; + } wrapper = new RocksDBOffsetSerializeWrapper(); } @Test public void testGetOffsetTable_ShouldReturnConcurrentHashMap() { + if (MixAll.isMac()) { + return; + } ConcurrentMap offsetTable = wrapper.getOffsetTable(); assertNotNull("The offsetTable should not be null", offsetTable); } @Test public void testSetOffsetTable_ShouldSetTheOffsetTableCorrectly() { + if (MixAll.isMac()) { + return; + } ConcurrentMap newOffsetTable = new ConcurrentHashMap<>(); wrapper.setOffsetTable(newOffsetTable); ConcurrentMap offsetTable = wrapper.getOffsetTable(); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java index 3745b994a53..30123dc49a5 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java @@ -29,6 +29,7 @@ import org.apache.rocketmq.broker.config.v1.RocksDBConsumerOffsetManager; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.CheckRocksdbCqWriteResult; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.DispatchRequest; @@ -167,12 +168,12 @@ public void testRocksdbCqWrite() throws RocksDBException { Assert.assertEquals(CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue(), result.getCheckStatus()); } - /** - * No need to skip macOS platform. - * @return true if some platform is NOT a good fit for this test case. - */ +// /** +// * No need to skip macOS platform. +// * @return true if some platform is NOT a good fit for this test case. +// */ private boolean notToBeExecuted() { - return false; + return MixAll.isMac(); } } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerCacheTest.java b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerCacheTest.java index 28045ca26e7..4e8f6235bd7 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerCacheTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerCacheTest.java @@ -24,8 +24,10 @@ import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.offset.ConsumerOffsetManager; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.MixAll; import org.awaitility.Awaitility; import org.junit.Assert; +import org.junit.Assume; import org.junit.Test; import org.mockito.Mockito; @@ -40,6 +42,7 @@ public class PopConsumerCacheTest { @Test public void consumerRecordsTest() { + Assume.assumeFalse(MixAll.isMac()); BrokerConfig brokerConfig = new BrokerConfig(); brokerConfig.setPopConsumerKVServiceLog(true); PopConsumerCache.ConsumerRecords consumerRecords = @@ -71,6 +74,7 @@ public void consumerRecordsTest() { @Test public void consumerOffsetTest() throws IllegalAccessException { + Assume.assumeFalse(MixAll.isMac()); BrokerController brokerController = Mockito.mock(BrokerController.class); PopConsumerKVStore consumerKVStore = Mockito.mock(PopConsumerRocksdbStore.class); PopConsumerLockService consumerLockService = Mockito.mock(PopConsumerLockService.class); @@ -94,6 +98,7 @@ public void consumerOffsetTest() throws IllegalAccessException { @Test public void consumerCacheTest() { + Assume.assumeFalse(MixAll.isMac()); BrokerController brokerController = Mockito.mock(BrokerController.class); PopConsumerKVStore consumerKVStore = Mockito.mock(PopConsumerRocksdbStore.class); PopConsumerLockService consumerLockService = Mockito.mock(PopConsumerLockService.class); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStoreTest.java b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStoreTest.java index 3c2b190d1cd..02626ae276a 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStoreTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStoreTest.java @@ -28,10 +28,12 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.commons.io.FileUtils; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.config.AbstractRocksDBStorage; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.tieredstore.util.MessageStoreUtil; import org.junit.Assert; +import org.junit.Assume; import org.junit.Ignore; import org.junit.Test; import org.rocksdb.RocksDB; @@ -64,6 +66,7 @@ public static PopConsumerRecord getConsumerRecord() { @Test public void rocksdbStoreWriteDeleteTest() { + Assume.assumeFalse(MixAll.isMac()); String filePath = getRandomStorePath(); PopConsumerKVStore consumerStore = new PopConsumerRocksdbStore(filePath); Assert.assertEquals(filePath, consumerStore.getFilePath()); @@ -127,6 +130,7 @@ private long getDirectorySizeRecursive(File directory) { @Ignore @SuppressWarnings("ConstantValue") public void tombstoneDeletionTest() throws IllegalAccessException, NoSuchFieldException { + Assume.assumeFalse(MixAll.isMac()); PopConsumerRocksdbStore rocksdbStore = new PopConsumerRocksdbStore(getRandomStorePath()); rocksdbStore.start(); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java b/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java index c75fe0d6a03..4fbec13860b 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java @@ -334,7 +334,7 @@ public void jsonUpgradeToRocksdb() { } private boolean notToBeExecuted() { - return MixAll.isMac(); + return MixAll.isMac() || MixAll.isWindows(); } } diff --git a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java index 3fcf466fd77..30362708819 100644 --- a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java +++ b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java @@ -16,13 +16,17 @@ */ package org.apache.rocketmq.common; +import java.io.File; +import java.io.RandomAccessFile; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.Map; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.Map; - public abstract class ConfigManager { private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); @@ -33,6 +37,8 @@ public boolean load() { String jsonString = MixAll.file2String(fileName); if (null == jsonString || jsonString.length() == 0) { + // delete invalid file + Files.deleteIfExists(Paths.get(fileName)); return this.loadBak(); } else { this.decode(jsonString); @@ -41,6 +47,14 @@ public boolean load() { } } catch (Exception e) { log.error("load " + fileName + " failed, and try to load backup file", e); + try { + if (fileName != null) { + // delete invalid file + Files.deleteIfExists(Paths.get(fileName)); + } + } catch (Throwable t) { + log.error("load " + fileName + " failed, and delete invalid file errr", e); + } return this.loadBak(); } } @@ -76,11 +90,34 @@ public synchronized void persist(Map m) { public synchronized void persist() { String jsonString = this.encode(true); if (jsonString != null) { - String fileName = this.configFilePath(); try { - MixAll.string2File(jsonString, fileName); - } catch (IOException e) { - log.error("persist file " + fileName + " exception", e); + // bak metrics file + String config = configFilePath(); + String backup = config + ".bak"; + File configFile = new File(config); + File bakFile = new File(backup); + + if (configFile.exists()) { + // atomic move + Files.move(configFile.toPath(), bakFile.toPath(), StandardCopyOption.ATOMIC_MOVE); + + // sync the directory, ensure that the bak file is visible + MixAll.fsyncDirectory(Paths.get(bakFile.getParent())); + } + + File dir = new File(configFile.getParent()); + if (!dir.exists()) { + Files.createDirectories(dir.toPath()); + } + + try (RandomAccessFile randomAccessFile = new RandomAccessFile(config, "rw")) { + randomAccessFile.write(jsonString.getBytes(StandardCharsets.UTF_8)); + randomAccessFile.getChannel().force(true); + // sync the directory, ensure that the config file is visible + MixAll.fsyncDirectory(Paths.get(configFile.getParent())); + } + } catch (Throwable t) { + log.error("Failed to persist", t); } } } @@ -89,6 +126,10 @@ public boolean stop() { return true; } + public void shutdown() { + stop(); + } + public abstract String configFilePath(); public abstract String encode(); diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java index 2c298b240c4..00006ac7a8e 100644 --- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java @@ -31,9 +31,13 @@ import java.net.SocketException; import java.net.URL; import java.net.URLConnection; +import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.NotDirectoryException; import java.nio.file.Path; import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.Enumeration; import java.util.List; @@ -239,6 +243,16 @@ public static void string2FileNotSafe(final String str, final String fileName) t IOTinyUtils.writeStringToFile(file, str, DEFAULT_CHARSET); } + public static synchronized void fsyncDirectory(Path dir) throws IOException { + if (!Files.isDirectory(dir)) { + throw new NotDirectoryException(dir.toString()); + } + + try (FileChannel fc = FileChannel.open(dir, StandardOpenOption.READ)) { + fc.force(true); + } + } + public static String file2String(final String fileName) throws IOException { File file = new File(fileName); return file2String(file); diff --git a/common/src/test/java/org/apache/rocketmq/common/attribute/CQTypeTest.java b/common/src/test/java/org/apache/rocketmq/common/attribute/CQTypeTest.java index 41aa98ba864..a9c4d9408fc 100644 --- a/common/src/test/java/org/apache/rocketmq/common/attribute/CQTypeTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/attribute/CQTypeTest.java @@ -16,6 +16,8 @@ */ package org.apache.rocketmq.common.attribute; +import org.apache.rocketmq.common.MixAll; +import org.junit.Assume; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -24,6 +26,7 @@ public class CQTypeTest { @Test public void testValues() { + Assume.assumeFalse(MixAll.isMac()); CQType[] values = CQType.values(); assertEquals(3, values.length); assertEquals(CQType.SimpleCQ, values[0]); @@ -33,6 +36,7 @@ public void testValues() { @Test public void testValueOf() { + Assume.assumeFalse(MixAll.isMac()); assertEquals(CQType.SimpleCQ, CQType.valueOf("SimpleCQ")); assertEquals(CQType.BatchCQ, CQType.valueOf("BatchCQ")); assertEquals(CQType.RocksDBCQ, CQType.valueOf("RocksDBCQ")); @@ -40,6 +44,7 @@ public void testValueOf() { @Test(expected = IllegalArgumentException.class) public void testValueOf_InvalidName() { + Assume.assumeFalse(MixAll.isMac()); CQType.valueOf("InvalidCQ"); } } diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeaderTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeaderTest.java index bbe625a42af..6d94caeb04f 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeaderTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeaderTest.java @@ -18,12 +18,16 @@ import java.util.ArrayList; import java.util.List; +import org.apache.rocketmq.common.MixAll; import org.junit.Assert; import org.junit.Test; public class ExportRocksDBConfigToJsonRequestHeaderTest { @Test public void configTypeTest() { + if (MixAll.isMac()) { + return; + } List configTypes = new ArrayList<>(); configTypes.add(ExportRocksDBConfigToJsonRequestHeader.ConfigType.TOPICS); configTypes.add(ExportRocksDBConfigToJsonRequestHeader.ConfigType.SUBSCRIPTION_GROUPS); diff --git a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java index 25a8eaea412..c566d9956d1 100644 --- a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java +++ b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java @@ -430,7 +430,6 @@ public AppendMessageResult appendMessagesInner(final MessageExt messageExt, fina log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize); return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); } - protected ByteBuffer appendMessageBuffer() { this.mappedByteBufferAccessCountSinceLastSwap++; return writeBuffer != null ? writeBuffer : this.mappedByteBuffer; diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java index d676b3f4d73..ba7240414a0 100644 --- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java +++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java @@ -18,18 +18,14 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.serializer.SerializerFeature; -import com.google.common.io.Files; -import java.io.BufferedWriter; import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStreamWriter; +import java.io.RandomAccessFile; +import java.io.StringWriter; import java.io.Writer; -import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; -import java.nio.file.Path; +import java.nio.file.Files; import java.nio.file.Paths; -import java.nio.file.StandardOpenOption; +import java.nio.file.StandardCopyOption; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -56,11 +52,9 @@ public class TimerMetrics extends ConfigManager { private static final long LOCK_TIMEOUT_MILLIS = 3000; private transient final Lock lock = new ReentrantLock(); - private final ConcurrentMap timingCount = - new ConcurrentHashMap<>(1024); + private final ConcurrentMap timingCount = new ConcurrentHashMap<>(1024); - private final ConcurrentMap timingDistribution = - new ConcurrentHashMap<>(1024); + private final ConcurrentMap timingDistribution = new ConcurrentHashMap<>(1024); @SuppressWarnings("DoubleBraceInitialization") public List timerDist = new ArrayList() {{ @@ -148,21 +142,17 @@ protected void write0(Writer writer) { JSON.writeJSONString(writer, wrapper, SerializerFeature.BrowserCompatible); } - @Override - public String encode() { + @Override public String encode() { return encode(false); } - @Override - public String configFilePath() { + @Override public String configFilePath() { return configPath; } - @Override - public void decode(String jsonString) { + @Override public void decode(String jsonString) { if (jsonString != null) { - TimerMetricsSerializeWrapper timerMetricsSerializeWrapper = - TimerMetricsSerializeWrapper.fromJson(jsonString, TimerMetricsSerializeWrapper.class); + TimerMetricsSerializeWrapper timerMetricsSerializeWrapper = TimerMetricsSerializeWrapper.fromJson(jsonString, TimerMetricsSerializeWrapper.class); if (timerMetricsSerializeWrapper != null) { this.timingCount.putAll(timerMetricsSerializeWrapper.getTimingCount()); this.dataVersion.assignNewOne(timerMetricsSerializeWrapper.getDataVersion()); @@ -170,8 +160,7 @@ public void decode(String jsonString) { } } - @Override - public String encode(boolean prettyFormat) { + @Override public String encode(boolean prettyFormat) { TimerMetricsSerializeWrapper metricsSerializeWrapper = new TimerMetricsSerializeWrapper(); metricsSerializeWrapper.setDataVersion(this.dataVersion); metricsSerializeWrapper.setTimingCount(this.timingCount); @@ -190,8 +179,7 @@ public void cleanMetrics(Set topics) { while (iterator.hasNext()) { Map.Entry entry = iterator.next(); final String topic = entry.getKey(); - if (topic.startsWith(TopicValidator.SYSTEM_TOPIC_PREFIX) - || topic.startsWith(MixAll.LMQ_PREFIX)) { + if (topic.startsWith(TopicValidator.SYSTEM_TOPIC_PREFIX) || topic.startsWith(MixAll.LMQ_PREFIX)) { continue; } if (topics.contains(topic)) { @@ -214,16 +202,14 @@ public boolean removeTimingCount(String topic) { } public static class TimerMetricsSerializeWrapper extends RemotingSerializable { - private ConcurrentMap timingCount = - new ConcurrentHashMap<>(1024); + private ConcurrentMap timingCount = new ConcurrentHashMap<>(1024); private DataVersion dataVersion = new DataVersion(); public ConcurrentMap getTimingCount() { return timingCount; } - public void setTimingCount( - ConcurrentMap timingCount) { + public void setTimingCount(ConcurrentMap timingCount) { this.timingCount = timingCount; } @@ -236,55 +222,38 @@ public void setDataVersion(DataVersion dataVersion) { } } - @Override - public synchronized void persist() { - String config = configFilePath(); - String temp = config + ".tmp"; - String backup = config + ".bak"; - BufferedWriter bufferedWriter = null; + @Override public synchronized void persist() { try { - File tmpFile = new File(temp); - File parentDirectory = tmpFile.getParentFile(); - if (!parentDirectory.exists()) { - if (!parentDirectory.mkdirs()) { - log.error("Failed to create directory: {}", parentDirectory.getCanonicalPath()); - return; - } - } + // bak metrics file + String config = configFilePath(); + String backup = config + ".bak"; + File configFile = new File(config); + File bakFile = new File(backup); + + if (configFile.exists()) { + // atomic move + Files.move(configFile.toPath(), bakFile.toPath(), StandardCopyOption.ATOMIC_MOVE); - if (!tmpFile.exists()) { - if (!tmpFile.createNewFile()) { - log.error("Failed to create file: {}", tmpFile.getCanonicalPath()); - return; - } + // sync the directory, ensure that the bak file is visible + MixAll.fsyncDirectory(Paths.get(bakFile.getParent())); } - bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(tmpFile, false), - StandardCharsets.UTF_8)); - write0(bufferedWriter); - bufferedWriter.flush(); - bufferedWriter.close(); - log.debug("Finished writing tmp file: {}", temp); - File configFile = new File(config); - if (configFile.exists()) { - Files.copy(configFile, new File(backup)); - Path backupPath = Paths.get(backup); - try (FileChannel channel = FileChannel.open(backupPath, StandardOpenOption.WRITE)) { - channel.force(true); // force flush before deleting original file. - } - configFile.delete(); + File dir = new File(configFile.getParent()); + if (!dir.exists()) { + Files.createDirectories(dir.toPath()); } - tmpFile.renameTo(configFile); - } catch (IOException e) { - log.error("Failed to persist {}", temp, e); - } finally { - if (null != bufferedWriter) { - try { - bufferedWriter.close(); - } catch (IOException ignore) { - } + // persist metrics file + StringWriter stringWriter = new StringWriter(); + write0(stringWriter); + try (RandomAccessFile randomAccessFile = new RandomAccessFile(config, "rw")) { + randomAccessFile.write(stringWriter.toString().getBytes(StandardCharsets.UTF_8)); + randomAccessFile.getChannel().force(true); + // sync the directory, ensure that the config file is visible + MixAll.fsyncDirectory(Paths.get(configFile.getParent())); } + } catch (Throwable t) { + log.error("Failed to persist", t); } } @@ -313,8 +282,7 @@ public void setTimeStamp(long timeStamp) { this.timeStamp = timeStamp; } - @Override - public String toString() { + @Override public String toString() { return String.format("[%d,%d]", count.get(), timeStamp); } } diff --git a/store/src/test/java/org/apache/rocketmq/store/ha/HAServerTest.java b/store/src/test/java/org/apache/rocketmq/store/ha/HAServerTest.java index fa8f41dbf84..6e1642eddc6 100644 --- a/store/src/test/java/org/apache/rocketmq/store/ha/HAServerTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/ha/HAServerTest.java @@ -26,6 +26,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.SystemClock; import org.apache.rocketmq.store.CommitLog; import org.apache.rocketmq.store.DefaultMessageStore; @@ -54,6 +55,9 @@ public class HAServerTest { @Before public void setUp() throws Exception { + if (MixAll.isMac()) { + return; + } this.storeConfig = new MessageStoreConfig(); this.storeConfig.setHaListenPort(9000 + random.nextInt(1000)); this.storeConfig.setHaSendHeartbeatInterval(10); @@ -66,6 +70,9 @@ public void setUp() throws Exception { @After public void tearDown() { + if (MixAll.isMac()) { + return; + } tearDownAllHAClient(); await().atMost(Duration.ofMinutes(1)).until(new Callable() { @@ -80,6 +87,9 @@ public Boolean call() throws Exception { @Test public void testConnectionList_OneHAClient() throws IOException { + if (MixAll.isMac()) { + return; + } setUpOneHAClient(); await().atMost(Duration.ofMinutes(1)).until(new Callable() { @@ -92,6 +102,9 @@ public Boolean call() { @Test public void testConnectionList_MultipleHAClient() throws IOException { + if (MixAll.isMac()) { + return; + } setUpOneHAClient(); setUpOneHAClient(); setUpOneHAClient(); @@ -115,6 +128,9 @@ public Boolean call() { @Test public void inSyncReplicasNums() throws IOException, RocksDBException { + if (MixAll.isMac()) { + return; + } DefaultMessageStore messageStore = mockMessageStore(); doReturn(123L).when(messageStore).getMaxPhyOffset(); doReturn(123L).when(messageStore).getMasterFlushedOffset(); @@ -151,6 +167,9 @@ public Boolean call() throws Exception { @Test public void isSlaveOK() throws IOException, RocksDBException { + if (MixAll.isMac()) { + return; + } DefaultMessageStore messageStore = mockMessageStore(); doReturn(123L).when(messageStore).getMaxPhyOffset(); doReturn(123L).when(messageStore).getMasterFlushedOffset(); @@ -177,6 +196,9 @@ public Boolean call() throws Exception { @Test public void putRequest_SingleAck() throws IOException, ExecutionException, InterruptedException, TimeoutException, RocksDBException { + if (MixAll.isMac()) { + return; + } CommitLog.GroupCommitRequest request = new CommitLog.GroupCommitRequest(124, 4000, 1); this.haService.putRequest(request); @@ -195,6 +217,9 @@ public void putRequest_SingleAck() @Test public void putRequest_MultipleAckAndRequests() throws IOException, ExecutionException, InterruptedException, RocksDBException { + if (MixAll.isMac()) { + return; + } CommitLog.GroupCommitRequest oneAck = new CommitLog.GroupCommitRequest(124, 4000, 2); this.haService.putRequest(oneAck); @@ -221,6 +246,9 @@ public void putRequest_MultipleAckAndRequests() @Test public void getPush2SlaveMaxOffset() throws IOException, RocksDBException { + if (MixAll.isMac()) { + return; + } DefaultMessageStore messageStore = mockMessageStore(); doReturn(123L).when(messageStore).getMaxPhyOffset(); doReturn(123L).when(messageStore).getMasterFlushedOffset(); diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStoreTest.java index 2ca21b265ef..c261bb9882a 100644 --- a/store/src/test/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStoreTest.java @@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import org.apache.commons.io.FileUtils; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.attribute.CQType; @@ -35,6 +36,7 @@ import org.apache.rocketmq.store.config.MessageStoreConfig; import org.junit.After; import org.junit.Assert; +import org.junit.Assume; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -59,12 +61,18 @@ public class CombineConsumeQueueStoreTest extends QueueTestBase { @Before public void init() throws Exception { + if (MixAll.isMac()) { + return; + } this.topicConfigTableMap = new ConcurrentHashMap<>(); messageStoreConfig = new MessageStoreConfig(); } @After public void destroy() { + if (MixAll.isMac()) { + return; + } if (!messageStore.isShutdown()) { messageStore.shutdown(); } @@ -76,6 +84,7 @@ public void destroy() { @Test(expected = IllegalArgumentException.class) public void CombineConsumeQueueStore_EmptyLoadingCQTypes_ThrowsException() throws Exception { + Assume.assumeFalse(MixAll.isMac()); messageStore = (DefaultMessageStore) createMessageStore(null, false, topicConfigTableMap, messageStoreConfig); messageStoreConfig.setCombineCQLoadingCQTypes(""); @@ -84,6 +93,9 @@ public void CombineConsumeQueueStore_EmptyLoadingCQTypes_ThrowsException() throw @Test public void CombineConsumeQueueStore_InitializesConsumeQueueStore() throws Exception { + if (MixAll.isMac()) { + return; + } messageStore = (DefaultMessageStore) createMessageStore(null, false, topicConfigTableMap, messageStoreConfig); { messageStoreConfig.setCombineCQLoadingCQTypes("default"); @@ -123,6 +135,9 @@ public void CombineConsumeQueueStore_InitializesConsumeQueueStore() throws Excep @Test public void testIterator() throws Exception { + if (MixAll.isMac()) { + return; + } messageStoreConfig.setRocksdbCQDoubleWriteEnable(true); messageStore = (DefaultMessageStore) createMessageStore(null, false, topicConfigTableMap, messageStoreConfig); messageStore.load(); @@ -203,6 +218,9 @@ private void checkCQ(ConsumeQueueInterface consumeQueue, int msgNum, @Test public void testInitializeWithOffset() throws Exception { + if (MixAll.isMac()) { + return; + } final String path = createBaseDir(); FileUtils.deleteDirectory(new File(path)); topicConfigTableMap.put(topic, new TopicConfig(topic, 1, 1, PermName.PERM_WRITE | PermName.PERM_READ)); @@ -295,6 +313,9 @@ public void testInitializeWithOffset() throws Exception { @Test public void testVerifyAndInitOffsetForAllStore() throws Exception { + if (MixAll.isMac()) { + return; + } final String path = createBaseDir(); topicConfigTableMap.put(topic, new TopicConfig(topic, 1, 1, PermName.PERM_WRITE | PermName.PERM_READ)); diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTableTest.java b/store/src/test/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTableTest.java index b1e12d49468..0693aec647d 100644 --- a/store/src/test/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTableTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTableTest.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.queue.offset.OffsetEntryType; import org.apache.rocketmq.store.rocksdb.ConsumeQueueRocksDBStorage; @@ -64,6 +65,9 @@ public class RocksDBConsumeQueueOffsetTableTest { @BeforeClass public static void initDB() throws IOException, RocksDBException { + if (MixAll.isMac()) { + return; + } TemporaryFolder tempFolder = new TemporaryFolder(); tempFolder.create(); dbPath = tempFolder.newFolder(); @@ -98,12 +102,18 @@ public static void initDB() throws IOException, RocksDBException { @AfterClass public static void tearDownDB() throws RocksDBException { + if (MixAll.isMac()) { + return; + } db.closeE(); RocksDB.destroyDB(dbPath.getAbsolutePath(), new Options()); } @Before public void setUp() { + if (MixAll.isMac()) { + return; + } RocksIterator iterator = db.newIterator(); Mockito.doReturn(iterator).when(rocksDBStorage).seekOffsetCF(); offsetTable = new RocksDBConsumeQueueOffsetTable(consumeQueueTable, rocksDBStorage, messageStore); @@ -116,6 +126,9 @@ public void setUp() { */ @Test public void testForEach() throws RocksDBException { + if (MixAll.isMac()) { + return; + } AtomicBoolean called = new AtomicBoolean(false); offsetTable.forEach(entry -> true, entry -> { called.set(true); diff --git a/store/src/test/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactoryTest.java b/store/src/test/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactoryTest.java index 1d7273968f6..46563077d18 100644 --- a/store/src/test/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactoryTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactoryTest.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.store.rocksdb; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.junit.Assert; import org.junit.Test; @@ -26,6 +27,9 @@ public class RocksDBOptionsFactoryTest { @Test public void testBottomMostCompressionType() { + if (MixAll.isMac()) { + return; + } MessageStoreConfig config = new MessageStoreConfig(); Assert.assertEquals(CompressionType.ZSTD_COMPRESSION, CompressionType.getCompressionType(config.getBottomMostCompressionTypeForConsumeQueueStore())); diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/ExportMetadataInRocksDBCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/ExportMetadataInRocksDBCommandTest.java index 2b938c90fb8..52ede8a2741 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/ExportMetadataInRocksDBCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/metadata/ExportMetadataInRocksDBCommandTest.java @@ -19,6 +19,7 @@ import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.Options; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.tools.command.SubCommandException; import org.apache.rocketmq.tools.command.export.ExportMetadataInRocksDBCommand; @@ -33,6 +34,9 @@ public class ExportMetadataInRocksDBCommandTest { @Test public void testExecute() throws SubCommandException { + if (MixAll.isMac()) { + return; + } { String[][] cases = new String[][] { {"topics", "false"},