Skip to content

Commit 93cf16d

Browse files
committed
fix
Change-Id: I645edeaf6b19617b6fafdd3e88d69bea3d12abd6
1 parent 9399d7e commit 93cf16d

2 files changed

Lines changed: 90 additions & 56 deletions

File tree

broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java

Lines changed: 79 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,13 @@
4646
import org.apache.rocketmq.remoting.protocol.header.NotificationResponseHeader;
4747
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
4848
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
49+
import org.apache.rocketmq.store.ConsumeQueue;
50+
import org.apache.rocketmq.store.MessageFilter;
4951
import org.apache.rocketmq.store.exception.ConsumeQueueException;
52+
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
53+
import org.apache.rocketmq.store.queue.CqUnit;
54+
import org.apache.rocketmq.store.queue.ReferredIterator;
55+
import org.rocksdb.RocksDBException;
5056

5157
public class NotificationProcessor implements NettyRequestProcessor {
5258
private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
@@ -142,67 +148,58 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx,
142148
int randomQ = random.nextInt(100);
143149
boolean hasMsg = false;
144150
BrokerConfig brokerConfig = brokerController.getBrokerConfig();
151+
152+
SubscriptionData subscriptionData = null;
153+
ExpressionMessageFilter messageFilter = null;
154+
if (brokerConfig.isUseMessageFilterForNotification() && requestHeader.getExp() != null && !requestHeader.getExp().isEmpty()) {
155+
try {
156+
// origin topic
157+
subscriptionData = FilterAPI.build(
158+
requestHeader.getTopic(), requestHeader.getExp(), requestHeader.getExpType());
159+
160+
ConsumerFilterData consumerFilterData = null;
161+
if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
162+
consumerFilterData = ConsumerFilterManager.build(
163+
requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getExp(),
164+
requestHeader.getExpType(), System.currentTimeMillis());
165+
if (consumerFilterData == null) {
166+
POP_LOGGER.warn("Parse the consumer's subscription[{}] failed, group: {}",
167+
requestHeader.getExp(), requestHeader.getConsumerGroup());
168+
response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
169+
response.setRemark("parse the consumer's subscription failed");
170+
return response;
171+
}
172+
}
173+
messageFilter = new ExpressionMessageFilter(
174+
subscriptionData, consumerFilterData, brokerController.getConsumerFilterManager());
175+
} catch (Exception e) {
176+
POP_LOGGER.warn("Parse the consumer's subscription[{}] error, group: {}", requestHeader.getExp(),
177+
requestHeader.getConsumerGroup());
178+
response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
179+
response.setRemark("parse the consumer's subscription failed");
180+
return response;
181+
}
182+
}
183+
145184
if (requestHeader.getQueueId() < 0) {
146185
// read all queue
147-
hasMsg = hasMsgFromTopic(topicConfig, randomQ, requestHeader);
186+
hasMsg = hasMsgFromTopic(topicConfig, randomQ, requestHeader, subscriptionData, messageFilter);
148187
} else {
149188
int queueId = requestHeader.getQueueId();
150-
hasMsg = hasMsgFromQueue(topicConfig.getTopicName(), requestHeader, queueId);
189+
hasMsg = hasMsgFromQueue(topicConfig.getTopicName(), requestHeader, queueId, subscriptionData, messageFilter);
151190
}
152191
// if it doesn't have message, fetch retry
153192
if (!hasMsg) {
154193
String retryTopic = KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2());
155-
hasMsg = hasMsgFromTopic(retryTopic, randomQ, requestHeader);
194+
hasMsg = hasMsgFromTopic(retryTopic, randomQ, requestHeader, null, null);
156195
if (!hasMsg && brokerConfig.isEnableRetryTopicV2() && brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
157196
String retryTopicConfigV1 = KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(), requestHeader.getConsumerGroup());
158-
hasMsg = hasMsgFromTopic(retryTopicConfigV1, randomQ, requestHeader);
197+
hasMsg = hasMsgFromTopic(retryTopicConfigV1, randomQ, requestHeader, null, null);
159198
}
160199
}
161200

