Skip to content

Commit 7a08b4c

Browse files
zk-drizzledrizzle.zkzhouli11
authored
[RIP-82] Implement Timer message, transaction message, and index based on RocksDB (#9787)
* add isWakeCommitWhenPutMessage for AIO * Implemented Timer message, Transaction Message, Index based on rocksdb * add timeline roll interval can be config * optimize the code * optimize the code * add sone logic for recover commitlog * optimize the code * update * update * optimize the code * update * update * optimize the code * optimize the code * update * update * update * update * update * update * update * update * update * update * update * update Change-Id: I700798ef625187ac32c16c8ea5aae5f2debfdddf * update Change-Id: I1d2dfa24317d2ad7c0ca41d90cd2b3c1ba260b0b * update Change-Id: I974e30eb909722ea2a17a14e505759bf20f4f860 * update Change-Id: Ica01fbd08a2d60e9a0f4ab54b41ae4768cc4fa5d * update Change-Id: I764ce6cea3159b025bf1e6e65b60e20514cecb4f * update Change-Id: I0cc735896e1266934293ec06cb4f07746a1894a1 * update Change-Id: I6538b6a60631c62e162bf511ae20185a07b72508 * update Change-Id: I03f83b9e8199f3595ca28471df4a2e94671fbaae * update Change-Id: I5b19b671fa0b40cd2aba63f1d0eb654b92775cb4 * update the code Change-Id: I22788d112b7f214310719e2d3ebad839ef818e67 * update Change-Id: I7da89fe635b58ac5026095a8e484fa46f68521e7 * update the code Change-Id: I5eade6c31803b8cf05c1a4bb83c226d57f4980c7 * update Change-Id: Id67f600fad1a876926cd505dba047cc82011160c * update Change-Id: Ia4b0e2a21aa5e12570a80713432fd48ddfb210e6 * optimize the code Change-Id: Idf8080c7b17c25e14fe34ef6bad1e1150dab58d6 * update Change-Id: I904f0120a728b4eb87226159f9c0cde3d18ef26a * update code Change-Id: I21653d6bcad905daa8f10d12298d1d27535af9d9 * update Change-Id: I712b9cde9a18c730fd020ea76f05560e594a9edd * update Change-Id: Iafe10c007e9b5057212e2dcfb21671f970b1d344 * update Change-Id: I40c1c7f54852ee033e0caa22573436ee505a27b8 * optimize the code Change-Id: Iae006b01608df4f363c63a0ae876dbf63cd56fbd --------- Co-authored-by: drizzle.zk <drizzle.zk@alibaba-inc.com> Co-authored-by: zhouli11 <04081337@163.com>
1 parent f73fe6c commit 7a08b4c

50 files changed

Lines changed: 4379 additions & 78 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@
117117
import org.apache.rocketmq.broker.transaction.TransactionMetricsFlushService;
118118
import org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService;
119119
import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
120+
import org.apache.rocketmq.broker.transaction.rocksdb.TransactionalMessageRocksDBService;
120121
import org.apache.rocketmq.broker.transaction.queue.DefaultTransactionalMessageCheckListener;
121122
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge;
122123
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl;
@@ -178,6 +179,8 @@
178179
import org.apache.rocketmq.store.timer.TimerCheckpoint;
179180
import org.apache.rocketmq.store.timer.TimerMessageStore;
180181
import org.apache.rocketmq.store.timer.TimerMetrics;
182+
import org.apache.rocketmq.store.timer.rocksdb.TimerMessageRocksDBStore;
183+
import org.apache.rocketmq.store.transaction.TransMessageRocksDBStore;
181184

182185
public class BrokerController {
183186
protected static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -269,6 +272,8 @@ public class BrokerController {
269272
private BrokerStats brokerStats;
270273
private InetSocketAddress storeHost;
271274
private TimerMessageStore timerMessageStore;
275+
private TimerMessageRocksDBStore timerMessageRocksDBStore;
276+
private TransMessageRocksDBStore transMessageRocksDBStore;
272277
private TimerCheckpoint timerCheckpoint;
273278
protected BrokerFastFailure brokerFastFailure;
274279
private Configuration configuration;
@@ -277,6 +282,7 @@ public class BrokerController {
277282
protected TransactionalMessageCheckService transactionalMessageCheckService;
278283
protected TransactionalMessageService transactionalMessageService;
279284
protected AbstractTransactionalMessageCheckListener transactionalMessageCheckListener;
285+
protected TransactionalMessageRocksDBService transactionalMessageRocksDBService;
280286
protected volatile boolean shutdown = false;
281287
protected ShutdownHook shutdownHook;
282288
private volatile boolean isScheduleServiceStart = false;
@@ -865,6 +871,14 @@ public boolean initializeMessageStore() {
865871
this.timerMessageStore = new TimerMessageStore(messageStore, messageStoreConfig, timerCheckpoint, timerMetrics, brokerStatsManager);
866872
this.timerMessageStore.registerEscapeBridgeHook(msg -> escapeBridge.putMessage(msg));
867873
this.messageStore.setTimerMessageStore(this.timerMessageStore);
874+
if (messageStoreConfig.isTimerRocksDBEnable()) {
875+
this.timerMessageRocksDBStore = new TimerMessageRocksDBStore(messageStore, timerMetrics, brokerStatsManager);
876+
this.messageStore.setTimerMessageRocksDBStore(timerMessageRocksDBStore);
877+
}
878+
}
879+
if (messageStoreConfig.isTransRocksDBEnable()) {
880+
this.transMessageRocksDBStore = new TransMessageRocksDBStore(messageStore, brokerStatsManager, new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
881+
this.messageStore.setTransRocksDBStore(transMessageRocksDBStore);
868882
}
869883
} catch (Exception e) {
870884
result = false;
@@ -904,6 +918,9 @@ public boolean recoverAndInitService() throws CloneNotSupportedException {
904918

905919
if (messageStoreConfig.isTimerWheelEnable()) {
906920
result = result && this.timerMessageStore.load();
921+
if (messageStoreConfig.isTimerRocksDBEnable()) {
922+
result = result && this.timerMessageRocksDBStore.load();
923+
}
907924
}
908925

909926
//scheduleMessageService load after messageStore load success
@@ -1060,6 +1077,10 @@ private void initialTransaction() {
10601077
this.transactionMetricsFlushService = new TransactionMetricsFlushService(this);
10611078
this.transactionMetricsFlushService.start();
10621079

1080+
if (messageStoreConfig.isTransRocksDBEnable()) {
1081+
this.transactionalMessageRocksDBService = new TransactionalMessageRocksDBService(messageStore, this);
1082+
this.transactionalMessageRocksDBService.start();
1083+
}
10631084
}
10641085

10651086
private void initialRpcHooks() {
@@ -1400,6 +1421,14 @@ public void setTimerMessageStore(TimerMessageStore timerMessageStore) {
14001421
this.timerMessageStore = timerMessageStore;
14011422
}
14021423

1424+
public TimerMessageRocksDBStore getTimerMessageRocksDBStore() {
1425+
return timerMessageRocksDBStore;
1426+
}
1427+
1428+
public void setTimerMessageRocksDBStore(TimerMessageRocksDBStore timerMessageRocksDBStore) {
1429+
this.timerMessageRocksDBStore = timerMessageRocksDBStore;
1430+
}
1431+
14031432
public AckMessageProcessor getAckMessageProcessor() {
14041433
return ackMessageProcessor;
14051434
}
@@ -1473,6 +1502,10 @@ protected void shutdownBasicService() {
14731502
this.transactionMetricsFlushService.shutdown();
14741503
}
14751504

1505+
if (this.transactionalMessageRocksDBService != null) {
1506+
this.transactionalMessageRocksDBService.shutdown();
1507+
}
1508+
14761509
if (this.notificationProcessor != null) {
14771510
this.notificationProcessor.getPopLongPollingService().shutdown();
14781511
}
@@ -1488,6 +1521,15 @@ protected void shutdownBasicService() {
14881521
if (this.timerMessageStore != null) {
14891522
this.timerMessageStore.shutdown();
14901523
}
1524+
1525+
if (this.timerMessageRocksDBStore != null) {
1526+
this.timerMessageRocksDBStore.shutdown();
1527+
}
1528+
1529+
if (this.transMessageRocksDBStore != null) {
1530+
this.transMessageRocksDBStore.shutdown();
1531+
}
1532+
14911533
if (this.fileWatchService != null) {
14921534
this.fileWatchService.shutdown();
14931535
}
@@ -1562,7 +1604,7 @@ protected void shutdownBasicService() {
15621604
if (this.transactionalMessageCheckService != null) {
15631605
this.transactionalMessageCheckService.shutdown(false);
15641606
}
1565-
1607+
15661608
if (this.loadBalanceExecutor != null) {
15671609
this.loadBalanceExecutor.shutdown();
15681610
}
@@ -1693,6 +1735,10 @@ protected void startBasicService() throws Exception {
16931735
this.timerMessageStore.start();
16941736
}
16951737

1738+
if (this.timerMessageRocksDBStore != null && this.messageStoreConfig.isTimerRocksDBEnable()) {
1739+
this.timerMessageRocksDBStore.start();
1740+
}
1741+
16961742
if (this.replicasManager != null) {
16971743
this.replicasManager.start();
16981744
}

broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,7 @@
237237

238238
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_INVOCATION_STATUS;
239239
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
240+
import static org.apache.rocketmq.common.message.MessageConst.TIMER_ENGINE_TYPE;
240241
import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
241242

242243
public class AdminBrokerProcessor implements NettyRequestProcessor {
@@ -406,6 +407,8 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx,
406407
return this.listAcl(ctx, request);
407408
case RequestCode.POP_ROLLBACK:
408409
return this.transferPopToFsStore(ctx, request);
410+
case RequestCode.SWITCH_TIMER_ENGINE:
411+
return this.switchTimerEngine(ctx, request);
409412
default:
410413
return getUnknownCmdResponse(ctx, request);
411414
}
@@ -2880,7 +2883,11 @@ private RemotingCommand resumeCheckHalfMessage(ChannelHandlerContext ctx,
28802883

28812884
private MessageExtBrokerInner toMessageExtBrokerInner(MessageExt msgExt) {
28822885
MessageExtBrokerInner inner = new MessageExtBrokerInner();
2883-
inner.setTopic(TransactionalMessageUtil.buildHalfTopic());
2886+
if (brokerController.getMessageStoreConfig().isTransRocksDBEnable() && !brokerController.getMessageStoreConfig().isTransWriteOriginTransHalfEnable()) {
2887+
inner.setTopic(TransactionalMessageUtil.buildHalfTopicForRocksDB());
2888+
} else {
2889+
inner.setTopic(TransactionalMessageUtil.buildHalfTopic());
2890+
}
28842891
inner.setBody(msgExt.getBody());
28852892
inner.setFlag(msgExt.getFlag());
28862893
MessageAccessor.setProperties(inner, msgExt.getProperties());
@@ -3409,4 +3416,64 @@ private RemotingCommand transferPopToFsStore(ChannelHandlerContext ctx, Remoting
34093416
}
34103417
return response;
34113418
}
3419+
3420+
private synchronized RemotingCommand switchTimerEngine(ChannelHandlerContext ctx, RemotingCommand request) {
3421+
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
3422+
if (!this.brokerController.getMessageStoreConfig().isTimerWheelEnable()) {
3423+
LOGGER.info("switchTimerEngine error, broker timerWheelEnable is false");
3424+
response.setCode(ResponseCode.INVALID_PARAMETER);
3425+
response.setRemark("broker timerWheelEnable is false");
3426+
return response;
3427+
}
3428+
if (null == request.getExtFields()) {
3429+
LOGGER.info("switchTimerEngine extFields is null");
3430+
response.setCode(ResponseCode.INVALID_PARAMETER);
3431+
response.setRemark("param error, extFields is null");
3432+
return response;
3433+
}
3434+
String engineType = request.getExtFields().get(TIMER_ENGINE_TYPE);
3435+
if (StringUtils.isEmpty(engineType) || !MessageConst.TIMER_ENGINE_ROCKSDB_TIMELINE.equals(engineType) && !MessageConst.TIMER_ENGINE_FILE_TIME_WHEEL.equals(engineType)) {
3436+
response.setCode(ResponseCode.INVALID_PARAMETER);
3437+
response.setRemark("param error");
3438+
return response;
3439+
}
3440+
try {
3441+
Properties properties = new Properties();
3442+
boolean result = false;
3443+
if (MessageConst.TIMER_ENGINE_ROCKSDB_TIMELINE.equals(engineType)) {
3444+
if (this.brokerController.getTimerMessageRocksDBStore() == null) {
3445+
response.setCode(ResponseCode.INVALID_PARAMETER);
3446+
response.setRemark("timerUseRocksDB muse be configured true when broker start");
3447+
return response;
3448+
}
3449+
result = this.brokerController.getTimerMessageRocksDBStore().restart();
3450+
if (result) {
3451+
properties.put("timerStopEnqueue", Boolean.TRUE.toString());
3452+
properties.put("timerUseRocksDB", Boolean.TRUE.toString());
3453+
properties.put("timerRocksDBStopScan", Boolean.FALSE.toString());
3454+
}
3455+
} else {
3456+
result = this.brokerController.getTimerMessageStore().restart();
3457+
if (result) {
3458+
properties.put("timerRocksDBStopScan", Boolean.TRUE.toString());
3459+
properties.put("timerStopEnqueue", Boolean.FALSE.toString());
3460+
}
3461+
}
3462+
if (result) {
3463+
this.brokerController.getConfiguration().update(properties);
3464+
response.setCode(ResponseCode.SUCCESS);
3465+
response.setRemark("switch timer engine success");
3466+
LOGGER.info("switchTimerEngine success");
3467+
} else {
3468+
response.setCode(ResponseCode.SYSTEM_ERROR);
3469+
response.setRemark("switch timer engine error");
3470+
LOGGER.info("switchTimerEngine error");
3471+
}
3472+
} catch (Exception e) {
3473+
response.setCode(ResponseCode.SYSTEM_ERROR);
3474+
response.setRemark("switch timer engine error");
3475+
LOGGER.error("switchTimerEngine error : {}", e.getMessage());
3476+
}
3477+
return response;
3478+
}
34123479
}

broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.rocketmq.common.message.MessageExt;
3131
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
3232
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
33+
import org.apache.rocketmq.common.topic.TopicValidator;
3334
import org.apache.rocketmq.logging.org.slf4j.Logger;
3435
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
3536
import org.apache.rocketmq.remoting.common.RemotingHelper;
@@ -146,7 +147,7 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
146147
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
147148
RemotingCommand sendResult = sendFinalMessage(msgInner);
148149
if (sendResult.getCode() == ResponseCode.SUCCESS) {
149-
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
150+
deletePrepareMessage(result);
150151
// successful committed, then total num of half-messages minus 1
151152
this.brokerController.getTransactionalMessageService().getTransactionMetrics().addAndGet(msgInner.getTopic(), -1);
152153
this.brokerController.getBrokerMetricsManager().getCommitMessagesTotal().add(1, this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
@@ -173,7 +174,7 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
173174
}
174175
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
175176
if (res.getCode() == ResponseCode.SUCCESS) {
176-
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
177+
deletePrepareMessage(result);
177178
// roll back, then total num of half-messages minus 1
178179
this.brokerController.getTransactionalMessageService().getTransactionMetrics().addAndGet(result.getPrepareMessage().getProperty(MessageConst.PROPERTY_REAL_TOPIC), -1);
179180
this.brokerController.getBrokerMetricsManager().getRollBackMessagesTotal().add(1, this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
@@ -188,6 +189,26 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
188189
return response;
189190
}
190191

192+
private void deletePrepareMessage(OperationResult result) {
193+
if (null == result || null == result.getPrepareMessage()) {
194+
LOGGER.error("deletePrepareMessage param error, result is null or prepareMessage is null");
195+
return;
196+
}
197+
MessageExt prepareMessage = result.getPrepareMessage();
198+
String halfTopic = prepareMessage.getTopic();
199+
if (StringUtils.isEmpty(halfTopic)) {
200+
LOGGER.error("deletePrepareMessage halfTopic is empty, halfTopic: {}", halfTopic);
201+
return;
202+
}
203+
if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(halfTopic)) {
204+
this.brokerController.getTransactionalMessageService().deletePrepareMessage(prepareMessage);
205+
} else if (this.brokerController.getMessageStoreConfig().isTransRocksDBEnable() && TopicValidator.RMQ_SYS_ROCKSDB_TRANS_HALF_TOPIC.equals(halfTopic)) {
206+
this.brokerController.getMessageStore().getTransRocksDBStore().deletePrepareMessage(prepareMessage);
207+
} else {
208+
LOGGER.warn("deletePrepareMessage error, topic of half message is: {}, transRocksDBEnable: {}", halfTopic, this.brokerController.getMessageStoreConfig().isTransRocksDBEnable());
209+
}
210+
}
211+
191212
/**
192213
* If you specify a custom first check time CheckImmunityTimeInSeconds,
193214
* And the commit/rollback request whose validity period exceeds CheckImmunityTimeInSeconds and is not checked back will be processed and failed
@@ -265,10 +286,17 @@ private MessageExtBrokerInner endMessageTransaction(MessageExt msgExt) {
265286
: TopicFilterType.SINGLE_TAG;
266287
long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
267288
msgInner.setTagsCode(tagsCodeValue);
268-
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
269-
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
289+
String checkTimes = msgExt.getUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES);
290+
if (StringUtils.isEmpty(checkTimes) && this.brokerController.getMessageStoreConfig().isTransRocksDBEnable() && null != this.brokerController.getMessageStore().getTransRocksDBStore()) {
291+
Integer checkTimesRocksDB = this.brokerController.getMessageStore().getTransRocksDBStore().getCheckTimes(msgInner.getTopic(), msgInner.getTransactionId(), msgExt.getCommitLogOffset());
292+
if (null != checkTimesRocksDB && checkTimesRocksDB >= 0) {
293+
msgExt.putUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES, String.valueOf(checkTimesRocksDB));
294+
}
295+
}
296+
MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(MessageDecoder.messageProperties2String(msgExt.getProperties())));
270297
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC);
271298
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID);
299+
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
272300
return msgInner;
273301
}
274302

broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,13 @@
2121
import io.netty.channel.FileRegion;
2222
import io.opentelemetry.api.common.Attributes;
2323
import java.util.concurrent.TimeUnit;
24+
import org.apache.commons.lang3.StringUtils;
2425
import org.apache.rocketmq.broker.BrokerController;
2526
import org.apache.rocketmq.broker.pagecache.OneMessageTransfer;
2627
import org.apache.rocketmq.broker.pagecache.QueryMessageTransfer;
2728
import org.apache.rocketmq.common.MixAll;
2829
import org.apache.rocketmq.common.constant.LoggerName;
30+
import org.apache.rocketmq.common.message.MessageConst;
2931
import org.apache.rocketmq.logging.org.slf4j.Logger;
3032
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
3133
import org.apache.rocketmq.remoting.common.RemotingHelper;
@@ -84,16 +86,19 @@ public RemotingCommand queryMessage(ChannelHandlerContext ctx, RemotingCommand r
8486
.decodeCommandCustomHeader(QueryMessageRequestHeader.class);
8587

8688
response.setOpaque(request.getOpaque());
87-
88-
String isUniqueKey = request.getExtFields().get(MixAll.UNIQUE_MSG_QUERY_FLAG);
89-
if (isUniqueKey != null && isUniqueKey.equals("true")) {
89+
String indexType = requestHeader.getIndexType();
90+
String lastKey = requestHeader.getLastKey();
91+
String isUniqueKey = null;
92+
if (null != request.getExtFields()) {
93+
isUniqueKey = request.getExtFields().get(MixAll.UNIQUE_MSG_QUERY_FLAG);
94+
}
95+
if (!StringUtils.isEmpty(isUniqueKey) && Boolean.parseBoolean(isUniqueKey)) {
9096
requestHeader.setMaxNum(this.brokerController.getMessageStoreConfig().getDefaultQueryMaxNum());
97+
indexType = MessageConst.INDEX_UNIQUE_TYPE;
98+
} else if (StringUtils.isEmpty(indexType)) {
99+
indexType = MessageConst.INDEX_KEY_TYPE;
91100
}
92-
93-
final QueryMessageResult queryMessageResult =
94-
this.brokerController.getMessageStore().queryMessage(requestHeader.getTopic(),
95-
requestHeader.getKey(), requestHeader.getMaxNum(), requestHeader.getBeginTimestamp(),
96-
requestHeader.getEndTimestamp());
101+
final QueryMessageResult queryMessageResult = this.brokerController.getMessageStore().queryMessage(requestHeader.getTopic(), requestHeader.getKey(), requestHeader.getMaxNum(), requestHeader.getBeginTimestamp(), requestHeader.getEndTimestamp(), indexType, lastKey);
97102
assert queryMessageResult != null;
98103

99104
responseHeader.setIndexLastUpdatePhyoffset(queryMessageResult.getIndexLastUpdatePhyoffset());

0 commit comments

Comments
 (0)