Skip to content

Commit 8639dcc

Browse files
authored
[ISSUE #9254] Optimize the logs of the message store (#9528)
1 parent 2accb97 commit 8639dcc

8 files changed

Lines changed: 321 additions & 21 deletions

File tree

store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -792,7 +792,12 @@ private boolean putMessagePositionInfo(final long offset, final int size, final
792792
final long cqOffset) {
793793

794794
if (offset + size <= this.getMaxPhysicOffset()) {
795-
log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", this.getMaxPhysicOffset(), offset);
795+
// During the recovery process after broker crashes, this logs will cause the scrolling of valid logs.
796+
if (messageStore.getStateMachine().getCurrentState().isAfter(MessageStoreStateMachine.MessageStoreState.RECOVER_COMMITLOG_OK) ||
797+
messageStore.getMessageStoreConfig().isEnableLogConsumeQueueRepeatedlyBuildWhenRecover()) {
798+
log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}",
799+
this.getMaxPhysicOffset(), offset);
800+
}
796801
return true;
797802
}
798803

store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,8 @@ public class DefaultMessageStore implements MessageStore {
204204
// this is a unmodifiableMap
205205
private final ConcurrentMap<String, TopicConfig> topicConfigTable;
206206

207+
private final MessageStoreStateMachine stateMachine;
208+
207209
private final ScheduledExecutorService scheduledCleanQueueExecutorService =
208210
ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreCleanQueueScheduledThread"));
209211

@@ -250,6 +252,8 @@ public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final Br
250252
lockFile = new RandomAccessFile(file, "rw");
251253

252254
parseDelayLevel();
255+
256+
stateMachine = new MessageStoreStateMachine(LOGGER);
253257
}
254258

