Skip to content

Commit 628230d

Browse files
RongtongJin and RongtongJin authored
[ISSUE #10011] Optimize accelerated recovery process and refactor code (#10012)
* When IndexRocksDBEnable or TransRocksDBEnable are enabled, we need to take these two offsets into account to accelerate recovery. * Add UTs * Refactor the code based on the review comments * Revert "[ISSUE #8127]Optimize the metric calculation logic of the time wheel" * Remove useless import * Refactor Code * Refactor Code * Refactor Code * Refactor Code * Refactor Code * Implement accelerated recovery for the file-based ConsumeQueue. * Implement accelerated recovery for the file-based ConsumeQueue. Change-Id: Ieac45d0582f2f83d977aeb8e6f5084268b7f8752 * Implement accelerated recovery for the file-based ConsumeQueue. * Ignore testTruncateCQ UT --------- Co-authored-by: RongtongJin <user@example.com>
1 parent 7030c45 commit 628230d

18 files changed

Lines changed: 345 additions & 87 deletions

store/src/main/java/org/apache/rocketmq/store/CommitLog.java

Lines changed: 9 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -365,15 +365,6 @@ public void recoverNormally(long dispatchFromPhyOffset) throws RocksDBException
365365
long mappedFileOffset = 0;
366366
long lastValidMsgPhyOffset = this.getConfirmOffset();
367367

368-
if (defaultMessageStore.getMessageStoreConfig().isEnableRocksDBStore()
369-
&& defaultMessageStore.getMessageStoreConfig().isEnableAcceleratedRecovery()) {
370-
mappedFileOffset = dispatchFromPhyOffset - mappedFile.getFileFromOffset();
371-
if (mappedFileOffset > 0) {
372-
log.info("recover using acceleration, recovery offset is {}", dispatchFromPhyOffset);
373-
lastValidMsgPhyOffset = dispatchFromPhyOffset;
374-
byteBuffer.position((int) mappedFileOffset);
375-
}
376-
}
377368
while (true) {
378369
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover, checkDupInfo);
379370
int size = dispatchRequest.getMsgSize();
@@ -744,7 +735,7 @@ public long getLastFileFromOffset() {
744735
/**
745736
* @throws RocksDBException only in rocksdb mode
746737
*/
747-
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) throws RocksDBException {
738+
public void recoverAbnormally(long dispatchFromPhyOffset) throws RocksDBException {
748739
// recover by the minimum time stamp
749740
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
750741
boolean checkDupInfo = this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable();
@@ -779,18 +770,17 @@ public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) throws RocksDBExc
779770
long lastValidMsgPhyOffset;
780771
long lastConfirmValidMsgPhyOffset;
781772

782-
if (defaultMessageStore.getMessageStoreConfig().isEnableRocksDBStore()
783-
&& defaultMessageStore.getMessageStoreConfig().isEnableAcceleratedRecovery()) {
784-
mappedFileOffset = maxPhyOffsetOfConsumeQueue - mappedFile.getFileFromOffset();
773+
if (defaultMessageStore.getMessageStoreConfig().isEnableAcceleratedRecovery()) {
774+
mappedFileOffset = dispatchFromPhyOffset - mappedFile.getFileFromOffset();
785775
// Protective measure: fall back to non-accelerated mode if the offset is negative, which is extremely unlikely to occur
786776
if (mappedFileOffset < 0) {
787777
mappedFileOffset = 0;
788778
lastValidMsgPhyOffset = processOffset;
789779
lastConfirmValidMsgPhyOffset = processOffset;
790780
} else {
791-
log.info("recover using acceleration, recovery offset is {}", maxPhyOffsetOfConsumeQueue);
792-
lastValidMsgPhyOffset = maxPhyOffsetOfConsumeQueue;
793-
lastConfirmValidMsgPhyOffset = maxPhyOffsetOfConsumeQueue;
781+
log.info("recover using acceleration, recovery offset is {}", dispatchFromPhyOffset);
782+
lastValidMsgPhyOffset = dispatchFromPhyOffset;
783+
lastConfirmValidMsgPhyOffset = dispatchFromPhyOffset;
794784
byteBuffer.position((int) mappedFileOffset);
795785
}
796786
} else {
@@ -933,27 +923,15 @@ private boolean isMappedFileMatchedRecover(final MappedFile mappedFile,
933923
return false;
934924
}
935925

936-
if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable() &&
937-
this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {
938-
if (storeTimestamp > this.defaultMessageStore.getStoreCheckpoint().getIndexMsgTimestamp()) {
939-
return false;
940-
}
941-
log.info("CommitLog isMmapFileMatchedRecover find satisfied MmapFile for index, " +
942-
"MmapFile storeTimestamp={}, MmapFile phyOffset={}, indexMsgTimestamp={}, recoverNormally={}",
943-
storeTimestamp, phyOffset, this.defaultMessageStore.getStoreCheckpoint().getIndexMsgTimestamp(), recoverNormally);
944-
}
945-
946926
return isMappedFileMatchedRecover(phyOffset, storeTimestamp, recoverNormally);
947927
}
948928

949929
private boolean isMappedFileMatchedRecover(long phyOffset, long storeTimestamp,
950930
boolean recoverNormally) throws RocksDBException {
951931
boolean result = this.defaultMessageStore.getQueueStore().isMappedFileMatchedRecover(phyOffset, storeTimestamp, recoverNormally);
952-
if (null != this.defaultMessageStore.getTransMessageRocksDBStore() && defaultMessageStore.getMessageStoreConfig().isTransRocksDBEnable() && !defaultMessageStore.getMessageStoreConfig().isTransWriteOriginTransHalfEnable()) {
953-
result = result && this.defaultMessageStore.getTransMessageRocksDBStore().isMappedFileMatchedRecover(phyOffset);
954-
}
955-
if (null != this.defaultMessageStore.getIndexRocksDBStore() && defaultMessageStore.getMessageStoreConfig().isIndexRocksDBEnable()) {
956-
result = result && this.defaultMessageStore.getIndexRocksDBStore().isMappedFileMatchedRecover(phyOffset);
932+
// Check all registered CommitLogDispatchStore instances
933+
for (CommitLogDispatchStore store : defaultMessageStore.getCommitLogDispatchStores()) {
934+
result = result && store.isMappedFileMatchedRecover(phyOffset, storeTimestamp, recoverNormally);
957935
}
958936
return result;
959937
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.store;
18+
19+
import org.rocksdb.RocksDBException;
20+
21+
/**
22+
* Interface for stores that require commitlog dispatch and recovery. Each store implementing this interface should
23+
* register itself in the commitlog when loading. This abstraction allows the commitlog recovery process to
24+
* automatically consider all registered stores without needing to modify the recovery logic when adding a new store.
25+
*/
26+
public interface CommitLogDispatchStore {
27+
28+
/**
29+
* Get the dispatch offset in the store. Messages whose phyOffset is larger than this offset need to be dispatched. The
30+
* dispatch offset is only used during recovery.
31+
*
32+
* @param recoverNormally true if broker exited normally last time (normal recovery), false for abnormal recovery
33+
* @return the dispatch phyOffset, or null if the store is not enabled or has no valid offset
34+
* @throws RocksDBException if there is an error accessing RocksDB storage
35+
*/
36+
Long getDispatchFromPhyOffset(boolean recoverNormally) throws RocksDBException;
37+
38+
/**
39+
* Used to determine whether to start doDispatch from this commitLog mappedFile.
40+
*
41+
* @param phyOffset the offset of the first message in this commitlog mappedFile
42+
* @param storeTimestamp the timestamp of the first message in this commitlog mappedFile
43+
* @param recoverNormally whether this is a normal recovery
44+
* @return whether to start recovering from this MappedFile
45+
* @throws RocksDBException if there is an error accessing RocksDB storage
46+
*/
47+
boolean isMappedFileMatchedRecover(long phyOffset, long storeTimestamp,
48+
boolean recoverNormally) throws RocksDBException;
49+
}
50+

store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -712,6 +712,7 @@ public void putMessagePositionInfoWrapper(DispatchRequest request) {
712712
this.messageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
713713
}
714714
this.messageStore.getStoreCheckpoint().setTmpLogicsMsgTimestamp(request.getStoreTimestamp());
715+
this.messageStore.getStoreCheckpoint().setTmpLogicsPhysicalOffset(request.getCommitLogOffset());
715716
if (MultiDispatchUtils.checkMultiDispatchQueue(this.messageStore.getMessageStoreConfig(), request)) {
716717
multiDispatchLmqQueue(request, maxRetries);
717718
}

store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java

Lines changed: 61 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,11 @@ public class DefaultMessageStore implements MessageStore {
175175

176176
private final LinkedList<CommitLogDispatcher> dispatcherList = new LinkedList<>();
177177

178+
/**
179+
* List of stores that require commitlog dispatch and recovery. Each store registers itself when loading.
180+
*/
181+
private final List<CommitLogDispatchStore> commitLogDispatchStores = new ArrayList<>();
182+
178183
private final RandomAccessFile lockFile;
179184

180185
private FileLock lock;
@@ -333,6 +338,11 @@ public boolean load() {
333338
// load Consume Queue
334339
result = result && this.consumeQueueStore.load();
335340
stateMachine.transitTo(MessageStoreStateMachine.MessageStoreState.LOAD_CONSUME_QUEUE_OK, result);
341+
// Register consume queue store for commitlog dispatch
342+
// AbstractConsumeQueueStore implements CommitLogDispatchStore, so we can register it directly
343+
if (this.consumeQueueStore != null) {
344+
registerCommitLogDispatchStore(this.consumeQueueStore);
345+
}
336346

337347
if (messageStoreConfig.isEnableCompaction()) {
338348
result = result && this.compactionService.load(lastExitOK);
@@ -342,7 +352,15 @@ public boolean load() {
342352
if (result) {
343353
loadCheckPoint();
344354
result = this.indexService.load(lastExitOK);
355+
registerCommitLogDispatchStore(this.indexService);
345356
stateMachine.transitTo(MessageStoreStateMachine.MessageStoreState.LOAD_INDEX_OK, result);
357+
// Register IndexRocksDBStore and TransMessageRocksDBStore for commit-log dispatch
358+
if (messageStoreConfig.isIndexRocksDBEnable()) {
359+
registerCommitLogDispatchStore(this.indexRocksDBStore);
360+
}
361+
if (messageStoreConfig.isTransRocksDBEnable() && transMessageRocksDBStore != null) {
362+
registerCommitLogDispatchStore(this.transMessageRocksDBStore);
363+
}
346364
this.recover(lastExitOK);
347365
LOGGER.info("message store recover end, and the max phy offset = {}", this.getMaxPhyOffset());
348366
}
@@ -377,7 +395,16 @@ private void recover(final boolean lastExitOK) throws RocksDBException {
377395
this.stateMachine.transitTo(MessageStoreStateMachine.MessageStoreState.RECOVER_CONSUME_QUEUE_OK);
378396

379397
// recover commitlog
380-
long dispatchFromPhyOffset = this.consumeQueueStore.getDispatchFromPhyOffset();
398+
// Calculate the minimum dispatch offset from all registered stores
399+
Long dispatchFromPhyOffset = this.consumeQueueStore.getDispatchFromPhyOffset(lastExitOK);
400+
401+
for (CommitLogDispatchStore store : commitLogDispatchStores) {
402+
Long storeOffset = store.getDispatchFromPhyOffset(lastExitOK);
403+
if (storeOffset != null && storeOffset > 0) {
404+
dispatchFromPhyOffset = Math.min(dispatchFromPhyOffset, storeOffset);
405+
}
406+
}
407+
381408
if (lastExitOK) {
382409
this.commitLog.recoverNormally(dispatchFromPhyOffset);
383410
} else {
@@ -1102,6 +1129,31 @@ public void setTimerMessageRocksDBStore(TimerMessageRocksDBStore timerMessageRoc
11021129
@Override
11031130
public void setTransMessageRocksDBStore(TransMessageRocksDBStore transMessageRocksDBStore) {
11041131
this.transMessageRocksDBStore = transMessageRocksDBStore;
1132+
// Register TransMessageRocksDBStore for commitlog dispatch if enabled
1133+
if (transMessageRocksDBStore != null && messageStoreConfig.isTransRocksDBEnable()) {
1134+
registerCommitLogDispatchStore(this.transMessageRocksDBStore);
1135+
}
1136+
}
1137+
1138+
/**
1139+
* Register a store that requires commitlog dispatch and recovery. Each store should register itself when loading.
1140+
*
1141+
* @param store the store to register
1142+
*/
1143+
public void registerCommitLogDispatchStore(CommitLogDispatchStore store) {
1144+
if (store != null) {
1145+
commitLogDispatchStores.add(store);
1146+
LOGGER.info("Registered CommitLogDispatchStore: {}", store.getClass().getSimpleName());
1147+
}
1148+
}
1149+
1150+
/**
1151+
* Get all registered CommitLogDispatchStore instances.
1152+
*
1153+
* @return list of registered stores
1154+
*/
1155+
public List<CommitLogDispatchStore> getCommitLogDispatchStores() {
1156+
return commitLogDispatchStores;
11051157
}
11061158

11071159
@Override
@@ -1400,7 +1452,8 @@ public QueryMessageResult queryMessage(String topic, String key, int maxNum, lon
14001452
}
14011453

14021454
@Override
1403-
public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end, String indexType, String lastKey) {
1455+
public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end, String indexType,
1456+
String lastKey) {
14041457
QueryMessageResult queryMessageResult = new QueryMessageResult();
14051458
long lastQueryMsgTime = end;
14061459
for (int i = 0; i < 3; i++) {
@@ -1510,10 +1563,9 @@ public long now() {
15101563
}
15111564

15121565
/**
1513-
* Lazy clean queue offset table.
1514-
* If offset table is cleaned, and old messages are dispatching after the old consume queue is cleaned,
1515-
* consume queue will be created with old offset, then later message with new offset table can not be
1516-
* dispatched to consume queue.
1566+
* Lazy clean queue offset table. If offset table is cleaned, and old messages are dispatching after the old consume
1567+
* queue is cleaned, consume queue will be created with old offset, then later message with new offset table can not
1568+
* be dispatched to consume queue.
15171569
*/
15181570
@Override
15191571
public int deleteTopics(final Set<String> deleteTopics) {
@@ -1677,6 +1729,7 @@ public boolean checkInStoreByConsumeOffset(String topic, int queueId, long consu
16771729
public long dispatchBehindBytes() {
16781730
return this.reputMessageService.behind();
16791731
}
1732+
16801733
@Override
16811734
public long dispatchBehindMilliseconds() {
16821735
return this.reputMessageService.behindMs();
@@ -1818,8 +1871,8 @@ public boolean checkInDiskByCommitOffset(long offsetPy) {
18181871
}
18191872

18201873
/**
1821-
* The ratio val is estimated by the experiment and experience
1822-
* so that the result is not high accurate for different business
1874+
* The ratio value is estimated from experiments and experience, so the result is not highly accurate for different
1875+
* business
18231876
*
18241877
* @return
18251878
*/

store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ public class StoreCheckpoint {
3636
private volatile long tmpLogicsMsgTimestamp = 0;
3737
private volatile long physicMsgTimestamp = 0;
3838
private volatile long logicsMsgTimestamp = 0;
39+
private volatile long tmpLogicsPhysicalOffset = 0;
40+
private volatile long logicsPhysicalOffset = 0;
3941
private volatile long indexMsgTimestamp = 0;
4042
private volatile long masterFlushedOffset = 0;
4143
private volatile long confirmPhyOffset = 0;
@@ -56,6 +58,7 @@ public StoreCheckpoint(final String scpPath) throws IOException {
5658
this.indexMsgTimestamp = this.mappedByteBuffer.getLong(16);
5759
this.masterFlushedOffset = this.mappedByteBuffer.getLong(24);
5860
this.confirmPhyOffset = this.mappedByteBuffer.getLong(32);
61+
this.logicsPhysicalOffset = this.mappedByteBuffer.getLong(40);
5962

6063
log.info("store checkpoint file physicMsgTimestamp " + this.physicMsgTimestamp + ", "
6164
+ UtilAll.timeMillisToHumanString(this.physicMsgTimestamp));
@@ -65,6 +68,7 @@ public StoreCheckpoint(final String scpPath) throws IOException {
6568
+ UtilAll.timeMillisToHumanString(this.indexMsgTimestamp));
6669
log.info("store checkpoint file masterFlushedOffset " + this.masterFlushedOffset);
6770
log.info("store checkpoint file confirmPhyOffset " + this.confirmPhyOffset);
71+
log.info("store checkpoint file logicsPhysicalOffset " + this.logicsPhysicalOffset);
6872
} else {
6973
log.info("store checkpoint file not exists, " + scpPath);
7074
}
@@ -91,6 +95,7 @@ public void flush() {
9195
this.mappedByteBuffer.putLong(16, this.indexMsgTimestamp);
9296
this.mappedByteBuffer.putLong(24, this.masterFlushedOffset);
9397
this.mappedByteBuffer.putLong(32, this.confirmPhyOffset);
98+
this.mappedByteBuffer.putLong(40, this.logicsPhysicalOffset);
9499
this.mappedByteBuffer.force();
95100
} catch (Throwable e) {
96101
log.error("Failed to flush", e);
@@ -121,6 +126,22 @@ public void setTmpLogicsMsgTimestamp(long tmpLogicsMsgTimestamp) {
121126
this.tmpLogicsMsgTimestamp = tmpLogicsMsgTimestamp;
122127
}
123128

129+
public long getTmpLogicsPhysicalOffset() {
130+
return tmpLogicsPhysicalOffset;
131+
}
132+
133+
public void setTmpLogicsPhysicalOffset(long tmpLogicsPhysicalOffset) {
134+
this.tmpLogicsPhysicalOffset = tmpLogicsPhysicalOffset;
135+
}
136+
137+
public long getLogicsPhysicalOffset() {
138+
return logicsPhysicalOffset;
139+
}
140+
141+
public void setLogicsPhysicalOffset(long logicsPhysicalOffset) {
142+
this.logicsPhysicalOffset = logicsPhysicalOffset;
143+
}
144+
124145
public long getConfirmPhyOffset() {
125146
return confirmPhyOffset;
126147
}

store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -512,9 +512,8 @@ public class MessageStoreConfig {
512512
private long rocksdbWalFileRollingThreshold = SizeUnit.GB;
513513

514514
/**
515-
* Note: For correctness, this switch should be enabled only if the previous startup was configured with SYNC_FLUSH
516-
* and the storeType was defaultRocksDB. This switch is not recommended for normal use cases (include master-slave
517-
* or controller mode).
515+
* Note: For correctness, this switch should be enabled only if the previous startup was configured with SYNC_FLUSH.
516+
* This switch is not recommended for normal use cases (including master-slave or controller mode).
518517
*/
519518
private boolean enableAcceleratedRecovery = false;
520519

store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -467,8 +467,8 @@ public void recoverNormally(long dispatchFromPhyOffset) throws RocksDBException
467467
}
468468

469469
@Override
470-
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) throws RocksDBException {
471-
dledgerRecoverAbnormally(maxPhyOffsetOfConsumeQueue);
470+
public void recoverAbnormally(long dispatchFromPhyOffset) throws RocksDBException {
471+
dledgerRecoverAbnormally(dispatchFromPhyOffset);
472472
}
473473

474474
@Override

0 commit comments

Comments
 (0)