162201
if (!hasMsg) {
163-
SubscriptionData subscriptionData = null;
164-
ExpressionMessageFilter messageFilter = null;
165-
if (requestHeader.getExp() != null && !requestHeader.getExp().isEmpty()) {
166-
try {
167-
// origin topic
168-
subscriptionData = FilterAPI.build(
169-
requestHeader.getTopic(), requestHeader.getExp(), requestHeader.getExpType());
170-
171-
ConsumerFilterData consumerFilterData = null;
172-
if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
173-
consumerFilterData = ConsumerFilterManager.build(
174-
requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getExp(),
175-
requestHeader.getExpType(), System.currentTimeMillis());
176-
if (consumerFilterData == null) {
177-
POP_LOGGER.warn("Parse the consumer's subscription[{}] failed, group: {}",
178-
requestHeader.getExp(), requestHeader.getConsumerGroup());
179-
response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
180-
response.setRemark("parse the consumer's subscription failed");
181-
return response;
182-
}
183-
}
184-
messageFilter = new ExpressionMessageFilter(
185-
subscriptionData, consumerFilterData, brokerController.getConsumerFilterManager());
186-
} catch (Exception e) {
187-
POP_LOGGER.warn("Parse the consumer's subscription[{}] error, group: {}", requestHeader.getExp(),
188-
requestHeader.getConsumerGroup());
189-
response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
190-
response.setRemark("parse the consumer's subscription failed");
191-
return response;
192-
}
193-
} else {
194-
try {
195-
// origin topic
196-
subscriptionData = FilterAPI.build(requestHeader.getTopic(), "*", ExpressionType.TAG);
197-
} catch (Exception e) {
198-
POP_LOGGER.warn("Build default subscription error, group: {}", requestHeader.getConsumerGroup());
199-
}
200-
}
201-
202-
ExpressionMessageFilter finalMessageFilter = messageFilter;
203-
SubscriptionData finalSubscriptionData = subscriptionData;
204-
205-
PollingResult pollingResult = popLongPollingService.polling(ctx, request, new PollingHeader(requestHeader), finalSubscriptionData, finalMessageFilter);
202+
PollingResult pollingResult = popLongPollingService.polling(ctx, request, new PollingHeader(requestHeader), subscriptionData, messageFilter);
206203
if (pollingResult == PollingResult.POLLING_SUC) {
207204
return null;
208205
} else if (pollingResult == PollingResult.POLLING_FULL) {
@@ -214,19 +211,19 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx,
214211
return response;
215212
}
216213

217-
private boolean hasMsgFromTopic(String topicName, int randomQ, NotificationRequestHeader requestHeader)
214+
private boolean hasMsgFromTopic(String topicName, int randomQ, NotificationRequestHeader requestHeader, SubscriptionData subscriptionData, MessageFilter messageFilter)
218215
throws RemotingCommandException {
219216
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topicName);
220-
return hasMsgFromTopic(topicConfig, randomQ, requestHeader);
217+
return hasMsgFromTopic(topicConfig, randomQ, requestHeader, subscriptionData, messageFilter);
221218
}
222219

