diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 2fdd2450e61..269499eba9e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -344,6 +344,16 @@ public void recoverNormally(long dispatchFromPhyOffset) throws RocksDBException long processOffset = mappedFile.getFileFromOffset(); long mappedFileOffset = 0; long lastValidMsgPhyOffset = this.getConfirmOffset(); + + if (defaultMessageStore.getMessageStoreConfig().isEnableRocksDBStore() + && defaultMessageStore.getMessageStoreConfig().isEnableAcceleratedRecovery()) { + mappedFileOffset = dispatchFromPhyOffset - mappedFile.getFileFromOffset(); + if (mappedFileOffset > 0) { + log.info("recover using acceleration, recovery offset is {}", dispatchFromPhyOffset); + lastValidMsgPhyOffset = dispatchFromPhyOffset; + byteBuffer.position((int) mappedFileOffset); + } + } while (true) { DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover, checkDupInfo); int size = dispatchRequest.getMsgSize(); @@ -728,9 +738,29 @@ public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) throws RocksDBExc ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); long processOffset = mappedFile.getFileFromOffset(); - long mappedFileOffset = 0; - long lastValidMsgPhyOffset = processOffset; - long lastConfirmValidMsgPhyOffset = processOffset; + long mappedFileOffset; + long lastValidMsgPhyOffset; + long lastConfirmValidMsgPhyOffset; + + if (defaultMessageStore.getMessageStoreConfig().isEnableRocksDBStore() + && defaultMessageStore.getMessageStoreConfig().isEnableAcceleratedRecovery()) { + mappedFileOffset = maxPhyOffsetOfConsumeQueue - mappedFile.getFileFromOffset(); + // Protective measures, falling back to non-accelerated mode, which is extremely unlikely to occur + if (mappedFileOffset < 0) { + mappedFileOffset = 0; + lastValidMsgPhyOffset = processOffset; + lastConfirmValidMsgPhyOffset = processOffset; + } else { + log.info("recover using acceleration, recovery offset is {}", maxPhyOffsetOfConsumeQueue); + lastValidMsgPhyOffset = maxPhyOffsetOfConsumeQueue; + lastConfirmValidMsgPhyOffset = maxPhyOffsetOfConsumeQueue; + byteBuffer.position((int) mappedFileOffset); + } + } else { + mappedFileOffset = 0; + lastValidMsgPhyOffset = processOffset; + lastConfirmValidMsgPhyOffset = processOffset; + } // abnormal recover require dispatching boolean doDispatch = true; while (true) { @@ -840,19 +870,21 @@ private boolean isMappedFileMatchedRecover(final MappedFile mappedFile, boolean recoverNormally) throws RocksDBException { ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); - int magicCode = byteBuffer.getInt(MessageDecoder.MESSAGE_MAGIC_CODE_POSITION); - if (magicCode != MessageDecoder.MESSAGE_MAGIC_CODE && magicCode != MessageDecoder.MESSAGE_MAGIC_CODE_V2) { + boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover(); + boolean checkDupInfo = this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable(); + + DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover, checkDupInfo); + + if (!dispatchRequest.isSuccess()) { return false; } - int sysFlag = byteBuffer.getInt(MessageDecoder.SYSFLAG_POSITION); - int bornHostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20; - int msgStoreTimePos = 4 + 4 + 4 + 4 + 4 + 8 + 8 + 4 + 8 + bornHostLength; - long storeTimestamp = byteBuffer.getLong(msgStoreTimePos); - if (0 == storeTimestamp) { + long storeTimestamp = dispatchRequest.getStoreTimestamp(); + long phyOffset = dispatchRequest.getCommitLogOffset(); + + if (0 == dispatchRequest.getStoreTimestamp()) { return false; } - long phyOffset = byteBuffer.getLong(MessageDecoder.MESSAGE_PHYSIC_OFFSET_POSITION); if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable() && this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) { diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index 60f6a90381c..a142eca86fb 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -464,6 +464,13 @@ public class MessageStoreConfig { private long rocksdbWalFileRollingThreshold = SizeUnit.GB; + /** + * Note: For correctness, this switch should be enabled only if the previous startup was configured with SYNC_FLUSH + * and the storeType was defaultRocksDB. This switch is not recommended for normal use cases (include master-slave + * or controller mode). + */ + private boolean enableAcceleratedRecovery = false; + public String getRocksdbCompressionType() { return rocksdbCompressionType; } @@ -2008,7 +2015,16 @@ public boolean isEnableLogConsumeQueueRepeatedlyBuildWhenRecover() { return enableLogConsumeQueueRepeatedlyBuildWhenRecover; } - public void setEnableLogConsumeQueueRepeatedlyBuildWhenRecover(boolean enableLogConsumeQueueRepeatedlyBuildWhenRecover) { + public void setEnableLogConsumeQueueRepeatedlyBuildWhenRecover( + boolean enableLogConsumeQueueRepeatedlyBuildWhenRecover) { this.enableLogConsumeQueueRepeatedlyBuildWhenRecover = enableLogConsumeQueueRepeatedlyBuildWhenRecover; } + + public boolean isEnableAcceleratedRecovery() { + return enableAcceleratedRecovery; + } + + public void setEnableAcceleratedRecovery(boolean enableAcceleratedRecovery) { + this.enableAcceleratedRecovery = enableAcceleratedRecovery; + } }