Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions store/src/main/java/org/apache/rocketmq/store/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,7 @@ public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) throws RocksDBExc
// recover by the minimum time stamp
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
boolean checkDupInfo = this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable();
boolean checkCommitLogOffsetOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCommitLogOffsetOnRecover();
int maxRecoverNum = this.defaultMessageStore.getMessageStoreConfig().getCommitLogRecoverMaxNum();
if (maxRecoverNum <= 0) {
maxRecoverNum = 10;
Expand Down Expand Up @@ -792,8 +793,18 @@ public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) throws RocksDBExc
while (true) {
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover, checkDupInfo);
int size = dispatchRequest.getMsgSize();

if (dispatchRequest.isSuccess()) {
// Check commitlog offset validity if enabled
if (checkCommitLogOffsetOnRecover) {
if (dispatchRequest.getCommitLogOffset() < mappedFile.getFileFromOffset()
|| dispatchRequest.getCommitLogOffset() > mappedFile.getFileFromOffset() + mappedFile.getFileSize()) {
log.warn("found illegal commitlog offset {} in {}, current pos={}, it will be truncated.",
dispatchRequest.getCommitLogOffset(), mappedFile.getFileName(), processOffset + mappedFileOffset);
log.info("recover physics file end, {} pos={}", mappedFile.getFileName(), byteBuffer.position());

break;
}
}
// Normal data
if (size > 0) {
lastValidMsgPhyOffset = processOffset + mappedFileOffset;
Expand Down Expand Up @@ -925,7 +936,8 @@ private boolean isMappedFileMatchedRecover(final MappedFile mappedFile,
return isMappedFileMatchedRecover(phyOffset, storeTimestamp, recoverNormally);
}

private boolean isMappedFileMatchedRecover(long phyOffset, long storeTimestamp, boolean recoverNormally) throws RocksDBException {
private boolean isMappedFileMatchedRecover(long phyOffset, long storeTimestamp,
boolean recoverNormally) throws RocksDBException {
boolean result = this.defaultMessageStore.getQueueStore().isMappedFileMatchedRecover(phyOffset, storeTimestamp, recoverNormally);
if (null != this.defaultMessageStore.getTransMessageRocksDBStore() && defaultMessageStore.getMessageStoreConfig().isTransRocksDBEnable() && !defaultMessageStore.getMessageStoreConfig().isTransWriteOriginTransHalfEnable()) {
result = result && this.defaultMessageStore.getTransMessageRocksDBStore().isMappedFileMatchedRecover(phyOffset);
Expand Down Expand Up @@ -2386,7 +2398,7 @@ public void run() {
long costTime = this.systemClock.now() - beginClockTimestamp;
log.info("[{}] scanFilesInPageCache-cost {} ms.", costTime > 30 * 1000 ? "NOTIFYME" : "OK", costTime);
} catch (Throwable e) {
log.warn("{} service has e: ", this.getServiceName() , e);
log.warn("{} service has e: ", this.getServiceName(), e);
}
}
log.info("{} service end", this.getServiceName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,9 @@ public class MessageStoreConfig {
// This ensures no on-the-wire or on-disk corruption to the messages occurred.
// This check adds some overhead,so it may be disabled in cases seeking extreme performance.
private boolean checkCRCOnRecover = true;
// Whether check the commitlog offset validity during abnormal recovery.
// This helps detect and truncate old file data that may pass CRC checks but contains invalid offsets.
private boolean checkCommitLogOffsetOnRecover = false;
// How many pages are to be flushed when flush CommitLog
private int flushCommitLogLeastPages = 4;
// How many pages are to be committed when commit data to file
Expand Down Expand Up @@ -795,6 +798,14 @@ public void setCheckCRCOnRecover(boolean checkCRCOnRecover) {
this.checkCRCOnRecover = checkCRCOnRecover;
}

public boolean isCheckCommitLogOffsetOnRecover() {
return checkCommitLogOffsetOnRecover;
}

public void setCheckCommitLogOffsetOnRecover(boolean checkCommitLogOffsetOnRecover) {
this.checkCommitLogOffsetOnRecover = checkCommitLogOffsetOnRecover;
}

public boolean isForceVerifyPropCRC() {
return forceVerifyPropCRC;
}
Expand Down
Loading