diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 286f31cd4a7..3b92f1a745b 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -738,6 +738,7 @@ public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) throws RocksDBExc // recover by the minimum time stamp boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover(); boolean checkDupInfo = this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable(); + boolean checkCommitLogOffsetOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCommitLogOffsetOnRecover(); int maxRecoverNum = this.defaultMessageStore.getMessageStoreConfig().getCommitLogRecoverMaxNum(); if (maxRecoverNum <= 0) { maxRecoverNum = 10; @@ -792,8 +793,18 @@ public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) throws RocksDBExc while (true) { DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover, checkDupInfo); int size = dispatchRequest.getMsgSize(); - if (dispatchRequest.isSuccess()) { + // Check commitlog offset validity if enabled + if (checkCommitLogOffsetOnRecover) { + if (dispatchRequest.getCommitLogOffset() < mappedFile.getFileFromOffset() + || dispatchRequest.getCommitLogOffset() > mappedFile.getFileFromOffset() + mappedFile.getFileSize()) { + log.warn("found illegal commitlog offset {} in {}, current pos={}, it will be truncated.", + dispatchRequest.getCommitLogOffset(), mappedFile.getFileName(), processOffset + mappedFileOffset); + log.info("recover physics file end, {} pos={}", mappedFile.getFileName(), byteBuffer.position()); + + break; + } + } // Normal data if (size > 0) { lastValidMsgPhyOffset = processOffset + mappedFileOffset; @@ -925,7 +936,8 @@ private boolean isMappedFileMatchedRecover(final MappedFile mappedFile, return isMappedFileMatchedRecover(phyOffset, storeTimestamp, recoverNormally); } - private boolean isMappedFileMatchedRecover(long phyOffset, long storeTimestamp, boolean recoverNormally) throws RocksDBException { + private boolean isMappedFileMatchedRecover(long phyOffset, long storeTimestamp, + boolean recoverNormally) throws RocksDBException { boolean result = this.defaultMessageStore.getQueueStore().isMappedFileMatchedRecover(phyOffset, storeTimestamp, recoverNormally); if (null != this.defaultMessageStore.getTransMessageRocksDBStore() && defaultMessageStore.getMessageStoreConfig().isTransRocksDBEnable() && !defaultMessageStore.getMessageStoreConfig().isTransWriteOriginTransHalfEnable()) { result = result && this.defaultMessageStore.getTransMessageRocksDBStore().isMappedFileMatchedRecover(phyOffset); @@ -2386,7 +2398,7 @@ public void run() { long costTime = this.systemClock.now() - beginClockTimestamp; log.info("[{}] scanFilesInPageCache-cost {} ms.", costTime > 30 * 1000 ? "NOTIFYME" : "OK", costTime); } catch (Throwable e) { - log.warn("{} service has e: ", this.getServiceName() , e); + log.warn("{} service has e: ", this.getServiceName(), e); } } log.info("{} service end", this.getServiceName()); diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index ad773192644..318cea3f76a 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -194,6 +194,9 @@ public class MessageStoreConfig { // This ensures no on-the-wire or on-disk corruption to the messages occurred. // This check adds some overhead,so it may be disabled in cases seeking extreme performance. private boolean checkCRCOnRecover = true; + // Whether check the commitlog offset validity during abnormal recovery. + // This helps detect and truncate old file data that may pass CRC checks but contains invalid offsets. + private boolean checkCommitLogOffsetOnRecover = false; // How many pages are to be flushed when flush CommitLog private int flushCommitLogLeastPages = 4; // How many pages are to be committed when commit data to file @@ -795,6 +798,14 @@ public void setCheckCRCOnRecover(boolean checkCRCOnRecover) { this.checkCRCOnRecover = checkCRCOnRecover; } + public boolean isCheckCommitLogOffsetOnRecover() { + return checkCommitLogOffsetOnRecover; + } + + public void setCheckCommitLogOffsetOnRecover(boolean checkCommitLogOffsetOnRecover) { + this.checkCommitLogOffsetOnRecover = checkCommitLogOffsetOnRecover; + } + public boolean isForceVerifyPropCRC() { return forceVerifyPropCRC; }