Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 43 additions & 11 deletions store/src/main/java/org/apache/rocketmq/store/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,16 @@ public void recoverNormally(long dispatchFromPhyOffset) throws RocksDBException
long processOffset = mappedFile.getFileFromOffset();
long mappedFileOffset = 0;
long lastValidMsgPhyOffset = this.getConfirmOffset();

if (defaultMessageStore.getMessageStoreConfig().isEnableRocksDBStore()
&& defaultMessageStore.getMessageStoreConfig().isEnableAcceleratedRecovery()) {
mappedFileOffset = dispatchFromPhyOffset - mappedFile.getFileFromOffset();
if (mappedFileOffset > 0) {
log.info("recover using acceleration, recovery offset is {}", dispatchFromPhyOffset);
lastValidMsgPhyOffset = dispatchFromPhyOffset;
byteBuffer.position((int) mappedFileOffset);
}
}
while (true) {
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover, checkDupInfo);
int size = dispatchRequest.getMsgSize();
Expand Down Expand Up @@ -728,9 +738,29 @@ public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) throws RocksDBExc

ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
long processOffset = mappedFile.getFileFromOffset();
long mappedFileOffset = 0;
long lastValidMsgPhyOffset = processOffset;
long lastConfirmValidMsgPhyOffset = processOffset;
long mappedFileOffset;
long lastValidMsgPhyOffset;
long lastConfirmValidMsgPhyOffset;

if (defaultMessageStore.getMessageStoreConfig().isEnableRocksDBStore()
&& defaultMessageStore.getMessageStoreConfig().isEnableAcceleratedRecovery()) {
mappedFileOffset = maxPhyOffsetOfConsumeQueue - mappedFile.getFileFromOffset();
// Protective measures, falling back to non-accelerated mode, which is extremely unlikely to occur
if (mappedFileOffset < 0) {
mappedFileOffset = 0;
lastValidMsgPhyOffset = processOffset;
lastConfirmValidMsgPhyOffset = processOffset;
} else {
log.info("recover using acceleration, recovery offset is {}", maxPhyOffsetOfConsumeQueue);
lastValidMsgPhyOffset = maxPhyOffsetOfConsumeQueue;
lastConfirmValidMsgPhyOffset = maxPhyOffsetOfConsumeQueue;
byteBuffer.position((int) mappedFileOffset);
}
} else {
mappedFileOffset = 0;
lastValidMsgPhyOffset = processOffset;
lastConfirmValidMsgPhyOffset = processOffset;
}
// abnormal recover require dispatching
boolean doDispatch = true;
while (true) {
Expand Down Expand Up @@ -840,19 +870,21 @@ private boolean isMappedFileMatchedRecover(final MappedFile mappedFile,
boolean recoverNormally) throws RocksDBException {
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();

int magicCode = byteBuffer.getInt(MessageDecoder.MESSAGE_MAGIC_CODE_POSITION);
if (magicCode != MessageDecoder.MESSAGE_MAGIC_CODE && magicCode != MessageDecoder.MESSAGE_MAGIC_CODE_V2) {
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
boolean checkDupInfo = this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable();

DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover, checkDupInfo);

if (!dispatchRequest.isSuccess()) {
return false;
}

int sysFlag = byteBuffer.getInt(MessageDecoder.SYSFLAG_POSITION);
int bornHostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;
int msgStoreTimePos = 4 + 4 + 4 + 4 + 4 + 8 + 8 + 4 + 8 + bornHostLength;
long storeTimestamp = byteBuffer.getLong(msgStoreTimePos);
if (0 == storeTimestamp) {
long storeTimestamp = dispatchRequest.getStoreTimestamp();
long phyOffset = dispatchRequest.getCommitLogOffset();

if (0 == dispatchRequest.getStoreTimestamp()) {
return false;
}
long phyOffset = byteBuffer.getLong(MessageDecoder.MESSAGE_PHYSIC_OFFSET_POSITION);

if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable() &&
this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,13 @@ public class MessageStoreConfig {

private long rocksdbWalFileRollingThreshold = SizeUnit.GB;

/**
* Note: For correctness, this switch should be enabled only if the previous startup was configured with SYNC_FLUSH
* and the storeType was defaultRocksDB. This switch is not recommended for normal use cases (include master-slave
* or controller mode).
*/
private boolean enableAcceleratedRecovery = false;

public String getRocksdbCompressionType() {
return rocksdbCompressionType;
}
Expand Down Expand Up @@ -2008,7 +2015,16 @@ public boolean isEnableLogConsumeQueueRepeatedlyBuildWhenRecover() {
return enableLogConsumeQueueRepeatedlyBuildWhenRecover;
}

public void setEnableLogConsumeQueueRepeatedlyBuildWhenRecover(boolean enableLogConsumeQueueRepeatedlyBuildWhenRecover) {
public void setEnableLogConsumeQueueRepeatedlyBuildWhenRecover(
boolean enableLogConsumeQueueRepeatedlyBuildWhenRecover) {
this.enableLogConsumeQueueRepeatedlyBuildWhenRecover = enableLogConsumeQueueRepeatedlyBuildWhenRecover;
}

public boolean isEnableAcceleratedRecovery() {
return enableAcceleratedRecovery;
}

public void setEnableAcceleratedRecovery(boolean enableAcceleratedRecovery) {
this.enableAcceleratedRecovery = enableAcceleratedRecovery;
}
}
Loading