Skip to content

Commit 1b6a919

Browse files
f1amingoimzszhouli11
authored
[RIP-83] Lite Topic: A New Message Model (#9800)
Co-authored-by: imzs <imzs@foxmail.com> Co-authored-by: zhouli11 <04081337@163.com>
1 parent 45fc4e0 commit 1b6a919

184 files changed

Lines changed: 14708 additions & 313 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

auth/src/main/java/org/apache/rocketmq/auth/authorization/builder/DefaultAuthorizationContextBuilder.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import apache.rocketmq.v2.Subscription;
3232
import apache.rocketmq.v2.SubscriptionEntry;
3333
import apache.rocketmq.v2.TelemetryCommand;
34+
import apache.rocketmq.v2.SyncLiteSubscriptionRequest;
3435
import com.google.protobuf.GeneratedMessageV3;
3536
import io.grpc.Metadata;
3637
import io.netty.channel.ChannelHandlerContext;
@@ -124,6 +125,13 @@ public List<DefaultAuthorizationContext> build(Metadata metadata, GeneratedMessa
124125
}
125126
result = newSubContexts(metadata, request.getGroup(), request.getMessageQueue().getTopic());
126127
}
128+
if (message instanceof SyncLiteSubscriptionRequest) {
129+
SyncLiteSubscriptionRequest request = (SyncLiteSubscriptionRequest) message;
130+
if (request.getLiteTopicSetCount() <= 0) {
131+
return null;
132+
}
133+
result = newSubContexts(metadata, request.getGroup(), request.getTopic());
134+
}
127135
if (message instanceof AckMessageRequest) {
128136
AckMessageRequest request = (AckMessageRequest) message;
129137
result = newSubContexts(metadata, request.getGroup(), request.getTopic());

broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

Lines changed: 124 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,14 @@
7373
import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap;
7474
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
7575
import org.apache.rocketmq.broker.latency.BrokerFastFailure;
76+
import org.apache.rocketmq.broker.lite.AbstractLiteLifecycleManager;
77+
import org.apache.rocketmq.broker.lite.LiteEventDispatcher;
78+
import org.apache.rocketmq.broker.lite.LiteSubscriptionRegistry;
79+
import org.apache.rocketmq.broker.lite.LiteSubscriptionRegistryImpl;
80+
import org.apache.rocketmq.broker.lite.LiteLifecycleManager;
81+
import org.apache.rocketmq.broker.lite.LiteSharding;
82+
import org.apache.rocketmq.broker.lite.LiteShardingImpl;
83+
import org.apache.rocketmq.broker.lite.RocksDBLiteLifecycleManager;
7684
import org.apache.rocketmq.broker.longpolling.LmqPullRequestHoldService;
7785
import org.apache.rocketmq.broker.longpolling.NotifyMessageArrivingListener;
7886
import org.apache.rocketmq.broker.longpolling.PullRequestHoldService;
@@ -93,10 +101,13 @@
93101
import org.apache.rocketmq.broker.processor.ClientManageProcessor;
94102
import org.apache.rocketmq.broker.processor.ConsumerManageProcessor;
95103
import org.apache.rocketmq.broker.processor.EndTransactionProcessor;
104+
import org.apache.rocketmq.broker.processor.LiteManagerProcessor;
105+
import org.apache.rocketmq.broker.processor.LiteSubscriptionCtlProcessor;
96106
import org.apache.rocketmq.broker.processor.NotificationProcessor;
97107
import org.apache.rocketmq.broker.processor.PeekMessageProcessor;
98108
import org.apache.rocketmq.broker.processor.PollingInfoProcessor;
99109
import org.apache.rocketmq.broker.processor.PopInflightMessageCounter;
110+
import org.apache.rocketmq.broker.processor.PopLiteMessageProcessor;
100111
import org.apache.rocketmq.broker.processor.PopMessageProcessor;
101112
import org.apache.rocketmq.broker.processor.PullMessageProcessor;
102113
import org.apache.rocketmq.broker.processor.QueryAssignmentProcessor;
@@ -206,12 +217,19 @@ public class BrokerController {
206217
protected final PullMessageProcessor pullMessageProcessor;
207218
protected final PeekMessageProcessor peekMessageProcessor;
208219
protected final PopMessageProcessor popMessageProcessor;
220+
protected final PopLiteMessageProcessor popLiteMessageProcessor;
209221
protected final AckMessageProcessor ackMessageProcessor;
210222
protected final ChangeInvisibleTimeProcessor changeInvisibleTimeProcessor;
211223
protected final NotificationProcessor notificationProcessor;
212224
protected final PollingInfoProcessor pollingInfoProcessor;
213225
protected final QueryAssignmentProcessor queryAssignmentProcessor;
214226
protected final ClientManageProcessor clientManageProcessor;
227+
protected final LiteSubscriptionCtlProcessor liteSubscriptionCtlProcessor;
228+
protected final LiteSharding liteSharding;
229+
protected final AbstractLiteLifecycleManager liteLifecycleManager;
230+
protected final LiteSubscriptionRegistry liteSubscriptionRegistry;
231+
protected final LiteEventDispatcher liteEventDispatcher;
232+
protected final LiteManagerProcessor liteManagerProcessor;
215233
protected final SendMessageProcessor sendMessageProcessor;
216234
protected final RecallMessageProcessor recallMessageProcessor;
217235
protected final ReplyMessageProcessor replyMessageProcessor;
@@ -376,18 +394,27 @@ public BrokerController(
376394
this.topicQueueMappingManager = new TopicQueueMappingManager(this);
377395
this.authenticationMetadataManager = AuthenticationFactory.getMetadataManager(this.authConfig);
378396
this.authorizationMetadataManager = AuthorizationFactory.getMetadataManager(this.authConfig);
397+
this.topicRouteInfoManager = new TopicRouteInfoManager(this);
398+
this.liteSharding = new LiteShardingImpl(this, this.topicRouteInfoManager);
399+
this.liteLifecycleManager = this.messageStoreConfig.isEnableRocksDBStore() ?
400+
new RocksDBLiteLifecycleManager(this, this.liteSharding) : new LiteLifecycleManager(this, this.liteSharding);
401+
this.liteSubscriptionRegistry = new LiteSubscriptionRegistryImpl(this, liteLifecycleManager);
402+
this.liteSubscriptionCtlProcessor = new LiteSubscriptionCtlProcessor(this, liteSubscriptionRegistry);
403+
this.liteEventDispatcher = new LiteEventDispatcher(this, this.liteSubscriptionRegistry, this.liteLifecycleManager);
404+
this.liteManagerProcessor = new LiteManagerProcessor(this, liteLifecycleManager, liteSharding);
379405
this.pullMessageProcessor = new PullMessageProcessor(this);
380406
this.peekMessageProcessor = new PeekMessageProcessor(this);
381407
this.pullRequestHoldService = messageStoreConfig.isEnableLmq() ? new LmqPullRequestHoldService(this) : new PullRequestHoldService(this);
382408
this.popMessageProcessor = new PopMessageProcessor(this);
409+
this.popLiteMessageProcessor = new PopLiteMessageProcessor(this, this.liteEventDispatcher);
383410
this.notificationProcessor = new NotificationProcessor(this);
384411
this.pollingInfoProcessor = new PollingInfoProcessor(this);
385412
this.ackMessageProcessor = new AckMessageProcessor(this);
386413
this.changeInvisibleTimeProcessor = new ChangeInvisibleTimeProcessor(this);
387414
this.sendMessageProcessor = new SendMessageProcessor(this);
388415
this.recallMessageProcessor = new RecallMessageProcessor(this);
389416
this.replyMessageProcessor = new ReplyMessageProcessor(this);
390-
this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService, this.popMessageProcessor, this.notificationProcessor);
417+
this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService, this.popMessageProcessor, this.notificationProcessor, this.liteEventDispatcher);
391418
this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
392419
this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener, this.brokerStatsManager, this.brokerConfig);
393420
this.producerManager = new ProducerManager(this.brokerStatsManager);
@@ -466,8 +493,6 @@ public boolean online(String instanceId, String group, String topic) {
466493

467494
this.escapeBridge = new EscapeBridge(this);
468495

469-
this.topicRouteInfoManager = new TopicRouteInfoManager(this);
470-
471496
if (this.brokerConfig.isEnableSlaveActingMaster() && !this.brokerConfig.isSkipPreOnline()) {
472497
this.brokerPreOnlineService = new BrokerPreOnlineService(this);
473498
}
@@ -950,6 +975,8 @@ public boolean recoverAndInitService() throws CloneNotSupportedException {
950975

951976
initialRequestPipeline();
952977

978+
initLiteService();
979+
953980
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
954981
// Register a listener to reload SslContext
955982
try {
@@ -1045,6 +1072,21 @@ public PutMessageResult executeBeforePutMessage(MessageExt msg) {
10451072
}
10461073
});
10471074

1075+
putMessageHookList.add(new PutMessageHook() {
1076+
@Override
1077+
public String hookName() {
1078+
return "handleLmqQuota";
1079+
}
1080+
1081+
@Override
1082+
public PutMessageResult executeBeforePutMessage(MessageExt msg) {
1083+
if (msg instanceof MessageExtBrokerInner) {
1084+
return HookUtils.handleLmqQuota(BrokerController.this, (MessageExtBrokerInner) msg);
1085+
}
1086+
return null;
1087+
}
1088+
});
1089+
10481090
SendMessageBackHook sendMessageBackHook = new SendMessageBackHook() {
10491091
@Override
10501092
public boolean executeSendMessageBack(List<MessageExt> msgList, String brokerName, String brokerAddr) {
@@ -1111,6 +1153,11 @@ private void initialRequestPipeline() {
11111153
}
11121154
}
11131155

1156+
private void initLiteService() {
1157+
this.liteEventDispatcher.init();
1158+
this.liteLifecycleManager.init();
1159+
}
1160+
11141161
public void registerProcessor() {
11151162
RemotingServer remotingServer = remotingServerMap.get(TCP_REMOTING_SERVER);
11161163
RemotingServer fastRemotingServer = remotingServerMap.get(FAST_REMOTING_SERVER);
@@ -1145,6 +1192,7 @@ public void registerProcessor() {
11451192
* PopMessageProcessor
11461193
*/
11471194
remotingServer.registerProcessor(RequestCode.POP_MESSAGE, this.popMessageProcessor, this.pullMessageExecutor);
1195+
remotingServer.registerProcessor(RequestCode.POP_LITE_MESSAGE, this.popLiteMessageProcessor, this.pullMessageExecutor);
11481196

11491197
/**
11501198
* AckMessageProcessor
@@ -1196,10 +1244,12 @@ public void registerProcessor() {
11961244
remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientManageProcessor, this.heartbeatExecutor);
11971245
remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientManageProcessor, this.clientManageExecutor);
11981246
remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientManageProcessor, this.clientManageExecutor);
1247+
remotingServer.registerProcessor(RequestCode.LITE_SUBSCRIPTION_CTL, liteSubscriptionCtlProcessor, this.clientManageExecutor);
11991248

12001249
fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientManageProcessor, this.heartbeatExecutor);
12011250
fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientManageProcessor, this.clientManageExecutor);
12021251
fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientManageProcessor, this.clientManageExecutor);
1252+
fastRemotingServer.registerProcessor(RequestCode.LITE_SUBSCRIPTION_CTL, liteSubscriptionCtlProcessor, this.clientManageExecutor);
12031253

12041254
/**
12051255
* ConsumerManageProcessor
@@ -1227,6 +1277,23 @@ public void registerProcessor() {
12271277
remotingServer.registerProcessor(RequestCode.END_TRANSACTION, endTransactionProcessor, this.endTransactionExecutor);
12281278
fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, endTransactionProcessor, this.endTransactionExecutor);
12291279

1280+
/*
1281+
* lite admin
1282+
*/
1283+
remotingServer.registerProcessor(RequestCode.GET_BROKER_LITE_INFO, liteManagerProcessor, adminBrokerExecutor);
1284+
remotingServer.registerProcessor(RequestCode.GET_PARENT_TOPIC_INFO, liteManagerProcessor, adminBrokerExecutor);
1285+
remotingServer.registerProcessor(RequestCode.GET_LITE_TOPIC_INFO, liteManagerProcessor, adminBrokerExecutor);
1286+
remotingServer.registerProcessor(RequestCode.GET_LITE_CLIENT_INFO, liteManagerProcessor, adminBrokerExecutor);
1287+
remotingServer.registerProcessor(RequestCode.GET_LITE_GROUP_INFO, liteManagerProcessor, adminBrokerExecutor);
1288+
remotingServer.registerProcessor(RequestCode.TRIGGER_LITE_DISPATCH, liteManagerProcessor, adminBrokerExecutor);
1289+
1290+
fastRemotingServer.registerProcessor(RequestCode.GET_BROKER_LITE_INFO, liteManagerProcessor, adminBrokerExecutor);
1291+
fastRemotingServer.registerProcessor(RequestCode.GET_PARENT_TOPIC_INFO, liteManagerProcessor, adminBrokerExecutor);
1292+
fastRemotingServer.registerProcessor(RequestCode.GET_LITE_TOPIC_INFO, liteManagerProcessor, adminBrokerExecutor);
1293+
fastRemotingServer.registerProcessor(RequestCode.GET_LITE_CLIENT_INFO, liteManagerProcessor, adminBrokerExecutor);
1294+
fastRemotingServer.registerProcessor(RequestCode.GET_LITE_GROUP_INFO, liteManagerProcessor, adminBrokerExecutor);
1295+
fastRemotingServer.registerProcessor(RequestCode.TRIGGER_LITE_DISPATCH, liteManagerProcessor, adminBrokerExecutor);
1296+
12301297
/*
12311298
* Default
12321299
*/
@@ -1409,6 +1476,10 @@ public PopMessageProcessor getPopMessageProcessor() {
14091476
return popMessageProcessor;
14101477
}
14111478

1479+
public PopLiteMessageProcessor getPopLiteMessageProcessor() {
1480+
return popLiteMessageProcessor;
1481+
}
1482+
14121483
public NotificationProcessor getNotificationProcessor() {
14131484
return notificationProcessor;
14141485
}
@@ -1437,6 +1508,14 @@ public ChangeInvisibleTimeProcessor getChangeInvisibleTimeProcessor() {
14371508
return changeInvisibleTimeProcessor;
14381509
}
14391510

1511+
public LiteSubscriptionRegistry getLiteSubscriptionRegistry() {
1512+
return liteSubscriptionRegistry;
1513+
}
1514+
1515+
public AbstractLiteLifecycleManager getLiteLifecycleManager() {
1516+
return liteLifecycleManager;
1517+
}
1518+
14401519
protected void shutdownBasicService() {
14411520

14421521
shutdown = true;
@@ -1474,6 +1553,13 @@ protected void shutdownBasicService() {
14741553
this.popMessageProcessor.getPopLongPollingService().shutdown();
14751554
}
14761555

1556+
if (this.popLiteMessageProcessor != null) {
1557+
this.popLiteMessageProcessor.stopPopLiteLockManager();
1558+
if (this.popLiteMessageProcessor.getPopLiteLongPollingService() != null) {
1559+
this.popLiteMessageProcessor.getPopLiteLongPollingService().shutdown();
1560+
}
1561+
}
1562+
14771563
if (this.popMessageProcessor.getQueueLockManager() != null) {
14781564
this.popMessageProcessor.getQueueLockManager().shutdown();
14791565
}
@@ -1637,6 +1723,18 @@ protected void shutdownBasicService() {
16371723
this.coldDataCgCtrService.shutdown();
16381724
}
16391725

1726+
if (this.liteEventDispatcher != null) {
1727+
this.liteEventDispatcher.shutdown();
1728+
}
1729+
1730+
if (this.liteLifecycleManager != null) {
1731+
this.liteLifecycleManager.shutdown();
1732+
}
1733+
1734+
if (this.liteSubscriptionRegistry != null) {
1735+
this.liteSubscriptionRegistry.shutdown();
1736+
}
1737+
16401738
shutdownScheduledExecutorService(this.syncBrokerMemberGroupExecutorService);
16411739
shutdownScheduledExecutorService(this.brokerHeartbeatExecutorService);
16421740

@@ -1777,6 +1875,13 @@ protected void startBasicService() throws Exception {
17771875
this.popMessageProcessor.getQueueLockManager().start();
17781876
}
17791877

1878+
if (this.popLiteMessageProcessor != null) {
1879+
this.popLiteMessageProcessor.startPopLiteLockManager();
1880+
if (this.popLiteMessageProcessor.getPopLiteLongPollingService() != null) {
1881+
this.popLiteMessageProcessor.getPopLiteLongPollingService().start();
1882+
}
1883+
}
1884+
17801885
if (this.ackMessageProcessor != null) {
17811886
if (brokerConfig.isPopConsumerFSServiceInit()) {
17821887
this.ackMessageProcessor.startPopReviveService();
@@ -1838,6 +1943,18 @@ protected void startBasicService() throws Exception {
18381943
if (this.coldDataCgCtrService != null) {
18391944
this.coldDataCgCtrService.start();
18401945
}
1946+
1947+
if (this.liteEventDispatcher != null) {
1948+
this.liteEventDispatcher.start();
1949+
}
1950+
1951+
if (this.liteLifecycleManager != null) {
1952+
this.liteLifecycleManager.start();
1953+
}
1954+
1955+
if (this.liteSubscriptionRegistry != null) {
1956+
this.liteSubscriptionRegistry.start();
1957+
}
18411958
}
18421959

18431960
public void start() throws Exception {
@@ -2699,4 +2816,8 @@ public ConfigContext getConfigContext() {
26992816
public void setConfigContext(ConfigContext configContext) {
27002817
this.configContext = configContext;
27012818
}
2819+
2820+
public LiteEventDispatcher getLiteEventDispatcher() {
2821+
return liteEventDispatcher;
2822+
}
27022823
}

broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.apache.rocketmq.remoting.protocol.header.CheckTransactionStateRequestHeader;
5050
import org.apache.rocketmq.remoting.protocol.header.GetConsumerStatusRequestHeader;
5151
import org.apache.rocketmq.remoting.protocol.header.NotifyConsumerIdsChangedRequestHeader;
52+
import org.apache.rocketmq.remoting.protocol.header.NotifyUnsubscribeLiteRequestHeader;
5253
import org.apache.rocketmq.remoting.protocol.header.ResetOffsetRequestHeader;
5354
import org.apache.rocketmq.store.exception.ConsumeQueueException;
5455

@@ -60,6 +61,16 @@ public Broker2Client(BrokerController brokerController) {
6061
this.brokerController = brokerController;
6162
}
6263

64+
public void notifyUnsubscribeLite(Channel channel, NotifyUnsubscribeLiteRequestHeader requestHeader) {
65+
RemotingCommand request =
66+
RemotingCommand.createRequestCommand(RequestCode.NOTIFY_UNSUBSCRIBE_LITE, requestHeader);
67+
try {
68+
this.brokerController.getRemotingServer().invokeOneway(channel, request, 100);
69+
} catch (Exception e) {
70+
log.error("notifyUnsubscribeLite failed. header={}, error={}", requestHeader, e.toString());
71+
}
72+
}
73+
6374
public void checkProducerTransactionState(
6475
final String group,
6576
final Channel channel,

0 commit comments

Comments
 (0)