diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java index 4563132fe48..24b587d1c6c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java @@ -20,7 +20,11 @@ import io.netty.channel.ChannelHandlerContext; import java.util.Map; import java.util.Random; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.filter.ConsumerFilterData; +import org.apache.rocketmq.broker.filter.ConsumerFilterManager; +import org.apache.rocketmq.broker.filter.ExpressionMessageFilter; import org.apache.rocketmq.broker.longpolling.PollingHeader; import org.apache.rocketmq.broker.longpolling.PollingResult; import org.apache.rocketmq.broker.longpolling.PopLongPollingService; @@ -29,6 +33,7 @@ import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.PermName; +import org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.common.help.FAQUrl; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; @@ -37,10 +42,17 @@ import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.filter.FilterAPI; import org.apache.rocketmq.remoting.protocol.header.NotificationRequestHeader; import org.apache.rocketmq.remoting.protocol.header.NotificationResponseHeader; +import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.store.MessageFilter; import org.apache.rocketmq.store.exception.ConsumeQueueException; +import org.apache.rocketmq.store.queue.ConsumeQueueInterface; +import org.apache.rocketmq.store.queue.CqUnit; +import org.apache.rocketmq.store.queue.ReferredIterator; +import org.rocksdb.RocksDBException; public class NotificationProcessor implements NettyRequestProcessor { private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME); @@ -136,25 +148,60 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, int randomQ = random.nextInt(100); boolean hasMsg = false; BrokerConfig brokerConfig = brokerController.getBrokerConfig(); + + SubscriptionData subscriptionData = null; + ExpressionMessageFilter messageFilter = null; + if (brokerConfig.isUseMessageFilterForNotification() && + StringUtils.isNotEmpty(requestHeader.getExpType()) && + StringUtils.isNotEmpty(requestHeader.getExp())) { + try { + // origin topic + subscriptionData = FilterAPI.build( + requestHeader.getTopic(), requestHeader.getExp(), requestHeader.getExpType()); + + ConsumerFilterData consumerFilterData = null; + if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) { + consumerFilterData = ConsumerFilterManager.build( + requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getExp(), + requestHeader.getExpType(), System.currentTimeMillis()); + if (consumerFilterData == null) { + POP_LOGGER.warn("Parse the consumer's subscription[{}] failed, group: {}", + requestHeader.getExp(), requestHeader.getConsumerGroup()); + response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED); + response.setRemark("parse the consumer's subscription failed"); + return response; + } + } + messageFilter = new ExpressionMessageFilter( + subscriptionData, consumerFilterData, brokerController.getConsumerFilterManager()); + } catch (Exception e) { + POP_LOGGER.warn("Parse the consumer's subscription[{}] error, group: {}", requestHeader.getExp(), + requestHeader.getConsumerGroup()); + response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED); + response.setRemark("parse the consumer's subscription failed"); + return response; + } + } + if (requestHeader.getQueueId() < 0) { // read all queue - hasMsg = hasMsgFromTopic(topicConfig, randomQ, requestHeader); + hasMsg = hasMsgFromTopic(topicConfig, randomQ, requestHeader, subscriptionData, messageFilter); } else { int queueId = requestHeader.getQueueId(); - hasMsg = hasMsgFromQueue(topicConfig.getTopicName(), requestHeader, queueId); + hasMsg = hasMsgFromQueue(topicConfig.getTopicName(), requestHeader, queueId, subscriptionData, messageFilter); } // if it doesn't have message, fetch retry if (!hasMsg) { String retryTopic = KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2()); - hasMsg = hasMsgFromTopic(retryTopic, randomQ, requestHeader); + hasMsg = hasMsgFromTopic(retryTopic, randomQ, requestHeader, null, null); if (!hasMsg && brokerConfig.isEnableRetryTopicV2() && brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) { String retryTopicConfigV1 = KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(), requestHeader.getConsumerGroup()); - hasMsg = hasMsgFromTopic(retryTopicConfigV1, randomQ, requestHeader); + hasMsg = hasMsgFromTopic(retryTopicConfigV1, randomQ, requestHeader, null, null); } } if (!hasMsg) { - PollingResult pollingResult = popLongPollingService.polling(ctx, request, new PollingHeader(requestHeader)); + PollingResult pollingResult = popLongPollingService.polling(ctx, request, new PollingHeader(requestHeader), subscriptionData, messageFilter); if (pollingResult == PollingResult.POLLING_SUC) { return null; } else if (pollingResult == PollingResult.POLLING_FULL) { @@ -166,19 +213,19 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, return response; } - private boolean hasMsgFromTopic(String topicName, int randomQ, NotificationRequestHeader requestHeader) + private boolean hasMsgFromTopic(String topicName, int randomQ, NotificationRequestHeader requestHeader, SubscriptionData subscriptionData, MessageFilter messageFilter) throws RemotingCommandException { TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topicName); - return hasMsgFromTopic(topicConfig, randomQ, requestHeader); + return hasMsgFromTopic(topicConfig, randomQ, requestHeader, subscriptionData, messageFilter); } - private boolean hasMsgFromTopic(TopicConfig topicConfig, int randomQ, NotificationRequestHeader requestHeader) + private boolean hasMsgFromTopic(TopicConfig topicConfig, int randomQ, NotificationRequestHeader requestHeader, SubscriptionData subscriptionData, MessageFilter messageFilter) throws RemotingCommandException { boolean hasMsg; if (topicConfig != null) { for (int i = 0; i < topicConfig.getReadQueueNums(); i++) { int queueId = (randomQ + i) % topicConfig.getReadQueueNums(); - hasMsg = hasMsgFromQueue(topicConfig.getTopicName(), requestHeader, queueId); + hasMsg = hasMsgFromQueue(topicConfig.getTopicName(), requestHeader, queueId, subscriptionData, messageFilter); if (hasMsg) { return true; } @@ -187,7 +234,7 @@ private boolean hasMsgFromTopic(TopicConfig topicConfig, int randomQ, Notificati return false; } - private boolean hasMsgFromQueue(String targetTopic, NotificationRequestHeader requestHeader, int queueId) throws RemotingCommandException { + private boolean hasMsgFromQueue(String targetTopic, NotificationRequestHeader requestHeader, int queueId, SubscriptionData subscriptionData, MessageFilter messageFilter) throws RemotingCommandException { if (Boolean.TRUE.equals(requestHeader.getOrder())) { if (this.brokerController.getConsumerOrderInfoManager().checkBlock(requestHeader.getAttemptId(), requestHeader.getTopic(), requestHeader.getConsumerGroup(), queueId, 0)) { return false; @@ -196,9 +243,40 @@ private boolean hasMsgFromQueue(String targetTopic, NotificationRequestHeader re long offset = getPopOffset(targetTopic, requestHeader.getConsumerGroup(), queueId); try { long restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(targetTopic, queueId) - offset; + int maxFilterMessageNum = this.brokerController.getBrokerConfig().getMaxMessageFilterNumForNotification(); + boolean needFilter = restNum < maxFilterMessageNum && + subscriptionData != null && + messageFilter != null && + ExpressionType.isTagType(subscriptionData.getExpressionType()); + if (needFilter) { + ConsumeQueueInterface queue = this.brokerController.getMessageStore().getConsumeQueue(targetTopic, queueId); + // If the ConsumeQueue doesn't exist, it's not readable. + if (queue == null) { + return false; + } + ReferredIterator iterator = null; + try { + // In order to take into account both the file CQ and the Rocksdb CQ, + // the count passed here is 32. + iterator = queue.iterateFrom(offset, 32); + if (iterator != null) { + while (iterator.hasNext()) { + CqUnit cqUnit = iterator.next(); + if (messageFilter.isMatchedByConsumeQueue(cqUnit.getValidTagsCodeAsLong(), cqUnit.getCqExtUnit())) { + return true; + } + } + return false; + } + } finally { + if (iterator != null) { + iterator.release(); + } + } + } return restNum > 0; - } catch (ConsumeQueueException e) { - throw new RemotingCommandException("Failed tp get max offset in queue", e); + } catch (ConsumeQueueException | RocksDBException e) { + throw new RemotingCommandException("Failed to get max offset in queue or iterate in queue", e); } } diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index caee5e45f26..7271c12b187 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.common; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.annotation.ImportantField; import org.apache.rocketmq.common.config.ConfigManagerVersion; import org.apache.rocketmq.common.constant.PermName; @@ -24,8 +25,6 @@ import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.common.utils.NetworkUtil; -import java.util.concurrent.TimeUnit; - public class BrokerConfig extends BrokerIdentity { private String brokerConfigPath = null; @@ -256,6 +255,9 @@ public class BrokerConfig extends BrokerIdentity { private boolean useSeparateRetryQueue = false; private boolean realTimeNotifyConsumerChange = true; + private boolean useMessageFilterForNotification = true; + private int maxMessageFilterNumForNotification = 64; + private boolean litePullMessageEnable = true; // The period to sync broker member group from namesrv, default value is 1 second @@ -2407,4 +2409,20 @@ public int getLiteLagLatencyTopK() { public void setLiteLagLatencyTopK(int liteLagLatencyTopK) { this.liteLagLatencyTopK = liteLagLatencyTopK; } + + public boolean isUseMessageFilterForNotification() { + return useMessageFilterForNotification; + } + + public void setUseMessageFilterForNotification(boolean useMessageFilterForNotification) { + this.useMessageFilterForNotification = useMessageFilterForNotification; + } + + public int getMaxMessageFilterNumForNotification() { + return maxMessageFilterNumForNotification; + } + + public void setMaxMessageFilterNumForNotification(int maxMessageFilterNumForNotification) { + this.maxMessageFilterNumForNotification = maxMessageFilterNumForNotification; + } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java index 0e484f82c0d..46c5930c1d1 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java @@ -44,6 +44,9 @@ public class NotificationRequestHeader extends TopicQueueRequestHeader { private Boolean order = Boolean.FALSE; private String attemptId; + private String expType; + private String exp; + @CFNotNull @Override public void checkFields() throws RemotingCommandException { @@ -108,6 +111,22 @@ public void setAttemptId(String attemptId) { this.attemptId = attemptId; } + public String getExpType() { + return expType; + } + + public void setExpType(String expType) { + this.expType = expType; + } + + public String getExp() { + return exp; + } + + public void setExp(String exp) { + this.exp = exp; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java index 09c60c0b45f..c45a26c59d0 100644 --- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java +++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java @@ -199,6 +199,12 @@ public CompletableFuture notification(String brokerAddr, String topic, public CompletableFuture notification(String brokerAddr, String topic, String consumerGroup, int queueId, Boolean order, String attemptId, long pollTime, long bornTime, long timeoutMillis) { + return notification(brokerAddr, topic, consumerGroup, queueId, order, attemptId, pollTime, bornTime, timeoutMillis, null, null); + } + + + public CompletableFuture notification(String brokerAddr, String topic, + String consumerGroup, int queueId, Boolean order, String attemptId, long pollTime, long bornTime, long timeoutMillis, String expType, String exp) { NotificationRequestHeader requestHeader = new NotificationRequestHeader(); requestHeader.setConsumerGroup(consumerGroup); requestHeader.setTopic(topic); @@ -207,6 +213,8 @@ public CompletableFuture notification(String brokerAddr, String topic, requestHeader.setBornTime(bornTime); requestHeader.setOrder(order); requestHeader.setAttemptId(attemptId); + requestHeader.setExpType(expType); + requestHeader.setExp(exp); return this.mqClientAPI.notification(brokerAddr, requestHeader, timeoutMillis); } }