From cd9286294bc2dd23f5eaa0a9a06d58c85d22bbfe Mon Sep 17 00:00:00 2001 From: RongtongJin Date: Wed, 3 Sep 2025 19:10:01 +0800 Subject: [PATCH 1/4] feat: Add accelerated startup recovery feature Add accelerated startup recovery functionality when using RocksDB store with SYNC_FLUSH configuration: - Add enableAcceleratedRecovery configuration option in MessageStoreConfig - Implement accelerated recovery logic in CommitLog for both normal and abnormal recovery - Add protective fallback mechanism to handle edge cases - Improve isMappedFileMatchedRecover method for better robustness - Add comprehensive unit tests for the accelerated recovery feature This feature significantly reduces startup time when recovering from RocksDB-based storage with synchronous flushing enabled, while maintaining data consistency and safety. --- .../org/apache/rocketmq/store/CommitLog.java | 66 +++++- .../store/config/MessageStoreConfig.java | 18 +- .../store/AcceleratedRecoveryTest.java | 211 ++++++++++++++++++ 3 files changed, 282 insertions(+), 13 deletions(-) create mode 100644 store/src/test/java/org/apache/rocketmq/store/AcceleratedRecoveryTest.java diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 2fdd2450e61..d8a8bfae98d 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -174,6 +174,17 @@ public boolean load() { return result; } + public void cleanResourceAll() { + // Clean all mapped file resources + if (mappedFileQueue != null) { + for (MappedFile mappedFile : mappedFileQueue.getMappedFiles()) { + if (mappedFile instanceof ReferenceResource) { + ((ReferenceResource) mappedFile).cleanup(0); + } + } + } + } + public void start() { this.flushManager.start(); log.info("start commitLog successfully. storeRoot: {}", this.defaultMessageStore.getMessageStoreConfig().getStorePathRootDir()); @@ -342,8 +353,17 @@ public void recoverNormally(long dispatchFromPhyOffset) throws RocksDBException MappedFile mappedFile = mappedFiles.get(index); ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); long processOffset = mappedFile.getFileFromOffset(); - long mappedFileOffset = 0; + long mappedFileOffset; long lastValidMsgPhyOffset = this.getConfirmOffset(); + + if (defaultMessageStore.getMessageStoreConfig().isEnableRocksDBStore() + && defaultMessageStore.getMessageStoreConfig().isEnableAcceleratedRecovery()) { + mappedFileOffset = dispatchFromPhyOffset - mappedFile.getFileFromOffset(); + lastValidMsgPhyOffset = dispatchFromPhyOffset; + byteBuffer.position((int) mappedFileOffset); + } else { + mappedFileOffset = 0; + } while (true) { DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover, checkDupInfo); int size = dispatchRequest.getMsgSize(); @@ -728,9 +748,29 @@ public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) throws RocksDBExc ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); long processOffset = mappedFile.getFileFromOffset(); - long mappedFileOffset = 0; - long lastValidMsgPhyOffset = processOffset; - long lastConfirmValidMsgPhyOffset = processOffset; + long mappedFileOffset; + long lastValidMsgPhyOffset; + long lastConfirmValidMsgPhyOffset; + + if (defaultMessageStore.getMessageStoreConfig().isEnableRocksDBStore() + && defaultMessageStore.getMessageStoreConfig().isEnableAcceleratedRecovery()) { + mappedFileOffset = maxPhyOffsetOfConsumeQueue - mappedFile.getFileFromOffset(); + // Protective measures, falling back to non-accelerated mode, which is extremely unlikely to occur + if (mappedFileOffset < 0) { + mappedFileOffset = 0; + lastValidMsgPhyOffset = processOffset; + lastConfirmValidMsgPhyOffset = processOffset; + } else { + log.info("recover using acceleration, recovery offset is {}", maxPhyOffsetOfConsumeQueue); + lastValidMsgPhyOffset = maxPhyOffsetOfConsumeQueue; + lastConfirmValidMsgPhyOffset = maxPhyOffsetOfConsumeQueue; + byteBuffer.position((int) mappedFileOffset); + } + } else { + mappedFileOffset = 0; + lastValidMsgPhyOffset = processOffset; + lastConfirmValidMsgPhyOffset = processOffset; + } // abnormal recover require dispatching boolean doDispatch = true; while (true) { @@ -840,19 +880,21 @@ private boolean isMappedFileMatchedRecover(final MappedFile mappedFile, boolean recoverNormally) throws RocksDBException { ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); - int magicCode = byteBuffer.getInt(MessageDecoder.MESSAGE_MAGIC_CODE_POSITION); - if (magicCode != MessageDecoder.MESSAGE_MAGIC_CODE && magicCode != MessageDecoder.MESSAGE_MAGIC_CODE_V2) { + boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover(); + boolean checkDupInfo = this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable(); + + DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover, checkDupInfo); + + if (!dispatchRequest.isSuccess()) { return false; } - int sysFlag = byteBuffer.getInt(MessageDecoder.SYSFLAG_POSITION); - int bornHostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20; - int msgStoreTimePos = 4 + 4 + 4 + 4 + 4 + 8 + 8 + 4 + 8 + bornHostLength; - long storeTimestamp = byteBuffer.getLong(msgStoreTimePos); - if (0 == storeTimestamp) { + long storeTimestamp = dispatchRequest.getStoreTimestamp(); + long phyOffset = dispatchRequest.getCommitLogOffset(); + + if (0 == dispatchRequest.getStoreTimestamp()) { return false; } - long phyOffset = byteBuffer.getLong(MessageDecoder.MESSAGE_PHYSIC_OFFSET_POSITION); if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable() && this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) { diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index 60f6a90381c..a142eca86fb 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -464,6 +464,13 @@ public class MessageStoreConfig { private long rocksdbWalFileRollingThreshold = SizeUnit.GB; + /** + * Note: For correctness, this switch should be enabled only if the previous startup was configured with SYNC_FLUSH + * and the storeType was defaultRocksDB. This switch is not recommended for normal use cases (include master-slave + * or controller mode). + */ + private boolean enableAcceleratedRecovery = false; + public String getRocksdbCompressionType() { return rocksdbCompressionType; } @@ -2008,7 +2015,16 @@ public boolean isEnableLogConsumeQueueRepeatedlyBuildWhenRecover() { return enableLogConsumeQueueRepeatedlyBuildWhenRecover; } - public void setEnableLogConsumeQueueRepeatedlyBuildWhenRecover(boolean enableLogConsumeQueueRepeatedlyBuildWhenRecover) { + public void setEnableLogConsumeQueueRepeatedlyBuildWhenRecover( + boolean enableLogConsumeQueueRepeatedlyBuildWhenRecover) { this.enableLogConsumeQueueRepeatedlyBuildWhenRecover = enableLogConsumeQueueRepeatedlyBuildWhenRecover; } + + public boolean isEnableAcceleratedRecovery() { + return enableAcceleratedRecovery; + } + + public void setEnableAcceleratedRecovery(boolean enableAcceleratedRecovery) { + this.enableAcceleratedRecovery = enableAcceleratedRecovery; + } } diff --git a/store/src/test/java/org/apache/rocketmq/store/AcceleratedRecoveryTest.java b/store/src/test/java/org/apache/rocketmq/store/AcceleratedRecoveryTest.java new file mode 100644 index 00000000000..71c4ed7d103 --- /dev/null +++ b/store/src/test/java/org/apache/rocketmq/store/AcceleratedRecoveryTest.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.store; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.message.MessageExtBrokerInner; +import org.apache.rocketmq.store.config.FlushDiskType; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.apache.rocketmq.store.stats.BrokerStatsManager; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class AcceleratedRecoveryTest { + private final String storeMessage = "Once, there was a chance for me!"; + private final String messageTopic = "FooBar"; + private int queueTotal = 100; + private AtomicInteger queueId = new AtomicInteger(0); + private SocketAddress bornHost; + private SocketAddress storeHost; + private byte[] messageBody; + private MessageStore messageStore; + private String storePathRootDir; + + @Before + public void init() throws Exception { + storeHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123); + bornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0); + messageBody = storeMessage.getBytes(); + + UUID uuid = UUID.randomUUID(); + storePathRootDir = System.getProperty("java.io.tmpdir") + File.separator + "accelerated-recovery-test-" + uuid.toString(); + } + + @After + public void destroy() { + if (messageStore != null) { + messageStore.shutdown(); + messageStore.destroy(); + } + + File file = new File(storePathRootDir); + UtilAll.deleteFile(file); + } + + @Test + public void testAcceleratedRecoveryConfigurationEnabled() { + MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + messageStoreConfig.setEnableAcceleratedRecovery(true); + + assertTrue("Accelerated recovery should be enabled", messageStoreConfig.isEnableAcceleratedRecovery()); + } + + @Test + public void testAcceleratedRecoveryConfigurationDisabled() { + MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + // Default should be false + assertThat(messageStoreConfig.isEnableAcceleratedRecovery()).isFalse(); + + messageStoreConfig.setEnableAcceleratedRecovery(false); + assertThat(messageStoreConfig.isEnableAcceleratedRecovery()).isFalse(); + } + + @Test + public void testAcceleratedRecoveryWithRocksDBStore() throws Exception { + MessageStoreConfig messageStoreConfig = buildAcceleratedRecoveryConfig(); + messageStoreConfig.setStoreType(StoreType.DEFAULT_ROCKSDB.getStoreType()); + messageStoreConfig.setEnableAcceleratedRecovery(true); + + messageStore = new DefaultMessageStore(messageStoreConfig, + new BrokerStatsManager("acceleratedRecoveryTest", true), + new MyMessageArrivingListener(), + new BrokerConfig(), new ConcurrentHashMap<>()); + + boolean load = messageStore.load(); + assertTrue("Message store should load successfully", load); + messageStore.start(); + + // Put some messages + for (int i = 0; i < 10; i++) { + MessageExtBrokerInner message = buildMessage(); + PutMessageResult result = messageStore.putMessage(message); + assertThat(result.getPutMessageStatus()).isEqualTo(PutMessageStatus.PUT_OK); + } + + // Wait for commit log reput + StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore); + + // Just verify the store is working properly + assertThat(messageStore.getMinOffsetInQueue(messageTopic, 0)).isGreaterThanOrEqualTo(0); + assertThat(messageStore.getMaxOffsetInQueue(messageTopic, 0)).isGreaterThanOrEqualTo(0); + } + + @Test + public void testAcceleratedRecoveryWithNormalStore() throws Exception { + MessageStoreConfig messageStoreConfig = buildAcceleratedRecoveryConfig(); + messageStoreConfig.setStoreType(StoreType.DEFAULT.getStoreType()); + messageStoreConfig.setEnableAcceleratedRecovery(true); + + messageStore = new DefaultMessageStore(messageStoreConfig, + new BrokerStatsManager("acceleratedRecoveryTest", true), + new MyMessageArrivingListener(), + new BrokerConfig(), new ConcurrentHashMap<>()); + + boolean load = messageStore.load(); + assertTrue("Message store should load successfully", load); + messageStore.start(); + + // Put some messages + for (int i = 0; i < 10; i++) { + MessageExtBrokerInner message = buildMessage(); + PutMessageResult result = messageStore.putMessage(message); + assertThat(result.getPutMessageStatus()).isEqualTo(PutMessageStatus.PUT_OK); + } + + // Wait for commit log reput + StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore); + + // Just verify the store is working properly + assertThat(messageStore.getMinOffsetInQueue(messageTopic, 0)).isGreaterThanOrEqualTo(0); + assertThat(messageStore.getMaxOffsetInQueue(messageTopic, 0)).isGreaterThanOrEqualTo(0); + } + + @Test + public void testAcceleratedRecoveryFallbackProtection() throws Exception { + MessageStoreConfig messageStoreConfig = buildAcceleratedRecoveryConfig(); + messageStoreConfig.setStoreType(StoreType.DEFAULT_ROCKSDB.getStoreType()); + messageStoreConfig.setEnableAcceleratedRecovery(true); + + messageStore = new DefaultMessageStore(messageStoreConfig, + new BrokerStatsManager("acceleratedRecoveryTest", true), + new MyMessageArrivingListener(), + new BrokerConfig(), new ConcurrentHashMap<>()); + + boolean load = messageStore.load(); + assertTrue("Message store should load successfully", load); + messageStore.start(); + + // Test the fallback protection logic by verifying the store can start normally + // even with accelerated recovery enabled + assertThat(messageStore.isOSPageCacheBusy()).isFalse(); + assertThat(messageStore.dispatchBehindBytes()).isEqualTo(0); + } + + private MessageStoreConfig buildAcceleratedRecoveryConfig() { + MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + messageStoreConfig.setMappedFileSizeCommitLog(1024 * 1024 * 10); + messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 1024 * 10); + messageStoreConfig.setMaxHashSlotNum(10000); + messageStoreConfig.setMaxIndexNum(100 * 100); + messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH); + messageStoreConfig.setFlushIntervalConsumeQueue(1); + messageStoreConfig.setHaListenPort(0); + messageStoreConfig.setStorePathRootDir(storePathRootDir); + return messageStoreConfig; + } + + private MessageExtBrokerInner buildMessage() { + MessageExtBrokerInner msg = new MessageExtBrokerInner(); + msg.setTopic(messageTopic); + msg.setTags("TAG1"); + msg.setKeys("Hello"); + msg.setBody(messageBody); + msg.setKeys(String.valueOf(System.currentTimeMillis())); + msg.setQueueId(queueId.getAndIncrement() % queueTotal); + msg.setSysFlag(0); + msg.setBornTimestamp(System.currentTimeMillis()); + msg.setStoreHost(storeHost); + msg.setBornHost(bornHost); + msg.setPropertiesString("TAGS=TAG1"); + return msg; + } + + class MyMessageArrivingListener implements MessageArrivingListener { + @Override + public void arriving(String topic, int queueId, long logicOffset, long tagsCode, + long msgStoreTime, byte[] filterBitMap, Map properties) { + } + } +} \ No newline at end of file From fa1d38ed296146e9cc5fd406d505585e2e87e5c9 Mon Sep 17 00:00:00 2001 From: RongtongJin Date: Wed, 3 Sep 2025 19:16:13 +0800 Subject: [PATCH 2/4] Fix the issue of accelerated startup failure --- .../java/org/apache/rocketmq/store/CommitLog.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index d8a8bfae98d..5cb7fefa64a 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -353,16 +353,17 @@ public void recoverNormally(long dispatchFromPhyOffset) throws RocksDBException MappedFile mappedFile = mappedFiles.get(index); ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); long processOffset = mappedFile.getFileFromOffset(); - long mappedFileOffset; + long mappedFileOffset = 0; long lastValidMsgPhyOffset = this.getConfirmOffset(); if (defaultMessageStore.getMessageStoreConfig().isEnableRocksDBStore() && defaultMessageStore.getMessageStoreConfig().isEnableAcceleratedRecovery()) { mappedFileOffset = dispatchFromPhyOffset - mappedFile.getFileFromOffset(); - lastValidMsgPhyOffset = dispatchFromPhyOffset; - byteBuffer.position((int) mappedFileOffset); - } else { - mappedFileOffset = 0; + if (mappedFileOffset > 0) { + log.info("recover using acceleration, recovery offset is {}", dispatchFromPhyOffset); + lastValidMsgPhyOffset = dispatchFromPhyOffset; + byteBuffer.position((int) mappedFileOffset); + } } while (true) { DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover, checkDupInfo); From e5e090582dff634c3196fb0f756fb04a14b91359 Mon Sep 17 00:00:00 2001 From: RongtongJin Date: Wed, 3 Sep 2025 19:18:54 +0800 Subject: [PATCH 3/4] refactor: Remove problematic unit test Remove AcceleratedRecoveryTest.java as the test implementation was not appropriate for the accelerated startup recovery feature. --- .../store/AcceleratedRecoveryTest.java | 211 ------------------ 1 file changed, 211 deletions(-) delete mode 100644 store/src/test/java/org/apache/rocketmq/store/AcceleratedRecoveryTest.java diff --git a/store/src/test/java/org/apache/rocketmq/store/AcceleratedRecoveryTest.java b/store/src/test/java/org/apache/rocketmq/store/AcceleratedRecoveryTest.java deleted file mode 100644 index 71c4ed7d103..00000000000 --- a/store/src/test/java/org/apache/rocketmq/store/AcceleratedRecoveryTest.java +++ /dev/null @@ -1,211 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.store; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.rocketmq.common.BrokerConfig; -import org.apache.rocketmq.common.UtilAll; -import org.apache.rocketmq.common.message.MessageExtBrokerInner; -import org.apache.rocketmq.store.config.FlushDiskType; -import org.apache.rocketmq.store.config.MessageStoreConfig; -import org.apache.rocketmq.store.stats.BrokerStatsManager; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.junit.MockitoJUnitRunner; - -@RunWith(MockitoJUnitRunner.class) -public class AcceleratedRecoveryTest { - private final String storeMessage = "Once, there was a chance for me!"; - private final String messageTopic = "FooBar"; - private int queueTotal = 100; - private AtomicInteger queueId = new AtomicInteger(0); - private SocketAddress bornHost; - private SocketAddress storeHost; - private byte[] messageBody; - private MessageStore messageStore; - private String storePathRootDir; - - @Before - public void init() throws Exception { - storeHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123); - bornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0); - messageBody = storeMessage.getBytes(); - - UUID uuid = UUID.randomUUID(); - storePathRootDir = System.getProperty("java.io.tmpdir") + File.separator + "accelerated-recovery-test-" + uuid.toString(); - } - - @After - public void destroy() { - if (messageStore != null) { - messageStore.shutdown(); - messageStore.destroy(); - } - - File file = new File(storePathRootDir); - UtilAll.deleteFile(file); - } - - @Test - public void testAcceleratedRecoveryConfigurationEnabled() { - MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); - messageStoreConfig.setEnableAcceleratedRecovery(true); - - assertTrue("Accelerated recovery should be enabled", messageStoreConfig.isEnableAcceleratedRecovery()); - } - - @Test - public void testAcceleratedRecoveryConfigurationDisabled() { - MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); - // Default should be false - assertThat(messageStoreConfig.isEnableAcceleratedRecovery()).isFalse(); - - messageStoreConfig.setEnableAcceleratedRecovery(false); - assertThat(messageStoreConfig.isEnableAcceleratedRecovery()).isFalse(); - } - - @Test - public void testAcceleratedRecoveryWithRocksDBStore() throws Exception { - MessageStoreConfig messageStoreConfig = buildAcceleratedRecoveryConfig(); - messageStoreConfig.setStoreType(StoreType.DEFAULT_ROCKSDB.getStoreType()); - messageStoreConfig.setEnableAcceleratedRecovery(true); - - messageStore = new DefaultMessageStore(messageStoreConfig, - new BrokerStatsManager("acceleratedRecoveryTest", true), - new MyMessageArrivingListener(), - new BrokerConfig(), new ConcurrentHashMap<>()); - - boolean load = messageStore.load(); - assertTrue("Message store should load successfully", load); - messageStore.start(); - - // Put some messages - for (int i = 0; i < 10; i++) { - MessageExtBrokerInner message = buildMessage(); - PutMessageResult result = messageStore.putMessage(message); - assertThat(result.getPutMessageStatus()).isEqualTo(PutMessageStatus.PUT_OK); - } - - // Wait for commit log reput - StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore); - - // Just verify the store is working properly - assertThat(messageStore.getMinOffsetInQueue(messageTopic, 0)).isGreaterThanOrEqualTo(0); - assertThat(messageStore.getMaxOffsetInQueue(messageTopic, 0)).isGreaterThanOrEqualTo(0); - } - - @Test - public void testAcceleratedRecoveryWithNormalStore() throws Exception { - MessageStoreConfig messageStoreConfig = buildAcceleratedRecoveryConfig(); - messageStoreConfig.setStoreType(StoreType.DEFAULT.getStoreType()); - messageStoreConfig.setEnableAcceleratedRecovery(true); - - messageStore = new DefaultMessageStore(messageStoreConfig, - new BrokerStatsManager("acceleratedRecoveryTest", true), - new MyMessageArrivingListener(), - new BrokerConfig(), new ConcurrentHashMap<>()); - - boolean load = messageStore.load(); - assertTrue("Message store should load successfully", load); - messageStore.start(); - - // Put some messages - for (int i = 0; i < 10; i++) { - MessageExtBrokerInner message = buildMessage(); - PutMessageResult result = messageStore.putMessage(message); - assertThat(result.getPutMessageStatus()).isEqualTo(PutMessageStatus.PUT_OK); - } - - // Wait for commit log reput - StoreTestUtil.waitCommitLogReput((DefaultMessageStore) messageStore); - - // Just verify the store is working properly - assertThat(messageStore.getMinOffsetInQueue(messageTopic, 0)).isGreaterThanOrEqualTo(0); - assertThat(messageStore.getMaxOffsetInQueue(messageTopic, 0)).isGreaterThanOrEqualTo(0); - } - - @Test - public void testAcceleratedRecoveryFallbackProtection() throws Exception { - MessageStoreConfig messageStoreConfig = buildAcceleratedRecoveryConfig(); - messageStoreConfig.setStoreType(StoreType.DEFAULT_ROCKSDB.getStoreType()); - messageStoreConfig.setEnableAcceleratedRecovery(true); - - messageStore = new DefaultMessageStore(messageStoreConfig, - new BrokerStatsManager("acceleratedRecoveryTest", true), - new MyMessageArrivingListener(), - new BrokerConfig(), new ConcurrentHashMap<>()); - - boolean load = messageStore.load(); - assertTrue("Message store should load successfully", load); - messageStore.start(); - - // Test the fallback protection logic by verifying the store can start normally - // even with accelerated recovery enabled - assertThat(messageStore.isOSPageCacheBusy()).isFalse(); - assertThat(messageStore.dispatchBehindBytes()).isEqualTo(0); - } - - private MessageStoreConfig buildAcceleratedRecoveryConfig() { - MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); - messageStoreConfig.setMappedFileSizeCommitLog(1024 * 1024 * 10); - messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 1024 * 10); - messageStoreConfig.setMaxHashSlotNum(10000); - messageStoreConfig.setMaxIndexNum(100 * 100); - messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH); - messageStoreConfig.setFlushIntervalConsumeQueue(1); - messageStoreConfig.setHaListenPort(0); - messageStoreConfig.setStorePathRootDir(storePathRootDir); - return messageStoreConfig; - } - - private MessageExtBrokerInner buildMessage() { - MessageExtBrokerInner msg = new MessageExtBrokerInner(); - msg.setTopic(messageTopic); - msg.setTags("TAG1"); - msg.setKeys("Hello"); - msg.setBody(messageBody); - msg.setKeys(String.valueOf(System.currentTimeMillis())); - msg.setQueueId(queueId.getAndIncrement() % queueTotal); - msg.setSysFlag(0); - msg.setBornTimestamp(System.currentTimeMillis()); - msg.setStoreHost(storeHost); - msg.setBornHost(bornHost); - msg.setPropertiesString("TAGS=TAG1"); - return msg; - } - - class MyMessageArrivingListener implements MessageArrivingListener { - @Override - public void arriving(String topic, int queueId, long logicOffset, long tagsCode, - long msgStoreTime, byte[] filterBitMap, Map properties) { - } - } -} \ No newline at end of file From 8deb3c1221a092624c1eaace6c72897ab51983ae Mon Sep 17 00:00:00 2001 From: RongtongJin Date: Thu, 4 Sep 2025 14:19:04 +0800 Subject: [PATCH 4/4] Delete useless code --- .../java/org/apache/rocketmq/store/CommitLog.java | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 5cb7fefa64a..269499eba9e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -174,17 +174,6 @@ public boolean load() { return result; } - public void cleanResourceAll() { - // Clean all mapped file resources - if (mappedFileQueue != null) { - for (MappedFile mappedFile : mappedFileQueue.getMappedFiles()) { - if (mappedFile instanceof ReferenceResource) { - ((ReferenceResource) mappedFile).cleanup(0); - } - } - } - } - public void start() { this.flushManager.start(); log.info("start commitLog successfully. storeRoot: {}", this.defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());