223-
private boolean hasMsgFromTopic(TopicConfig topicConfig, int randomQ, NotificationRequestHeader requestHeader)
220+
private boolean hasMsgFromTopic(TopicConfig topicConfig, int randomQ, NotificationRequestHeader requestHeader, SubscriptionData subscriptionData, MessageFilter messageFilter)
224221
throws RemotingCommandException {
225222
boolean hasMsg;
226223
if (topicConfig != null) {
227224
for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
228225
int queueId = (randomQ + i) % topicConfig.getReadQueueNums();
229-
hasMsg = hasMsgFromQueue(topicConfig.getTopicName(), requestHeader, queueId);
226+
hasMsg = hasMsgFromQueue(topicConfig.getTopicName(), requestHeader, queueId, subscriptionData, messageFilter);
230227
if (hasMsg) {
231228
return true;
232229
}
@@ -235,7 +232,7 @@ private boolean hasMsgFromTopic(TopicConfig topicConfig, int randomQ, Notificati
235232
return false;
236233
}
237234

238-
private boolean hasMsgFromQueue(String targetTopic, NotificationRequestHeader requestHeader, int queueId) throws RemotingCommandException {
235+
private boolean hasMsgFromQueue(String targetTopic, NotificationRequestHeader requestHeader, int queueId, SubscriptionData subscriptionData, MessageFilter messageFilter) throws RemotingCommandException {
239236
if (Boolean.TRUE.equals(requestHeader.getOrder())) {
240237
if (this.brokerController.getConsumerOrderInfoManager().checkBlock(requestHeader.getAttemptId(), requestHeader.getTopic(), requestHeader.getConsumerGroup(), queueId, 0)) {
241238
return false;
@@ -244,9 +241,37 @@ private boolean hasMsgFromQueue(String targetTopic, NotificationRequestHeader re
244241
long offset = getPopOffset(targetTopic, requestHeader.getConsumerGroup(), queueId);
245242
try {
246243
long restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(targetTopic, queueId) - offset;
244+
int maxFilterMessageNum = brokerController.getMessageStoreConfig().getMaxFilterMessageSize() / ConsumeQueue.CQ_STORE_UNIT_SIZE;
245+
boolean needFilter = restNum < maxFilterMessageNum &&
246+
subscriptionData != null &&
247+
messageFilter != null &&
248+
ExpressionType.isTagType(subscriptionData.getExpressionType());
249+
if (needFilter) {
250+
ConsumeQueueInterface queue = this.brokerController.getMessageStore().getConsumeQueue(targetTopic, queueId);
251+
// If the ConsumeQueue doesn't exist, it's not readable.
252+
if (queue == null) {
253+
return false;
254+
}
255+
ReferredIterator<CqUnit> iterator = null;
256+
try {
257+
iterator = queue.iterateFrom(offset, maxFilterMessageNum);
258+
if (iterator != null) {
259+
while (iterator.hasNext()) {
260+
CqUnit cqUnit = iterator.next();
261+
if (messageFilter.isMatchedByConsumeQueue(cqUnit.getValidTagsCodeAsLong(), cqUnit.getCqExtUnit())) {
262+
return true;
263+
}
264+
}
265+
}
266+
} finally {
267+
if (iterator != null) {
268+
iterator.release();
269+
}
270+
}
271+
}
247272
return restNum > 0;
248-
} catch (ConsumeQueueException e) {
249-
throw new RemotingCommandException("Failed tp get max offset in queue", e);
273+
} catch (ConsumeQueueException | RocksDBException e) {
274+
throw new RemotingCommandException("Failed to get max offset in queue or iterate in queue", e);
250275
}
251276
}
252277

common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.rocketmq.common;
1818

19+
import java.util.concurrent.TimeUnit;
1920
import org.apache.rocketmq.common.annotation.ImportantField;
2021
import org.apache.rocketmq.common.config.ConfigManagerVersion;
2122
import org.apache.rocketmq.common.constant.PermName;
@@ -24,8 +25,6 @@
2425
import org.apache.rocketmq.common.topic.TopicValidator;
2526
import org.apache.rocketmq.common.utils.NetworkUtil;
2627

27-
import java.util.concurrent.TimeUnit;
28-
2928
public class BrokerConfig extends BrokerIdentity {
3029

3130
private String brokerConfigPath = null;
@@ -256,6 +255,8 @@ public class BrokerConfig extends BrokerIdentity {
256255
private boolean useSeparateRetryQueue = false;
257256
private boolean realTimeNotifyConsumerChange = true;
258257

258+
private boolean useMessageFilterForNotification = true;
259+
259260
private boolean litePullMessageEnable = true;
260261

261262
// The period to sync broker member group from namesrv, default value is 1 second
@@ -2407,4 +2408,12 @@ public int getLiteLagLatencyTopK() {
24072408
public void setLiteLagLatencyTopK(int liteLagLatencyTopK) {
24082409
this.liteLagLatencyTopK = liteLagLatencyTopK;
24092410
}
2411+
2412+
public boolean isUseMessageFilterForNotification() {
2413+
return useMessageFilterForNotification;
2414+
}
2415+
2416+
public void setUseMessageFilterForNotification(boolean useMessageFilterForNotification) {
2417+
this.useMessageFilterForNotification = useMessageFilterForNotification;
2418+
}
24102419
}

0 commit comments

Comments
 (0)