Skip to content

Commit 2c2cc92

Browse files
authored
[ISSUE #10173] Support LMQ in CombineConsumeQueueStore without migration to RocksDB CQ (#10174)
1 parent 614b816 commit 2c2cc92

File tree

16 files changed

+605
-39
lines changed

16 files changed

+605
-39
lines changed

broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,7 @@ public BrokerController(
396396
this.authorizationMetadataManager = AuthorizationFactory.getMetadataManager(this.authConfig);
397397
this.topicRouteInfoManager = new TopicRouteInfoManager(this);
398398
this.liteSharding = new LiteShardingImpl(this, this.topicRouteInfoManager);
399-
this.liteLifecycleManager = this.messageStoreConfig.isEnableRocksDBStore() ?
399+
this.liteLifecycleManager = this.messageStoreConfig.isEnableRocksDBStore() || this.messageStoreConfig.isRocksdbCQDoubleWriteEnable() ?
400400
new RocksDBLiteLifecycleManager(this, this.liteSharding) : new LiteLifecycleManager(this, this.liteSharding);
401401
this.liteSubscriptionRegistry = new LiteSubscriptionRegistryImpl(this, liteLifecycleManager);
402402
this.liteSubscriptionCtlProcessor = new LiteSubscriptionCtlProcessor(this, liteSubscriptionRegistry);
@@ -951,6 +951,8 @@ public boolean recoverAndInitService() throws CloneNotSupportedException {
951951
//scheduleMessageService load after messageStore load success
952952
result = result && this.scheduleMessageService.load();
953953

954+
result = result && initLiteService();
955+
954956
for (BrokerAttachedPlugin brokerAttachedPlugin : brokerAttachedPlugins) {
955957
if (brokerAttachedPlugin != null) {
956958
result = result && brokerAttachedPlugin.load();
@@ -975,8 +977,6 @@ public boolean recoverAndInitService() throws CloneNotSupportedException {
975977

976978
initialRequestPipeline();
977979

978-
initLiteService();
979-
980980
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
981981
// Register a listener to reload SslContext
982982
try {
@@ -1153,9 +1153,9 @@ private void initialRequestPipeline() {
11531153
}
11541154
}
11551155

1156-
private void initLiteService() {
1156+
private boolean initLiteService() {
11571157
this.liteEventDispatcher.init();
1158-
this.liteLifecycleManager.init();
1158+
return this.liteLifecycleManager.init();
11591159
}
11601160

11611161
public void registerProcessor() {

broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ private boolean merge() {
109109
log.error("load json consumerOffset info failed, startup will exit");
110110
return false;
111111
}
112-
this.persist();
112+
this.persist0(); // ensure full persistence
113113
this.getDataVersion().assignNewOne(dataVersion);
114114
updateDataVersion();
115115
log.info("update offset from json, dataVersion:{}, offsetTable: {} ", this.getDataVersion(), JSON.toJSONString(this.getOffsetTable()));
@@ -159,12 +159,16 @@ public String configFilePath() {
159159

160160
@Override
161161
public synchronized void persist() {
162+
if (brokerController.getBrokerConfig().isPersistConsumerOffsetIncrementally()) {
163+
updateDataVersion();
164+
this.rocksDBConfigManager.flushWAL();
165+
return;
166+
}
167+
persist0();
168+
}
169+
170+
private void persist0() {
162171
if (rocksDBConfigManager.isLoaded()) {
163-
if (brokerController.getBrokerConfig().isPersistConsumerOffsetIncrementally()) {
164-
updateDataVersion();
165-
this.rocksDBConfigManager.flushWAL();
166-
return;
167-
}
168172
try (WriteBatch writeBatch = new WriteBatch()) {
169173
for (Entry<String, ConcurrentMap<Integer, Long>> entry : this.offsetTable.entrySet()) {
170174
putWriteBatch(writeBatch, entry.getKey(), entry.getValue());

broker/src/main/java/org/apache/rocketmq/broker/lite/AbstractLiteLifecycleManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,10 @@ public AbstractLiteLifecycleManager(BrokerController brokerController, LiteShard
6060
this.liteSharding = liteSharding;
6161
}
6262

63-
public void init() {
63+
public boolean init() {
6464
this.messageStore = brokerController.getMessageStore();
6565
assert messageStore != null;
66+
return true;
6667
}
6768

6869
/**

broker/src/main/java/org/apache/rocketmq/broker/lite/RocksDBLiteLifecycleManager.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.rocketmq.logging.org.slf4j.Logger;
2828
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
2929
import org.apache.rocketmq.store.RocksDBMessageStore;
30+
import org.apache.rocketmq.store.queue.CombineConsumeQueueStore;
3031
import org.apache.rocketmq.store.queue.RocksDBConsumeQueueOffsetTable;
3132
import org.apache.rocketmq.store.queue.RocksDBConsumeQueueStore;
3233
import org.apache.rocketmq.tieredstore.TieredMessageStore;
@@ -91,17 +92,30 @@ public List<Pair<String, String>> collectExpiredLiteTopic() {
9192
}
9293

9394
@Override
94-
public void init() {
95+
public boolean init() {
9596
super.init();
9697
if (messageStore instanceof TieredMessageStore) { // only support TieredMessageStore plugin
9798
messageStore = ((TieredMessageStore) messageStore).getDefaultStore();
9899
}
99-
if (!(messageStore instanceof RocksDBMessageStore)) {
100-
LOGGER.warn("init failed, not a RocksDB store. {}", messageStore.getClass());
101-
return; // startup with lite feature disabled
100+
101+
RocksDBConsumeQueueStore queueStore; // underlay rocksdb consume queue store
102+
if (messageStore instanceof RocksDBMessageStore) { // storeType = defaultRocksDB
103+
queueStore = (RocksDBConsumeQueueStore) messageStore.getQueueStore();
104+
} else { // storeType = default && double write enable
105+
if (!(messageStore.getQueueStore() instanceof CombineConsumeQueueStore)) {
106+
LOGGER.warn("unexpected, not a CombineConsumeQueueStore. {}", messageStore.getQueueStore().getClass());
107+
return false; // abort startup
108+
}
109+
CombineConsumeQueueStore combineConsumeQueueStore = (CombineConsumeQueueStore) messageStore.getQueueStore();
110+
queueStore = combineConsumeQueueStore.getRocksDBConsumeQueueStore();
111+
if (!messageStore.getMessageStoreConfig().isCombineCQUseRocksdbForLmq() || null == queueStore) {
112+
LOGGER.warn("unexpected, rocksdbCQ is not ready for LMQ.");
113+
return false; // abort startup
114+
}
115+
LOGGER.info("LiteLifecycleManager init with CombineConsumeQueueStore.");
102116
}
117+
103118
try {
104-
RocksDBConsumeQueueStore queueStore = (RocksDBConsumeQueueStore) messageStore.getQueueStore();
105119
RocksDBConsumeQueueOffsetTable cqOffsetTable = (RocksDBConsumeQueueOffsetTable) FieldUtils.readField(
106120
FieldUtils.getField(RocksDBConsumeQueueStore.class, "rocksDBConsumeQueueOffsetTable", true), queueStore);
107121
@SuppressWarnings("unchecked")
@@ -110,7 +124,9 @@ public void init() {
110124
maxCqOffsetTable = Collections.unmodifiableMap(innerMaxCqOffsetTable);
111125
} catch (Exception e) {
112126
LOGGER.error("LiteLifecycleManager-init error", e);
127+
return false;
113128
}
129+
return true;
114130
}
115131

116132
@Override

broker/src/main/java/org/apache/rocketmq/broker/processor/LiteManagerProcessor.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@
5858
import org.apache.rocketmq.remoting.protocol.header.GetParentTopicInfoRequestHeader;
5959
import org.apache.rocketmq.remoting.protocol.header.TriggerLiteDispatchRequestHeader;
6060
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
61+
import org.apache.rocketmq.store.queue.CombineConsumeQueueStore;
62+
import org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface;
6163

6264
import java.util.HashSet;
6365
import java.util.Map;
@@ -112,12 +114,18 @@ protected RemotingCommand getBrokerLiteInfo(ChannelHandlerContext ctx,
112114
body.setCurrentLmqNum(brokerController.getMessageStore().getQueueStore().getLmqNum());
113115
body.setLiteSubscriptionCount(brokerController.getLiteSubscriptionRegistry().getActiveSubscriptionNum());
114116
body.setOrderInfoCount(brokerController.getPopLiteMessageProcessor().getConsumerOrderInfoManager().getOrderInfoCount());
115-
body.setCqTableSize(brokerController.getMessageStore().getQueueStore().getConsumeQueueTable().size());
116117
body.setOffsetTableSize(brokerController.getConsumerOffsetManager().getOffsetTable().size());
117118
body.setEventMapSize(brokerController.getLiteEventDispatcher().getEventMapSize());
118119
body.setTopicMeta(LiteMetadataUtil.getTopicTtlMap(brokerController));
119120
body.setGroupMeta(LiteMetadataUtil.getSubscriberGroupMap(brokerController));
120121

122+
ConsumeQueueStoreInterface consumeQueueStore = brokerController.getMessageStore().getQueueStore();
123+
if (consumeQueueStore instanceof CombineConsumeQueueStore
124+
&& brokerController.getMessageStoreConfig().isCombineCQUseRocksdbForLmq()) {
125+
consumeQueueStore = ((CombineConsumeQueueStore) consumeQueueStore).getRocksDBConsumeQueueStore(); // not null
126+
}
127+
body.setCqTableSize(consumeQueueStore.getConsumeQueueTable().size());
128+
121129
response.setBody(body.encode());
122130
response.setCode(ResponseCode.SUCCESS);
123131
response.setRemark(null);

broker/src/test/java/org/apache/rocketmq/broker/lite/LiteTestUtil.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,20 @@
3737

3838
public class LiteTestUtil {
3939

40+
public static MessageStore buildMessageStore(final BrokerConfig brokerConfig,
41+
MessageStoreConfig storeConfig, final ConcurrentMap<String, TopicConfig> topicConfigTable,
42+
boolean isRocksDBStore) throws Exception {
43+
44+
BrokerStatsManager brokerStatsManager = new BrokerStatsManager(brokerConfig);
45+
MessageStore messageStore;
46+
if (isRocksDBStore) {
47+
messageStore = new RocksDBMessageStore(storeConfig, brokerStatsManager, null, brokerConfig, topicConfigTable);
48+
} else {
49+
messageStore = new DefaultMessageStore(storeConfig, brokerStatsManager, null, brokerConfig, topicConfigTable);
50+
}
51+
return messageStore;
52+
}
53+
4054
public static MessageStore buildMessageStore(String storePathRootDir, final BrokerConfig brokerConfig,
4155
final ConcurrentMap<String, TopicConfig> topicConfigTable, boolean isRocksDBStore) throws Exception {
4256
MessageStoreConfig storeConfig = new MessageStoreConfig();
@@ -51,14 +65,7 @@ public static MessageStore buildMessageStore(String storePathRootDir, final Brok
5165
storeConfig.setEnableMultiDispatch(true);
5266
storeConfig.setStorePathRootDir(storePathRootDir);
5367

54-
BrokerStatsManager brokerStatsManager = new BrokerStatsManager(brokerConfig);
55-
MessageStore messageStore;
56-
if (isRocksDBStore) {
57-
messageStore = new RocksDBMessageStore(storeConfig, brokerStatsManager, null, brokerConfig, topicConfigTable);
58-
} else {
59-
messageStore = new DefaultMessageStore(storeConfig, brokerStatsManager, null, brokerConfig, topicConfigTable);
60-
}
61-
return messageStore;
68+
return buildMessageStore(brokerConfig, storeConfig, topicConfigTable, isRocksDBStore);
6269
}
6370

6471
public static MessageExtBrokerInner buildMessage(String parentTopic, String liteTopic) {

broker/src/test/java/org/apache/rocketmq/broker/lite/RocksDBLiteLifecycleManagerTest.java

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,10 @@
2828
import org.apache.rocketmq.common.attribute.TopicMessageType;
2929
import org.apache.rocketmq.common.lite.LiteUtil;
3030
import org.apache.rocketmq.store.MessageStore;
31+
import org.apache.rocketmq.store.config.MessageStoreConfig;
3132
import org.apache.rocketmq.store.plugin.AbstractPluginMessageStore;
3233
import org.apache.rocketmq.store.plugin.MessageStorePluginContext;
34+
import org.apache.rocketmq.store.queue.AbstractConsumeQueueStore;
3335
import org.apache.rocketmq.tieredstore.TieredMessageStore;
3436
import org.junit.AfterClass;
3537
import org.junit.Assert;
@@ -125,9 +127,11 @@ public void testInit_otherStore() {
125127

126128
when(brokerController.getBrokerConfig()).thenReturn(BROKER_CONFIG);
127129
when(brokerController.getMessageStore()).thenReturn(pluginMessageStore);
130+
when(pluginMessageStore.getQueueStore()).thenReturn(Mockito.mock(AbstractConsumeQueueStore.class));
128131

129132
RocksDBLiteLifecycleManager manager = new RocksDBLiteLifecycleManager(brokerController, liteSharding);
130-
manager.init();
133+
134+
Assert.assertFalse(manager.init());
131135
Assert.assertThrows(NullPointerException.class, () -> manager.getMaxOffsetInQueue("HW"));
132136
}
133137

@@ -239,4 +243,33 @@ public void testCleanByParentTopic() throws Exception {
239243
Assert.assertEquals(0, liteLifecycleManager.getMaxOffsetInQueue(lmqName));
240244
}
241245
}
246+
247+
@Test
248+
public void testInit_combineConsumeQueueStore() throws Exception {
249+
MessageStoreConfig storeConfig = new MessageStoreConfig();
250+
storeConfig.setStorePathRootDir(
251+
System.getProperty("java.io.tmpdir") + File.separator + "store-rocksDBLifecycleTest-" + UUID.randomUUID());
252+
storeConfig.setRocksdbCQDoubleWriteEnable(true);
253+
MessageStore messageStore = LiteTestUtil.buildMessageStore(BROKER_CONFIG, storeConfig, TOPIC_CONFIG_TABLE, false);
254+
BrokerController brokerController = Mockito.mock(BrokerController.class);
255+
LiteSharding liteSharding = Mockito.mock(LiteSharding.class);
256+
when(brokerController.getBrokerConfig()).thenReturn(BROKER_CONFIG);
257+
when(brokerController.getMessageStore()).thenReturn(messageStore);
258+
259+
// enable
260+
storeConfig.setCombineCQUseRocksdbForLmq(true);
261+
RocksDBLiteLifecycleManager manager = new RocksDBLiteLifecycleManager(brokerController, liteSharding);
262+
Assert.assertTrue(manager.init());
263+
Assert.assertEquals(0, manager.getMaxOffsetInQueue(UUID.randomUUID().toString()));
264+
265+
// disable
266+
storeConfig.setCombineCQUseRocksdbForLmq(false);
267+
RocksDBLiteLifecycleManager manager2 = new RocksDBLiteLifecycleManager(brokerController, liteSharding);
268+
Assert.assertFalse(manager2.init());
269+
Assert.assertThrows(NullPointerException.class, () -> manager2.getMaxOffsetInQueue("HW"));
270+
271+
messageStore.shutdown();
272+
messageStore.destroy();
273+
UtilAll.deleteFile(new File(storeConfig.getStorePathRootDir()));
274+
}
242275
}

broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManagerTest.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,58 @@ public void testCommitOffset_persist_incrementally() {
154154
Assert.assertTrue(consumerOffsetManager.getOffsetTable().containsKey(key)); // reload from kv
155155
}
156156

157+
@Test
158+
public void testLoadAndMerge_persist_periodically() {
159+
brokerConfig.setPersistConsumerOffsetIncrementally(false);
160+
String group = UUID.randomUUID().toString();
161+
String topic = UUID.randomUUID().toString();
162+
String key = topic + TOPIC_GROUP_SEPARATOR + group;
163+
164+
ConsumerOffsetManager jsonConsumerOffsetManager = new ConsumerOffsetManager(brokerController);
165+
jsonConsumerOffsetManager.commitOffset("ClientID", group, topic, 0, 1);
166+
jsonConsumerOffsetManager.updateDataVersion();
167+
jsonConsumerOffsetManager.persist();
168+
169+
Assert.assertFalse(consumerOffsetManager.getOffsetTable().containsKey(key));
170+
171+
consumerOffsetManager.stop();
172+
consumerOffsetManager.getOffsetTable().clear();
173+
consumerOffsetManager.load(); // merge from json file
174+
Assert.assertTrue(consumerOffsetManager.getOffsetTable().containsKey(key));
175+
176+
UtilAll.deleteFile(new File(jsonConsumerOffsetManager.configFilePath()));
177+
consumerOffsetManager.stop();
178+
consumerOffsetManager.getOffsetTable().clear();
179+
consumerOffsetManager.load();
180+
Assert.assertTrue(consumerOffsetManager.getOffsetTable().containsKey(key)); // already persisted in kv
181+
}
182+
183+
@Test
184+
public void testLoadAndMerge_persist_incrementally() {
185+
brokerConfig.setPersistConsumerOffsetIncrementally(true);
186+
String group = UUID.randomUUID().toString();
187+
String topic = UUID.randomUUID().toString();
188+
String key = topic + TOPIC_GROUP_SEPARATOR + group;
189+
190+
ConsumerOffsetManager jsonConsumerOffsetManager = new ConsumerOffsetManager(brokerController);
191+
jsonConsumerOffsetManager.commitOffset("ClientID", group, topic, 0, 1);
192+
jsonConsumerOffsetManager.updateDataVersion();
193+
jsonConsumerOffsetManager.persist();
194+
195+
Assert.assertFalse(consumerOffsetManager.getOffsetTable().containsKey(key));
196+
197+
consumerOffsetManager.stop();
198+
consumerOffsetManager.getOffsetTable().clear();
199+
consumerOffsetManager.load(); // merge from json file
200+
Assert.assertTrue(consumerOffsetManager.getOffsetTable().containsKey(key));
201+
202+
UtilAll.deleteFile(new File(jsonConsumerOffsetManager.configFilePath()));
203+
consumerOffsetManager.stop();
204+
consumerOffsetManager.getOffsetTable().clear();
205+
consumerOffsetManager.load();
206+
Assert.assertTrue(consumerOffsetManager.getOffsetTable().containsKey(key)); // already persisted in kv
207+
}
208+
157209
@Test
158210
public void testRemoveConsumerOffset() {
159211
String group = UUID.randomUUID().toString();

store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,7 @@ public class MessageStoreConfig {
484484
private String combineAssignOffsetCQType = StoreType.DEFAULT.getStoreType();
485485
private boolean combineCQEnableCheckSelf = false;
486486
private int combineCQMaxExtraSearchCommitLogFiles = 3;
487+
private boolean combineCQUseRocksdbForLmq = false;
487488

488489
/**
489490
* If ConsumeQueueStore is RocksDB based, this option is to configure bottom-most tier compression type.
@@ -520,6 +521,8 @@ public class MessageStoreConfig {
520521
// Shared byte buffer manager configuration
521522
private int sharedByteBufferNum = 16;
522523

524+
private boolean useSeparateStorePathForRocksdbCQ = false;
525+
523526
public String getRocksdbCompressionType() {
524527
return rocksdbCompressionType;
525528
}
@@ -2110,6 +2113,14 @@ public void setCombineCQMaxExtraSearchCommitLogFiles(int combineCQMaxExtraSearch
21102113
this.combineCQMaxExtraSearchCommitLogFiles = combineCQMaxExtraSearchCommitLogFiles;
21112114
}
21122115

2116+
public boolean isCombineCQUseRocksdbForLmq() {
2117+
return combineCQUseRocksdbForLmq;
2118+
}
2119+
2120+
public void setCombineCQUseRocksdbForLmq(boolean combineCQUseRocksdbForLmq) {
2121+
this.combineCQUseRocksdbForLmq = combineCQUseRocksdbForLmq;
2122+
}
2123+
21132124
public boolean isEnableLogConsumeQueueRepeatedlyBuildWhenRecover() {
21142125
return enableLogConsumeQueueRepeatedlyBuildWhenRecover;
21152126
}
@@ -2302,4 +2313,12 @@ public boolean isAppendTopicForTimerDeleteKey() {
23022313
public void setAppendTopicForTimerDeleteKey(boolean appendTopicForTimerDeleteKey) {
23032314
this.appendTopicForTimerDeleteKey = appendTopicForTimerDeleteKey;
23042315
}
2316+
2317+
public boolean isUseSeparateStorePathForRocksdbCQ() {
2318+
return useSeparateStorePathForRocksdbCQ;
2319+
}
2320+
2321+
public void setUseSeparateStorePathForRocksdbCQ(boolean useSeparateStorePathForRocksdbCQ) {
2322+
this.useSeparateStorePathForRocksdbCQ = useSeparateStorePathForRocksdbCQ;
2323+
}
23052324
}

store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ public static String getStorePathBatchConsumeQueue(final String rootDir) {
3131
return rootDir + File.separator + "batchconsumequeue";
3232
}
3333

34+
public static String getStorePathRocksDBConsumeQueue(final String rootDir) {
35+
return rootDir + File.separator + "consumequeue_r";
36+
}
37+
3438
public static String getStorePathIndex(final String rootDir) {
3539
return rootDir + File.separator + "index";
3640
}

0 commit comments

Comments
 (0)