Skip to content

Commit 7fe0349

Browse files
authored
[ISSUE #10063] Notification request adds subscription expression to support on-demand wake-up (#10064)
1 parent a6c5604 commit 7fe0349

4 files changed

Lines changed: 137 additions & 14 deletions

File tree

broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java

Lines changed: 90 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,11 @@
2020
import io.netty.channel.ChannelHandlerContext;
2121
import java.util.Map;
2222
import java.util.Random;
23+
import org.apache.commons.lang3.StringUtils;
2324
import org.apache.rocketmq.broker.BrokerController;
25+
import org.apache.rocketmq.broker.filter.ConsumerFilterData;
26+
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
27+
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
2428
import org.apache.rocketmq.broker.longpolling.PollingHeader;
2529
import org.apache.rocketmq.broker.longpolling.PollingResult;
2630
import org.apache.rocketmq.broker.longpolling.PopLongPollingService;
@@ -29,6 +33,7 @@
2933
import org.apache.rocketmq.common.TopicConfig;
3034
import org.apache.rocketmq.common.constant.LoggerName;
3135
import org.apache.rocketmq.common.constant.PermName;
36+
import org.apache.rocketmq.common.filter.ExpressionType;
3237
import org.apache.rocketmq.common.help.FAQUrl;
3338
import org.apache.rocketmq.logging.org.slf4j.Logger;
3439
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -37,10 +42,17 @@
3742
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
3843
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
3944
import org.apache.rocketmq.remoting.protocol.ResponseCode;
45+
import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
4046
import org.apache.rocketmq.remoting.protocol.header.NotificationRequestHeader;
4147
import org.apache.rocketmq.remoting.protocol.header.NotificationResponseHeader;
48+
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
4249
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
50+
import org.apache.rocketmq.store.MessageFilter;
4351
import org.apache.rocketmq.store.exception.ConsumeQueueException;
52+
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
53+
import org.apache.rocketmq.store.queue.CqUnit;
54+
import org.apache.rocketmq.store.queue.ReferredIterator;
55+
import org.rocksdb.RocksDBException;
4456

4557
public class NotificationProcessor implements NettyRequestProcessor {
4658
private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
@@ -136,25 +148,60 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx,
136148
int randomQ = random.nextInt(100);
137149
boolean hasMsg = false;
138150
BrokerConfig brokerConfig = brokerController.getBrokerConfig();
151+
152+
SubscriptionData subscriptionData = null;
153+
ExpressionMessageFilter messageFilter = null;
154+
if (brokerConfig.isUseMessageFilterForNotification() &&
155+
StringUtils.isNotEmpty(requestHeader.getExpType()) &&
156+
StringUtils.isNotEmpty(requestHeader.getExp())) {
157+
try {
158+
// origin topic
159+
subscriptionData = FilterAPI.build(
160+
requestHeader.getTopic(), requestHeader.getExp(), requestHeader.getExpType());
161+
162+
ConsumerFilterData consumerFilterData = null;
163+
if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
164+
consumerFilterData = ConsumerFilterManager.build(
165+
requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getExp(),
166+
requestHeader.getExpType(), System.currentTimeMillis());
167+
if (consumerFilterData == null) {
168+
POP_LOGGER.warn("Parse the consumer's subscription[{}] failed, group: {}",
169+
requestHeader.getExp(), requestHeader.getConsumerGroup());
170+
response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
171+
response.setRemark("parse the consumer's subscription failed");
172+
return response;
173+
}
174+
}
175+
messageFilter = new ExpressionMessageFilter(
176+
subscriptionData, consumerFilterData, brokerController.getConsumerFilterManager());
177+
} catch (Exception e) {
178+
POP_LOGGER.warn("Parse the consumer's subscription[{}] error, group: {}", requestHeader.getExp(),
179+
requestHeader.getConsumerGroup());
180+
response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
181+
response.setRemark("parse the consumer's subscription failed");
182+
return response;
183+
}
184+
}
185+
139186
if (requestHeader.getQueueId() < 0) {
140187
// read all queue
141-
hasMsg = hasMsgFromTopic(topicConfig, randomQ, requestHeader);
188+
hasMsg = hasMsgFromTopic(topicConfig, randomQ, requestHeader, subscriptionData, messageFilter);
142189
} else {
143190
int queueId = requestHeader.getQueueId();
144-
hasMsg = hasMsgFromQueue(topicConfig.getTopicName(), requestHeader, queueId);
191+
hasMsg = hasMsgFromQueue(topicConfig.getTopicName(), requestHeader, queueId, subscriptionData, messageFilter);
145192
}
146193
// if it doesn't have message, fetch retry
147194
if (!hasMsg) {
148195
String retryTopic = KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2());
149-
hasMsg = hasMsgFromTopic(retryTopic, randomQ, requestHeader);
196+
hasMsg = hasMsgFromTopic(retryTopic, randomQ, requestHeader, null, null);
150197
if (!hasMsg && brokerConfig.isEnableRetryTopicV2() && brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
151198
String retryTopicConfigV1 = KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(), requestHeader.getConsumerGroup());
152-
hasMsg = hasMsgFromTopic(retryTopicConfigV1, randomQ, requestHeader);
199+
hasMsg = hasMsgFromTopic(retryTopicConfigV1, randomQ, requestHeader, null, null);
153200
}
154201
}
155202

