Skip to content

Commit 86e8ed6

Browse files
authored
[ISSUE #9666] Add accelerated startup recovery feature for RocksDB store with SYNC_FLUSH (#9667)
* feat: Add accelerated startup recovery feature Add accelerated startup recovery functionality when using RocksDB store with SYNC_FLUSH configuration: - Add enableAcceleratedRecovery configuration option in MessageStoreConfig - Implement accelerated recovery logic in CommitLog for both normal and abnormal recovery - Add protective fallback mechanism to handle edge cases - Improve isMappedFileMatchedRecover method for better robustness - Add comprehensive unit tests for the accelerated recovery feature This feature significantly reduces startup time when recovering from RocksDB-based storage with synchronous flushing enabled, while maintaining data consistency and safety. * Fix the issue of accelerated startup failure * refactor: Remove problematic unit test Remove AcceleratedRecoveryTest.java as the test implementation was not appropriate for the accelerated startup recovery feature. * Delete useless code
1 parent 40622ce commit 86e8ed6

2 files changed

Lines changed: 60 additions & 12 deletions

File tree

store/src/main/java/org/apache/rocketmq/store/CommitLog.java

Lines changed: 43 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,16 @@ public void recoverNormally(long dispatchFromPhyOffset) throws RocksDBException
344344
long processOffset = mappedFile.getFileFromOffset();
345345
long mappedFileOffset = 0;
346346
long lastValidMsgPhyOffset = this.getConfirmOffset();
347+
348+
if (defaultMessageStore.getMessageStoreConfig().isEnableRocksDBStore()
349+
&& defaultMessageStore.getMessageStoreConfig().isEnableAcceleratedRecovery()) {
350+
mappedFileOffset = dispatchFromPhyOffset - mappedFile.getFileFromOffset();
351+
if (mappedFileOffset > 0) {
352+
log.info("recover using acceleration, recovery offset is {}", dispatchFromPhyOffset);
353+
lastValidMsgPhyOffset = dispatchFromPhyOffset;
354+
byteBuffer.position((int) mappedFileOffset);
355+
}
356+
}
347357
while (true) {
348358
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover, checkDupInfo);
349359
int size = dispatchRequest.getMsgSize();
@@ -728,9 +738,29 @@ public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) throws RocksDBExc
728738

729739
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
730740
long processOffset = mappedFile.getFileFromOffset();
731-
long mappedFileOffset = 0;
732-
long lastValidMsgPhyOffset = processOffset;
733-
long lastConfirmValidMsgPhyOffset = processOffset;
741+
long mappedFileOffset;
742+
long lastValidMsgPhyOffset;
743+
long lastConfirmValidMsgPhyOffset;
744+
745+
if (defaultMessageStore.getMessageStoreConfig().isEnableRocksDBStore()
746+
&& defaultMessageStore.getMessageStoreConfig().isEnableAcceleratedRecovery()) {
747+
mappedFileOffset = maxPhyOffsetOfConsumeQueue - mappedFile.getFileFromOffset();
748+
// Protective measures, falling back to non-accelerated mode, which is extremely unlikely to occur
749+
if (mappedFileOffset < 0) {
750+
mappedFileOffset = 0;
751+
lastValidMsgPhyOffset = processOffset;
752+
lastConfirmValidMsgPhyOffset = processOffset;
753+
} else {
754+
log.info("recover using acceleration, recovery offset is {}", maxPhyOffsetOfConsumeQueue);
755+
lastValidMsgPhyOffset = maxPhyOffsetOfConsumeQueue;
756+
lastConfirmValidMsgPhyOffset = maxPhyOffsetOfConsumeQueue;
757+
byteBuffer.position((int) mappedFileOffset);
758+
}
759+
} else {
760+
mappedFileOffset = 0;
761+
lastValidMsgPhyOffset = processOffset;
762+
lastConfirmValidMsgPhyOffset = processOffset;
763+
}
734764
// abnormal recover require dispatching
735765
boolean doDispatch = true;
736766
while (true) {
@@ -840,19 +870,21 @@ private boolean isMappedFileMatchedRecover(final MappedFile mappedFile,
840870
boolean recoverNormally) throws RocksDBException {
841871
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
842872

843-
int magicCode = byteBuffer.getInt(MessageDecoder.MESSAGE_MAGIC_CODE_POSITION);
844-
if (magicCode != MessageDecoder.MESSAGE_MAGIC_CODE && magicCode != MessageDecoder.MESSAGE_MAGIC_CODE_V2) {
873+
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
874+
boolean checkDupInfo = this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable();
875+
876+
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover, checkDupInfo);
877+
878+
if (!dispatchRequest.isSuccess()) {
845879
return false;
846880
}
847881

848-
int sysFlag = byteBuffer.getInt(MessageDecoder.SYSFLAG_POSITION);
849-
int bornHostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;
850-
int msgStoreTimePos = 4 + 4 + 4 + 4 + 4 + 8 + 8 + 4 + 8 + bornHostLength;
851-
long storeTimestamp = byteBuffer.getLong(msgStoreTimePos);
852-
if (0 == storeTimestamp) {
882+
long storeTimestamp = dispatchRequest.getStoreTimestamp();
883+
long phyOffset = dispatchRequest.getCommitLogOffset();
884+
885+
if (0 == dispatchRequest.getStoreTimestamp()) {
853886
return false;
854887
}
855-
long phyOffset = byteBuffer.getLong(MessageDecoder.MESSAGE_PHYSIC_OFFSET_POSITION);
856888

857889
if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable() &&
858890
this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {

store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,13 @@ public class MessageStoreConfig {
464464

465465
private long rocksdbWalFileRollingThreshold = SizeUnit.GB;
466466

467+
/**
468+
* Note: For correctness, this switch should be enabled only if the previous startup was configured with SYNC_FLUSH
469+
* and the storeType was defaultRocksDB. This switch is not recommended for normal use cases (include master-slave
470+
* or controller mode).
471+
*/
472+
private boolean enableAcceleratedRecovery = false;
473+
467474
public String getRocksdbCompressionType() {
468475
return rocksdbCompressionType;
469476
}
@@ -2008,7 +2015,16 @@ public boolean isEnableLogConsumeQueueRepeatedlyBuildWhenRecover() {
20082015
return enableLogConsumeQueueRepeatedlyBuildWhenRecover;
20092016
}
20102017

2011-
public void setEnableLogConsumeQueueRepeatedlyBuildWhenRecover(boolean enableLogConsumeQueueRepeatedlyBuildWhenRecover) {
2018+
public void setEnableLogConsumeQueueRepeatedlyBuildWhenRecover(
2019+
boolean enableLogConsumeQueueRepeatedlyBuildWhenRecover) {
20122020
this.enableLogConsumeQueueRepeatedlyBuildWhenRecover = enableLogConsumeQueueRepeatedlyBuildWhenRecover;
20132021
}
2022+
2023+
public boolean isEnableAcceleratedRecovery() {
2024+
return enableAcceleratedRecovery;
2025+
}
2026+
2027+
public void setEnableAcceleratedRecovery(boolean enableAcceleratedRecovery) {
2028+
this.enableAcceleratedRecovery = enableAcceleratedRecovery;
2029+
}
20142030
}

0 commit comments

Comments
 (0)