From 9399d7e94a297bbb7dc40a4fb0c54b805e4ebc7f Mon Sep 17 00:00:00 2001 From: qianye Date: Mon, 2 Feb 2026 15:29:32 +0800 Subject: [PATCH 1/5] fix Change-Id: I309419c125475aec5ceeac2ce95f47cb9f45693d --- .../processor/NotificationProcessor.java | 50 ++++++++++++++++++- .../header/NotificationRequestHeader.java | 19 +++++++ .../test/client/rmq/RMQPopClient.java | 8 +++ 3 files changed, 76 insertions(+), 1 deletion(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java index 4563132fe48..4df7480debb 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java @@ -21,6 +21,9 @@ import java.util.Map; import java.util.Random; import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.filter.ConsumerFilterData; +import org.apache.rocketmq.broker.filter.ConsumerFilterManager; +import org.apache.rocketmq.broker.filter.ExpressionMessageFilter; import org.apache.rocketmq.broker.longpolling.PollingHeader; import org.apache.rocketmq.broker.longpolling.PollingResult; import org.apache.rocketmq.broker.longpolling.PopLongPollingService; @@ -29,6 +32,7 @@ import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.PermName; +import org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.common.help.FAQUrl; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; @@ -37,8 +41,10 @@ import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.filter.FilterAPI; import org.apache.rocketmq.remoting.protocol.header.NotificationRequestHeader; import org.apache.rocketmq.remoting.protocol.header.NotificationResponseHeader; +import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.store.exception.ConsumeQueueException; @@ -154,7 +160,49 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, } if (!hasMsg) { - PollingResult pollingResult = popLongPollingService.polling(ctx, request, new PollingHeader(requestHeader)); + SubscriptionData subscriptionData = null; + ExpressionMessageFilter messageFilter = null; + if (requestHeader.getExp() != null && !requestHeader.getExp().isEmpty()) { + try { + // origin topic + subscriptionData = FilterAPI.build( + requestHeader.getTopic(), requestHeader.getExp(), requestHeader.getExpType()); + + ConsumerFilterData consumerFilterData = null; + if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) { + consumerFilterData = ConsumerFilterManager.build( + requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getExp(), + requestHeader.getExpType(), System.currentTimeMillis()); + if (consumerFilterData == null) { + POP_LOGGER.warn("Parse the consumer's subscription[{}] failed, group: {}", + requestHeader.getExp(), requestHeader.getConsumerGroup()); + response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED); + response.setRemark("parse the consumer's subscription failed"); + return response; + } + } + messageFilter = new ExpressionMessageFilter( + subscriptionData, consumerFilterData, brokerController.getConsumerFilterManager()); + } catch (Exception e) { + POP_LOGGER.warn("Parse the consumer's subscription[{}] error, group: {}", requestHeader.getExp(), + requestHeader.getConsumerGroup()); + response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED); + response.setRemark("parse the consumer's subscription failed"); + return response; + } + } else { + try { + // origin topic + subscriptionData = FilterAPI.build(requestHeader.getTopic(), "*", ExpressionType.TAG); + } catch (Exception e) { + POP_LOGGER.warn("Build default subscription error, group: {}", requestHeader.getConsumerGroup()); + } + } + + ExpressionMessageFilter finalMessageFilter = messageFilter; + SubscriptionData finalSubscriptionData = subscriptionData; + + PollingResult pollingResult = popLongPollingService.polling(ctx, request, new PollingHeader(requestHeader), finalSubscriptionData, finalMessageFilter); if (pollingResult == PollingResult.POLLING_SUC) { return null; } else if (pollingResult == PollingResult.POLLING_FULL) { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java index 0e484f82c0d..46c5930c1d1 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java @@ -44,6 +44,9 @@ public class NotificationRequestHeader extends TopicQueueRequestHeader { private Boolean order = Boolean.FALSE; private String attemptId; + private String expType; + private String exp; + @CFNotNull @Override public void checkFields() throws RemotingCommandException { @@ -108,6 +111,22 @@ public void setAttemptId(String attemptId) { this.attemptId = attemptId; } + public String getExpType() { + return expType; + } + + public void setExpType(String expType) { + this.expType = expType; + } + + public String getExp() { + return exp; + } + + public void setExp(String exp) { + this.exp = exp; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java index 09c60c0b45f..c45a26c59d0 100644 --- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java +++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java @@ -199,6 +199,12 @@ public CompletableFuture notification(String brokerAddr, String topic, public CompletableFuture notification(String brokerAddr, String topic, String consumerGroup, int queueId, Boolean order, String attemptId, long pollTime, long bornTime, long timeoutMillis) { + return notification(brokerAddr, topic, consumerGroup, queueId, order, attemptId, pollTime, bornTime, timeoutMillis, null, null); + } + + + public CompletableFuture notification(String brokerAddr, String topic, + String consumerGroup, int queueId, Boolean order, String attemptId, long pollTime, long bornTime, long timeoutMillis, String expType, String exp) { NotificationRequestHeader requestHeader = new NotificationRequestHeader(); requestHeader.setConsumerGroup(consumerGroup); requestHeader.setTopic(topic); @@ -207,6 +213,8 @@ public CompletableFuture notification(String brokerAddr, String topic, requestHeader.setBornTime(bornTime); requestHeader.setOrder(order); requestHeader.setAttemptId(attemptId); + requestHeader.setExpType(expType); + requestHeader.setExp(exp); return this.mqClientAPI.notification(brokerAddr, requestHeader, timeoutMillis); } } From 93cf16dd6470e2f2fb115536e9be6b4d6976a607 Mon Sep 17 00:00:00 2001 From: qianye Date: Mon, 2 Feb 2026 17:56:57 +0800 Subject: [PATCH 2/5] fix Change-Id: I645edeaf6b19617b6fafdd3e88d69bea3d12abd6 --- .../processor/NotificationProcessor.java | 133 +++++++++++------- .../apache/rocketmq/common/BrokerConfig.java | 13 +- 2 files changed, 90 insertions(+), 56 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java index 4df7480debb..55cd6ade15c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java @@ -46,7 +46,13 @@ import org.apache.rocketmq.remoting.protocol.header.NotificationResponseHeader; import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.store.ConsumeQueue; +import org.apache.rocketmq.store.MessageFilter; import org.apache.rocketmq.store.exception.ConsumeQueueException; +import org.apache.rocketmq.store.queue.ConsumeQueueInterface; +import org.apache.rocketmq.store.queue.CqUnit; +import org.apache.rocketmq.store.queue.ReferredIterator; +import org.rocksdb.RocksDBException; public class NotificationProcessor implements NettyRequestProcessor { private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME); @@ -142,67 +148,58 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, int randomQ = random.nextInt(100); boolean hasMsg = false; BrokerConfig brokerConfig = brokerController.getBrokerConfig(); + + SubscriptionData subscriptionData = null; + ExpressionMessageFilter messageFilter = null; + if (brokerConfig.isUseMessageFilterForNotification() && requestHeader.getExp() != null && !requestHeader.getExp().isEmpty()) { + try { + // origin topic + subscriptionData = FilterAPI.build( + requestHeader.getTopic(), requestHeader.getExp(), requestHeader.getExpType()); + + ConsumerFilterData consumerFilterData = null; + if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) { + consumerFilterData = ConsumerFilterManager.build( + requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getExp(), + requestHeader.getExpType(), System.currentTimeMillis()); + if (consumerFilterData == null) { + POP_LOGGER.warn("Parse the consumer's subscription[{}] failed, group: {}", + requestHeader.getExp(), requestHeader.getConsumerGroup()); + response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED); + response.setRemark("parse the consumer's subscription failed"); + return response; + } + } + messageFilter = new ExpressionMessageFilter( + subscriptionData, consumerFilterData, brokerController.getConsumerFilterManager()); + } catch (Exception e) { + POP_LOGGER.warn("Parse the consumer's subscription[{}] error, group: {}", requestHeader.getExp(), + requestHeader.getConsumerGroup()); + response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED); + response.setRemark("parse the consumer's subscription failed"); + return response; + } + } + if (requestHeader.getQueueId() < 0) { // read all queue - hasMsg = hasMsgFromTopic(topicConfig, randomQ, requestHeader); + hasMsg = hasMsgFromTopic(topicConfig, randomQ, requestHeader, subscriptionData, messageFilter); } else { int queueId = requestHeader.getQueueId(); - hasMsg = hasMsgFromQueue(topicConfig.getTopicName(), requestHeader, queueId); + hasMsg = hasMsgFromQueue(topicConfig.getTopicName(), requestHeader, queueId, subscriptionData, messageFilter); } // if it doesn't have message, fetch retry if (!hasMsg) { String retryTopic = KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2()); - hasMsg = hasMsgFromTopic(retryTopic, randomQ, requestHeader); + hasMsg = hasMsgFromTopic(retryTopic, randomQ, requestHeader, null, null); if (!hasMsg && brokerConfig.isEnableRetryTopicV2() && brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) { String retryTopicConfigV1 = KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(), requestHeader.getConsumerGroup()); - hasMsg = hasMsgFromTopic(retryTopicConfigV1, randomQ, requestHeader); + hasMsg = hasMsgFromTopic(retryTopicConfigV1, randomQ, requestHeader, null, null); } } if (!hasMsg) { - SubscriptionData subscriptionData = null; - ExpressionMessageFilter messageFilter = null; - if (requestHeader.getExp() != null && !requestHeader.getExp().isEmpty()) { - try { - // origin topic - subscriptionData = FilterAPI.build( - requestHeader.getTopic(), requestHeader.getExp(), requestHeader.getExpType()); - - ConsumerFilterData consumerFilterData = null; - if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) { - consumerFilterData = ConsumerFilterManager.build( - requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getExp(), - requestHeader.getExpType(), System.currentTimeMillis()); - if (consumerFilterData == null) { - POP_LOGGER.warn("Parse the consumer's subscription[{}] failed, group: {}", - requestHeader.getExp(), requestHeader.getConsumerGroup()); - response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED); - response.setRemark("parse the consumer's subscription failed"); - return response; - } - } - messageFilter = new ExpressionMessageFilter( - subscriptionData, consumerFilterData, brokerController.getConsumerFilterManager()); - } catch (Exception e) { - POP_LOGGER.warn("Parse the consumer's subscription[{}] error, group: {}", requestHeader.getExp(), - requestHeader.getConsumerGroup()); - response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED); - response.setRemark("parse the consumer's subscription failed"); - return response; - } - } else { - try { - // origin topic - subscriptionData = FilterAPI.build(requestHeader.getTopic(), "*", ExpressionType.TAG); - } catch (Exception e) { - POP_LOGGER.warn("Build default subscription error, group: {}", requestHeader.getConsumerGroup()); - } - } - - ExpressionMessageFilter finalMessageFilter = messageFilter; - SubscriptionData finalSubscriptionData = subscriptionData; - - PollingResult pollingResult = popLongPollingService.polling(ctx, request, new PollingHeader(requestHeader), finalSubscriptionData, finalMessageFilter); + PollingResult pollingResult = popLongPollingService.polling(ctx, request, new PollingHeader(requestHeader), subscriptionData, messageFilter); if (pollingResult == PollingResult.POLLING_SUC) { return null; } else if (pollingResult == PollingResult.POLLING_FULL) { @@ -214,19 +211,19 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, return response; } - private boolean hasMsgFromTopic(String topicName, int randomQ, NotificationRequestHeader requestHeader) + private boolean hasMsgFromTopic(String topicName, int randomQ, NotificationRequestHeader requestHeader, SubscriptionData subscriptionData, MessageFilter messageFilter) throws RemotingCommandException { TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topicName); - return hasMsgFromTopic(topicConfig, randomQ, requestHeader); + return hasMsgFromTopic(topicConfig, randomQ, requestHeader, subscriptionData, messageFilter); } - private boolean hasMsgFromTopic(TopicConfig topicConfig, int randomQ, NotificationRequestHeader requestHeader) + private boolean hasMsgFromTopic(TopicConfig topicConfig, int randomQ, NotificationRequestHeader requestHeader, SubscriptionData subscriptionData, MessageFilter messageFilter) throws RemotingCommandException { boolean hasMsg; if (topicConfig != null) { for (int i = 0; i < topicConfig.getReadQueueNums(); i++) { int queueId = (randomQ + i) % topicConfig.getReadQueueNums(); - hasMsg = hasMsgFromQueue(topicConfig.getTopicName(), requestHeader, queueId); + hasMsg = hasMsgFromQueue(topicConfig.getTopicName(), requestHeader, queueId, subscriptionData, messageFilter); if (hasMsg) { return true; } @@ -235,7 +232,7 @@ private boolean hasMsgFromTopic(TopicConfig topicConfig, int randomQ, Notificati return false; } - private boolean hasMsgFromQueue(String targetTopic, NotificationRequestHeader requestHeader, int queueId) throws RemotingCommandException { + private boolean hasMsgFromQueue(String targetTopic, NotificationRequestHeader requestHeader, int queueId, SubscriptionData subscriptionData, MessageFilter messageFilter) throws RemotingCommandException { if (Boolean.TRUE.equals(requestHeader.getOrder())) { if (this.brokerController.getConsumerOrderInfoManager().checkBlock(requestHeader.getAttemptId(), requestHeader.getTopic(), requestHeader.getConsumerGroup(), queueId, 0)) { return false; @@ -244,9 +241,37 @@ private boolean hasMsgFromQueue(String targetTopic, NotificationRequestHeader re long offset = getPopOffset(targetTopic, requestHeader.getConsumerGroup(), queueId); try { long restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(targetTopic, queueId) - offset; + int maxFilterMessageNum = brokerController.getMessageStoreConfig().getMaxFilterMessageSize() / ConsumeQueue.CQ_STORE_UNIT_SIZE; + boolean needFilter = restNum < maxFilterMessageNum && + subscriptionData != null && + messageFilter != null && + ExpressionType.isTagType(subscriptionData.getExpressionType()); + if (needFilter) { + ConsumeQueueInterface queue = this.brokerController.getMessageStore().getConsumeQueue(targetTopic, queueId); + // If the ConsumeQueue doesn't exist, it's not readable. + if (queue == null) { + return false; + } + ReferredIterator iterator = null; + try { + iterator = queue.iterateFrom(offset, maxFilterMessageNum); + if (iterator != null) { + while (iterator.hasNext()) { + CqUnit cqUnit = iterator.next(); + if (messageFilter.isMatchedByConsumeQueue(cqUnit.getValidTagsCodeAsLong(), cqUnit.getCqExtUnit())) { + return true; + } + } + } + } finally { + if (iterator != null) { + iterator.release(); + } + } + } return restNum > 0; - } catch (ConsumeQueueException e) { - throw new RemotingCommandException("Failed tp get max offset in queue", e); + } catch (ConsumeQueueException | RocksDBException e) { + throw new RemotingCommandException("Failed to get max offset in queue or iterate in queue", e); } } diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index caee5e45f26..5722dff4773 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.common; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.annotation.ImportantField; import org.apache.rocketmq.common.config.ConfigManagerVersion; import org.apache.rocketmq.common.constant.PermName; @@ -24,8 +25,6 @@ import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.common.utils.NetworkUtil; -import java.util.concurrent.TimeUnit; - public class BrokerConfig extends BrokerIdentity { private String brokerConfigPath = null; @@ -256,6 +255,8 @@ public class BrokerConfig extends BrokerIdentity { private boolean useSeparateRetryQueue = false; private boolean realTimeNotifyConsumerChange = true; + private boolean useMessageFilterForNotification = true; + private boolean litePullMessageEnable = true; // The period to sync broker member group from namesrv, default value is 1 second @@ -2407,4 +2408,12 @@ public int getLiteLagLatencyTopK() { public void setLiteLagLatencyTopK(int liteLagLatencyTopK) { this.liteLagLatencyTopK = liteLagLatencyTopK; } + + public boolean isUseMessageFilterForNotification() { + return useMessageFilterForNotification; + } + + public void setUseMessageFilterForNotification(boolean useMessageFilterForNotification) { + this.useMessageFilterForNotification = useMessageFilterForNotification; + } } From 4a84ab78b2ed7748a9fdb96aaa5c4d5a9485ec21 Mon Sep 17 00:00:00 2001 From: qianye Date: Mon, 2 Feb 2026 20:38:41 +0800 Subject: [PATCH 3/5] fix Change-Id: Ic25eceeb32a7486d154b382cc62fbf3df9e91571 --- .../apache/rocketmq/broker/processor/NotificationProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java index 55cd6ade15c..dba0d5410b5 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java @@ -250,7 +250,7 @@ private boolean hasMsgFromQueue(String targetTopic, NotificationRequestHeader re ConsumeQueueInterface queue = this.brokerController.getMessageStore().getConsumeQueue(targetTopic, queueId); // If the ConsumeQueue doesn't exist, it's not readable. if (queue == null) { - return false; + return false; } ReferredIterator iterator = null; try { From 2ad07d9a04b2db548c5b9f2ec146733df7c8ab3f Mon Sep 17 00:00:00 2001 From: qianye Date: Tue, 3 Feb 2026 10:04:46 +0800 Subject: [PATCH 4/5] fix Change-Id: I9274ca9ede5e6fb206dba362b653224447c79a40 --- .../rocketmq/broker/processor/NotificationProcessor.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java index dba0d5410b5..b28dfa9a4be 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java @@ -20,6 +20,7 @@ import io.netty.channel.ChannelHandlerContext; import java.util.Map; import java.util.Random; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.filter.ConsumerFilterData; import org.apache.rocketmq.broker.filter.ConsumerFilterManager; @@ -151,7 +152,9 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, SubscriptionData subscriptionData = null; ExpressionMessageFilter messageFilter = null; - if (brokerConfig.isUseMessageFilterForNotification() && requestHeader.getExp() != null && !requestHeader.getExp().isEmpty()) { + if (brokerConfig.isUseMessageFilterForNotification() && + StringUtils.isNotEmpty(requestHeader.getExpType()) && + StringUtils.isNotEmpty(requestHeader.getExp())) { try { // origin topic subscriptionData = FilterAPI.build( @@ -254,7 +257,9 @@ private boolean hasMsgFromQueue(String targetTopic, NotificationRequestHeader re } ReferredIterator iterator = null; try { - iterator = queue.iterateFrom(offset, maxFilterMessageNum); + // In order to take into account both the file CQ and the Rocksdb CQ, + // the count passed here is 32. + iterator = queue.iterateFrom(offset, 32); if (iterator != null) { while (iterator.hasNext()) { CqUnit cqUnit = iterator.next(); From 3de0e75fbd5d12dce98d16e19d8d689dcd91429d Mon Sep 17 00:00:00 2001 From: qianye Date: Tue, 3 Feb 2026 16:55:08 +0800 Subject: [PATCH 5/5] fix Change-Id: I19b1c387bb3f03f5acade1bef28daaea0369613a --- .../rocketmq/broker/processor/NotificationProcessor.java | 4 ++-- .../java/org/apache/rocketmq/common/BrokerConfig.java | 9 +++++++++ 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java index b28dfa9a4be..24b587d1c6c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java @@ -47,7 +47,6 @@ import org.apache.rocketmq.remoting.protocol.header.NotificationResponseHeader; import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; -import org.apache.rocketmq.store.ConsumeQueue; import org.apache.rocketmq.store.MessageFilter; import org.apache.rocketmq.store.exception.ConsumeQueueException; import org.apache.rocketmq.store.queue.ConsumeQueueInterface; @@ -244,7 +243,7 @@ private boolean hasMsgFromQueue(String targetTopic, NotificationRequestHeader re long offset = getPopOffset(targetTopic, requestHeader.getConsumerGroup(), queueId); try { long restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(targetTopic, queueId) - offset; - int maxFilterMessageNum = brokerController.getMessageStoreConfig().getMaxFilterMessageSize() / ConsumeQueue.CQ_STORE_UNIT_SIZE; + int maxFilterMessageNum = this.brokerController.getBrokerConfig().getMaxMessageFilterNumForNotification(); boolean needFilter = restNum < maxFilterMessageNum && subscriptionData != null && messageFilter != null && @@ -267,6 +266,7 @@ private boolean hasMsgFromQueue(String targetTopic, NotificationRequestHeader re return true; } } + return false; } } finally { if (iterator != null) { diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 5722dff4773..7271c12b187 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -256,6 +256,7 @@ public class BrokerConfig extends BrokerIdentity { private boolean realTimeNotifyConsumerChange = true; private boolean useMessageFilterForNotification = true; + private int maxMessageFilterNumForNotification = 64; private boolean litePullMessageEnable = true; @@ -2416,4 +2417,12 @@ public boolean isUseMessageFilterForNotification() { public void setUseMessageFilterForNotification(boolean useMessageFilterForNotification) { this.useMessageFilterForNotification = useMessageFilterForNotification; } + + public int getMaxMessageFilterNumForNotification() { + return maxMessageFilterNumForNotification; + } + + public void setMaxMessageFilterNumForNotification(int maxMessageFilterNumForNotification) { + this.maxMessageFilterNumForNotification = maxMessageFilterNumForNotification; + } }