255259
public ConsumeQueueStoreInterface createConsumeQueueStore() {
@@ -296,25 +300,28 @@ public boolean parseDelayLevel() {
296300
@Override
297301
public boolean load() {
298302
boolean result = true;
299-
303+
stateMachine.transitTo(MessageStoreStateMachine.MessageStoreState.LOAD_BEGIN);
300304
try {
301305
boolean lastExitOK = !this.isTempFileExist();
302306
LOGGER.info("last shutdown {}, store path root dir: {}",
303307
lastExitOK ? "normally" : "abnormally", messageStoreConfig.getStorePathRootDir());
304308

305309
// load Commit Log
306310
result = this.commitLog.load();
307-
311+
stateMachine.transitTo(MessageStoreStateMachine.MessageStoreState.LOAD_COMMITLOG_OK, result);
308312
// load Consume Queue
309313
result = result && this.consumeQueueStore.load();
314+
stateMachine.transitTo(MessageStoreStateMachine.MessageStoreState.LOAD_CONSUME_QUEUE_OK, result);
310315

311316
if (messageStoreConfig.isEnableCompaction()) {
312317
result = result && this.compactionService.load(lastExitOK);
318+
stateMachine.transitTo(MessageStoreStateMachine.MessageStoreState.LOAD_COMPACTION_OK, result);
313319
}
314320

315321
if (result) {
316322
loadCheckPoint();
317323
result = this.indexService.load(lastExitOK);
324+
stateMachine.transitTo(MessageStoreStateMachine.MessageStoreState.LOAD_INDEX_OK, result);
318325
this.recover(lastExitOK);
319326
LOGGER.info("message store recover end, and the max phy offset = {}", this.getMaxPhyOffset());
320327
}
@@ -343,28 +350,23 @@ public void loadCheckPoint() throws IOException {
343350
}
344351

345352
private void recover(final boolean lastExitOK) throws RocksDBException {
353+
this.stateMachine.transitTo(MessageStoreStateMachine.MessageStoreState.RECOVER_BEGIN);
346354
// recover consume queue
347-
long recoverConsumeQueueStart = System.currentTimeMillis();
348355
this.consumeQueueStore.recover(this.brokerConfig.isRecoverConcurrently());
349-
long dispatchFromPhyOffset = this.consumeQueueStore.getDispatchFromPhyOffset();
350-
long recoverConsumeQueueEnd = System.currentTimeMillis();
356+
this.stateMachine.transitTo(MessageStoreStateMachine.MessageStoreState.RECOVER_CONSUME_QUEUE_OK);
351357

352358
// recover commitlog
359+
long dispatchFromPhyOffset = this.consumeQueueStore.getDispatchFromPhyOffset();
353360
if (lastExitOK) {
354361
this.commitLog.recoverNormally(dispatchFromPhyOffset);
355362
} else {
356363
this.commitLog.recoverAbnormally(dispatchFromPhyOffset);
357364
}
365+
this.stateMachine.transitTo(MessageStoreStateMachine.MessageStoreState.RECOVER_COMMITLOG_OK);
358366

359367
// recover consume offset table
360-
long recoverCommitLogEnd = System.currentTimeMillis();
361368
this.recoverTopicQueueTable();
362-
long recoverConsumeOffsetEnd = System.currentTimeMillis();
363-
364-
LOGGER.info("message store recover total cost: {} ms, " +
365-
"recoverConsumeQueue: {} ms, recoverCommitLog: {} ms, recoverOffsetTable: {} ms",
366-
recoverConsumeOffsetEnd - recoverConsumeQueueStart, recoverConsumeQueueEnd - recoverConsumeQueueStart,
367-
recoverCommitLogEnd - recoverConsumeQueueEnd, recoverConsumeOffsetEnd - recoverCommitLogEnd);
369+
this.stateMachine.transitTo(MessageStoreStateMachine.MessageStoreState.RECOVER_TOPIC_QUEUE_TABLE_OK);
368370
}
369371

370372
/**
@@ -411,6 +413,8 @@ public void start() throws Exception {
411413
this.addScheduleTask();
412414
this.perfs.start();
413415
this.shutdown = false;
416+
417+
this.stateMachine.transitTo(MessageStoreStateMachine.MessageStoreState.RUNNING);
414418
}
415419

416420
private void doRecheckReputOffsetFromCq() throws InterruptedException {
@@ -470,6 +474,7 @@ private void doRecheckReputOffsetFromCq() throws InterruptedException {
470474
public void shutdown() {
471475
if (!this.shutdown) {
472476
this.shutdown = true;
477+
this.stateMachine.transitTo(MessageStoreStateMachine.MessageStoreState.SHUTDOWN_BEGIN);
473478

474479
this.scheduledExecutorService.shutdown();
475480
this.scheduledCleanQueueExecutorService.shutdown();
@@ -501,6 +506,7 @@ public void shutdown() {
501506
if (this.runningFlags.isWriteable() && dispatchBehindBytes() == 0) {
502507
this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
503508
shutDownNormal = true;
509+
this.stateMachine.transitTo(MessageStoreStateMachine.MessageStoreState.SHUTDOWN_OK);
504510
} else {
505511
LOGGER.warn("the store may be wrong, so shutdown abnormally, and keep abort file.");
506512
}
@@ -3001,4 +3007,8 @@ public ScheduledExecutorService getScheduledCleanQueueExecutorService() {
30013007
public void destroyConsumeQueueStore(boolean loadAfterDestroy) {
30023008
consumeQueueStore.destroy(loadAfterDestroy);
30033009
}
3010+
3011+
public MessageStoreStateMachine getStateMachine() {
3012+
return stateMachine;
3013+
}
30043014
}

store/src/main/java/org/apache/rocketmq/store/MessageStore.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -983,4 +983,6 @@ DispatchRequest checkMessageAndReturnSize(final ByteBuffer byteBuffer, final boo
983983
* notify message arrive if necessary
984984
*/
985985
void notifyMessageArriveIfNecessary(DispatchRequest dispatchRequest);
986+
987+
MessageStoreStateMachine getStateMachine();
986988
}
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.store;
19+
20+
import org.apache.rocketmq.common.constant.LoggerName;
21+
import org.apache.rocketmq.logging.org.slf4j.Logger;
22+
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
23+
24+
public class MessageStoreStateMachine {
25+
protected final Logger log;
26+
27+
private MessageStoreState currentState;
28+
private long lastStateChangeTimestamp;
29+
private final long startTimestamp;
30+
31+
public enum MessageStoreState {
32+
INIT(0),
33+
34+
LOAD_BEGIN(10),
35+
LOAD_COMMITLOG_OK(11),
36+
LOAD_CONSUME_QUEUE_OK(12),
37+
LOAD_COMPACTION_OK(13),
38+
LOAD_INDEX_OK(14),
39+
40+
RECOVER_BEGIN(20),
41+
RECOVER_CONSUME_QUEUE_OK(21),
42+
RECOVER_COMMITLOG_OK(22),
43+
RECOVER_TOPIC_QUEUE_TABLE_OK(23),
44+
45+
RUNNING(30),
46+
47+
SHUTDOWN_BEGIN(40),
48+
SHUTDOWN_OK(41);
49+
50+
final int order;
51+
52+
MessageStoreState(int order) {
53+
this.order = order;
54+
}
55+
56+
public int getOrder() {
57+
return order;
58+
}
59+
60+
public boolean isBefore(MessageStoreState storeState) {
61+
return this.order < storeState.order;
62+
}
63+
64+
public boolean isAfter(MessageStoreState storeState) {
65+
return this.order > storeState.order;
66+
}
67+
}
68+
69+
70+
public MessageStoreStateMachine(Logger log) {
71+
this.log = log == null ? LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME) : log;
72+
this.currentState = MessageStoreState.INIT;
73+
this.startTimestamp = System.currentTimeMillis();
74+
this.lastStateChangeTimestamp = startTimestamp;
75+
logStateChange(null, currentState, true);
76+
}
77+
78+
public void transitTo(MessageStoreState newState) {
79+
transitTo(newState, true);
80+
}
81+
82+
public void transitTo(MessageStoreState newState, boolean success) {
83+
if (!newState.isAfter(currentState)) {
84+
throw new IllegalStateException(
85+
String.format("Invalid state transition from %s to %s. Can only move forward.",
86+
currentState, newState)
87+
);
88+
}
89+
90+
logStateChange(currentState, newState, success);
91+
if (success) {
92+
this.currentState = newState;
93+
this.lastStateChangeTimestamp = System.currentTimeMillis();
94+
}
95+
}
96+
97+
private void logStateChange(MessageStoreState fromState, MessageStoreState toState, boolean success) {
98+
if (fromState == null && success) {
99+
log.info("MessageStoreState initialized, state={}", toState);
100+
} else if (success) {
101+
log.info("MessageStoreState transition from {} to {}; Time in previous state={}ms, Total time={}ms",
102+
fromState, toState, getCurrentStateRunningTimeMs(), getTotalRunningTimeMs());
103+
} else {
104+
log.warn("MessageStoreState transition from {} to {} failed; Time in previous state={}ms, Total "
105+
+ "time={}ms", fromState, toState, getCurrentStateRunningTimeMs(), getTotalRunningTimeMs());
106+
}
107+
}
108+
109+
public MessageStoreState getCurrentState() {
110+
return currentState;
111+
}
112+
113+
public long getTotalRunningTimeMs() {
114+
return System.currentTimeMillis() - startTimestamp;
115+
}
116+
117+
public long getCurrentStateRunningTimeMs() {
118+
return System.currentTimeMillis() - lastStateChangeTimestamp;
119+
}
120+
}

store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -483,6 +483,8 @@ public void setRocksdbCompressionType(String compressionType) {
483483
**/
484484
private boolean useABSLock = false;
485485

486+
private boolean enableLogConsumeQueueRepeatedlyBuildWhenRecover = false;
487+
486488
public boolean isRocksdbCQDoubleWriteEnable() {
487489
return rocksdbCQDoubleWriteEnable;
488490
}
@@ -2001,4 +2003,12 @@ public int getCombineCQMaxExtraSearchCommitLogFiles() {
20012003
public void setCombineCQMaxExtraSearchCommitLogFiles(int combineCQMaxExtraSearchCommitLogFiles) {
20022004
this.combineCQMaxExtraSearchCommitLogFiles = combineCQMaxExtraSearchCommitLogFiles;
20032005
}
2006+
2007+
public boolean isEnableLogConsumeQueueRepeatedlyBuildWhenRecover() {
2008+
return enableLogConsumeQueueRepeatedlyBuildWhenRecover;
2009+
}
2010+
2011+
public void setEnableLogConsumeQueueRepeatedlyBuildWhenRecover(boolean enableLogConsumeQueueRepeatedlyBuildWhenRecover) {
2012+
this.enableLogConsumeQueueRepeatedlyBuildWhenRecover = enableLogConsumeQueueRepeatedlyBuildWhenRecover;
2013+
}
20042014
}

store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.apache.rocketmq.store.GetMessageResult;
4343
import org.apache.rocketmq.store.MessageFilter;
4444
import org.apache.rocketmq.store.MessageStore;
45+
import org.apache.rocketmq.store.MessageStoreStateMachine;
4546
import org.apache.rocketmq.store.PutMessageResult;
4647
import org.apache.rocketmq.store.QueryMessageResult;
4748
import org.apache.rocketmq.store.RunningFlags;
@@ -660,4 +661,9 @@ public void notifyMessageArriveIfNecessary(DispatchRequest dispatchRequest) {
660661
public MessageStore getNext() {
661662
return next;
662663
}
664+
665+
@Override
666+
public MessageStoreStateMachine getStateMachine() {
667+
return next.getStateMachine();
668+
}
663669
}

store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@
1717

1818
package org.apache.rocketmq.store;
1919

20+
import static org.assertj.core.api.Assertions.assertThat;
21+
import static org.junit.Assert.assertEquals;
22+
import static org.junit.Assert.assertTrue;
23+
2024
import com.google.common.collect.Sets;
2125
import java.io.File;
2226
import java.io.RandomAccessFile;
@@ -35,21 +39,21 @@
3539
import java.util.HashSet;
3640
import java.util.List;
3741
import java.util.Map;
42+
import java.util.Properties;
3843
import java.util.Random;
3944
import java.util.UUID;
40-
import java.util.Properties;
4145
import java.util.concurrent.ConcurrentHashMap;
4246
import java.util.concurrent.ConcurrentMap;
4347
import java.util.concurrent.TimeUnit;
4448
import java.util.concurrent.atomic.AtomicInteger;
4549
import org.apache.rocketmq.common.BrokerConfig;
50+
import org.apache.rocketmq.common.MixAll;
4651
import org.apache.rocketmq.common.UtilAll;
4752
import org.apache.rocketmq.common.message.MessageBatch;
4853
import org.apache.rocketmq.common.message.MessageDecoder;
4954
import org.apache.rocketmq.common.message.MessageExt;
5055
import org.apache.rocketmq.common.message.MessageExtBatch;
5156
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
52-
import org.apache.rocketmq.common.MixAll;
5357
import org.apache.rocketmq.store.config.BrokerRole;
5458
import org.apache.rocketmq.store.config.FlushDiskType;
5559
import org.apache.rocketmq.store.config.MessageStoreConfig;
@@ -66,10 +70,6 @@
6670
import org.junit.runner.RunWith;
6771
import org.mockito.junit.MockitoJUnitRunner;
6872

69-
import static org.assertj.core.api.Assertions.assertThat;
70-
import static org.junit.Assert.assertEquals;
71-
import static org.junit.Assert.assertTrue;
72-
7373
@RunWith(MockitoJUnitRunner.class)
7474
public class DefaultMessageStoreTest {
7575
private final String storeMessage = "Once, there was a chance for me!";
@@ -911,7 +911,7 @@ public void testDeleteTopics() {
911911
String topicName = "topic-" + i;
912912
for (int j = 0; j < 4; j++) {
913913
ConsumeQueue consumeQueue = new ConsumeQueue(topicName, j, messageStoreConfig.getStorePathRootDir(),
914-
messageStoreConfig.getMappedFileSizeConsumeQueue(), messageStore);
914+
messageStoreConfig.getMappedFileSizeConsumeQueue(), (DefaultMessageStore) messageStore);
915915
cqTable.put(j, consumeQueue);
916916
}
917917
consumeQueueTable.put(topicName, cqTable);
@@ -933,7 +933,7 @@ public void testCleanUnusedTopic() {
933933
String topicName = "topic-" + i;
934934
for (int j = 0; j < 4; j++) {
935935
ConsumeQueue consumeQueue = new ConsumeQueue(topicName, j, messageStoreConfig.getStorePathRootDir(),
936-
messageStoreConfig.getMappedFileSizeConsumeQueue(), messageStore);
936+
messageStoreConfig.getMappedFileSizeConsumeQueue(), (DefaultMessageStore) messageStore);
937937
cqTable.put(j, consumeQueue);
938938
}
939939
consumeQueueTable.put(topicName, cqTable);

0 commit comments

Comments
 (0)