156203
if (!hasMsg) {
157-
PollingResult pollingResult = popLongPollingService.polling(ctx, request, new PollingHeader(requestHeader));
204+
PollingResult pollingResult = popLongPollingService.polling(ctx, request, new PollingHeader(requestHeader), subscriptionData, messageFilter);
158205
if (pollingResult == PollingResult.POLLING_SUC) {
159206
return null;
160207
} else if (pollingResult == PollingResult.POLLING_FULL) {
@@ -166,19 +213,19 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx,
166213
return response;
167214
}
168215

169-
private boolean hasMsgFromTopic(String topicName, int randomQ, NotificationRequestHeader requestHeader)
216+
private boolean hasMsgFromTopic(String topicName, int randomQ, NotificationRequestHeader requestHeader, SubscriptionData subscriptionData, MessageFilter messageFilter)
170217
throws RemotingCommandException {
171218
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topicName);
172-
return hasMsgFromTopic(topicConfig, randomQ, requestHeader);
219+
return hasMsgFromTopic(topicConfig, randomQ, requestHeader, subscriptionData, messageFilter);
173220
}
174221

175-
private boolean hasMsgFromTopic(TopicConfig topicConfig, int randomQ, NotificationRequestHeader requestHeader)
222+
private boolean hasMsgFromTopic(TopicConfig topicConfig, int randomQ, NotificationRequestHeader requestHeader, SubscriptionData subscriptionData, MessageFilter messageFilter)
176223
throws RemotingCommandException {
177224
boolean hasMsg;
178225
if (topicConfig != null) {
179226
for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
180227
int queueId = (randomQ + i) % topicConfig.getReadQueueNums();
181-
hasMsg = hasMsgFromQueue(topicConfig.getTopicName(), requestHeader, queueId);
228+
hasMsg = hasMsgFromQueue(topicConfig.getTopicName(), requestHeader, queueId, subscriptionData, messageFilter);
182229
if (hasMsg) {
183230
return true;
184231
}
@@ -187,7 +234,7 @@ private boolean hasMsgFromTopic(TopicConfig topicConfig, int randomQ, Notificati
187234
return false;
188235
}
189236

190-
private boolean hasMsgFromQueue(String targetTopic, NotificationRequestHeader requestHeader, int queueId) throws RemotingCommandException {
237+
private boolean hasMsgFromQueue(String targetTopic, NotificationRequestHeader requestHeader, int queueId, SubscriptionData subscriptionData, MessageFilter messageFilter) throws RemotingCommandException {
191238
if (Boolean.TRUE.equals(requestHeader.getOrder())) {
192239
if (this.brokerController.getConsumerOrderInfoManager().checkBlock(requestHeader.getAttemptId(), requestHeader.getTopic(), requestHeader.getConsumerGroup(), queueId, 0)) {
193240
return false;
@@ -196,9 +243,40 @@ private boolean hasMsgFromQueue(String targetTopic, NotificationRequestHeader re
196243
long offset = getPopOffset(targetTopic, requestHeader.getConsumerGroup(), queueId);
197244
try {
198245
long restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(targetTopic, queueId) - offset;
246+
int maxFilterMessageNum = this.brokerController.getBrokerConfig().getMaxMessageFilterNumForNotification();
247+
boolean needFilter = restNum < maxFilterMessageNum &&
248+
subscriptionData != null &&
249+
messageFilter != null &&
250+
ExpressionType.isTagType(subscriptionData.getExpressionType());
251+
if (needFilter) {
252+
ConsumeQueueInterface queue = this.brokerController.getMessageStore().getConsumeQueue(targetTopic, queueId);
253+
// If the ConsumeQueue doesn't exist, it's not readable.
254+
if (queue == null) {
255+
return false;
256+
}
257+
ReferredIterator<CqUnit> iterator = null;
258+
try {
259+
// In order to take into account both the file CQ and the Rocksdb CQ,
260+
// the count passed here is 32.
261+
iterator = queue.iterateFrom(offset, 32);
262+
if (iterator != null) {
263+
while (iterator.hasNext()) {
264+
CqUnit cqUnit = iterator.next();
265+
if (messageFilter.isMatchedByConsumeQueue(cqUnit.getValidTagsCodeAsLong(), cqUnit.getCqExtUnit())) {
266+
return true;
267+
}
268+
}
269+
return false;
270+
}
271+
} finally {
272+
if (iterator != null) {
273+
iterator.release();
274+
}
275+
}
276+
}
199277
return restNum > 0;
200-
} catch (ConsumeQueueException e) {
201-
throw new RemotingCommandException("Failed tp get max offset in queue", e);
278+
} catch (ConsumeQueueException | RocksDBException e) {
279+
throw new RemotingCommandException("Failed to get max offset in queue or iterate in queue", e);
202280
}
203281
}
204282

common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.rocketmq.common;
1818

19+
import java.util.concurrent.TimeUnit;
1920
import org.apache.rocketmq.common.annotation.ImportantField;
2021
import org.apache.rocketmq.common.config.ConfigManagerVersion;
2122
import org.apache.rocketmq.common.constant.PermName;
@@ -24,8 +25,6 @@
2425
import org.apache.rocketmq.common.topic.TopicValidator;
2526
import org.apache.rocketmq.common.utils.NetworkUtil;
2627

27-
import java.util.concurrent.TimeUnit;
28-
2928
public class BrokerConfig extends BrokerIdentity {
3029

3130
private String brokerConfigPath = null;
@@ -256,6 +255,9 @@ public class BrokerConfig extends BrokerIdentity {
256255
private boolean useSeparateRetryQueue = false;
257256
private boolean realTimeNotifyConsumerChange = true;
258257

258+
private boolean useMessageFilterForNotification = true;
259+
private int maxMessageFilterNumForNotification = 64;
260+
259261
private boolean litePullMessageEnable = true;
260262

261263
// The period to sync broker member group from namesrv, default value is 1 second
@@ -2407,4 +2409,20 @@ public int getLiteLagLatencyTopK() {
24072409
public void setLiteLagLatencyTopK(int liteLagLatencyTopK) {
24082410
this.liteLagLatencyTopK = liteLagLatencyTopK;
24092411
}
2412+
2413+
public boolean isUseMessageFilterForNotification() {
2414+
return useMessageFilterForNotification;
2415+
}
2416+
2417+
public void setUseMessageFilterForNotification(boolean useMessageFilterForNotification) {
2418+
this.useMessageFilterForNotification = useMessageFilterForNotification;
2419+
}
2420+
2421+
public int getMaxMessageFilterNumForNotification() {
2422+
return maxMessageFilterNumForNotification;
2423+
}
2424+
2425+
public void setMaxMessageFilterNumForNotification(int maxMessageFilterNumForNotification) {
2426+
this.maxMessageFilterNumForNotification = maxMessageFilterNumForNotification;
2427+
}
24102428
}

remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ public class NotificationRequestHeader extends TopicQueueRequestHeader {
4444
private Boolean order = Boolean.FALSE;
4545
private String attemptId;
4646

47+
private String expType;
48+
private String exp;
49+
4750
@CFNotNull
4851
@Override
4952
public void checkFields() throws RemotingCommandException {
@@ -108,6 +111,22 @@ public void setAttemptId(String attemptId) {
108111
this.attemptId = attemptId;
109112
}
110113

114+
public String getExpType() {
115+
return expType;
116+
}
117+
118+
public void setExpType(String expType) {
119+
this.expType = expType;
120+
}
121+
122+
public String getExp() {
123+
return exp;
124+
}
125+
126+
public void setExp(String exp) {
127+
this.exp = exp;
128+
}
129+
111130
@Override
112131
public String toString() {
113132
return MoreObjects.toStringHelper(this)

test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,12 @@ public CompletableFuture<Boolean> notification(String brokerAddr, String topic,
199199

200200
public CompletableFuture<Boolean> notification(String brokerAddr, String topic,
201201
String consumerGroup, int queueId, Boolean order, String attemptId, long pollTime, long bornTime, long timeoutMillis) {
202+
return notification(brokerAddr, topic, consumerGroup, queueId, order, attemptId, pollTime, bornTime, timeoutMillis, null, null);
203+
}
204+
205+
206+
public CompletableFuture<Boolean> notification(String brokerAddr, String topic,
207+
String consumerGroup, int queueId, Boolean order, String attemptId, long pollTime, long bornTime, long timeoutMillis, String expType, String exp) {
202208
NotificationRequestHeader requestHeader = new NotificationRequestHeader();
203209
requestHeader.setConsumerGroup(consumerGroup);
204210
requestHeader.setTopic(topic);
@@ -207,6 +213,8 @@ public CompletableFuture<Boolean> notification(String brokerAddr, String topic,
207213
requestHeader.setBornTime(bornTime);
208214
requestHeader.setOrder(order);
209215
requestHeader.setAttemptId(attemptId);
216+
requestHeader.setExpType(expType);
217+
requestHeader.setExp(exp);
210218
return this.mqClientAPI.notification(brokerAddr, requestHeader, timeoutMillis);
211219
}
212220
}

0 commit comments

Comments
 (0)