Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@
import io.netty.channel.ChannelHandlerContext;
import java.util.Map;
import java.util.Random;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.filter.ConsumerFilterData;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
import org.apache.rocketmq.broker.longpolling.PollingHeader;
import org.apache.rocketmq.broker.longpolling.PollingResult;
import org.apache.rocketmq.broker.longpolling.PopLongPollingService;
Expand All @@ -29,6 +33,7 @@
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
Expand All @@ -37,10 +42,17 @@
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
import org.apache.rocketmq.remoting.protocol.header.NotificationRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.NotificationResponseHeader;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.exception.ConsumeQueueException;
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
import org.apache.rocketmq.store.queue.CqUnit;
import org.apache.rocketmq.store.queue.ReferredIterator;
import org.rocksdb.RocksDBException;

public class NotificationProcessor implements NettyRequestProcessor {
private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
Expand Down Expand Up @@ -136,25 +148,60 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx,
int randomQ = random.nextInt(100);
boolean hasMsg = false;
BrokerConfig brokerConfig = brokerController.getBrokerConfig();

SubscriptionData subscriptionData = null;
ExpressionMessageFilter messageFilter = null;
if (brokerConfig.isUseMessageFilterForNotification() &&
StringUtils.isNotEmpty(requestHeader.getExpType()) &&
StringUtils.isNotEmpty(requestHeader.getExp())) {
try {
// origin topic
subscriptionData = FilterAPI.build(
requestHeader.getTopic(), requestHeader.getExp(), requestHeader.getExpType());

ConsumerFilterData consumerFilterData = null;
if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
consumerFilterData = ConsumerFilterManager.build(
requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getExp(),
requestHeader.getExpType(), System.currentTimeMillis());
if (consumerFilterData == null) {
POP_LOGGER.warn("Parse the consumer's subscription[{}] failed, group: {}",
requestHeader.getExp(), requestHeader.getConsumerGroup());
response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
response.setRemark("parse the consumer's subscription failed");
return response;
}
}
messageFilter = new ExpressionMessageFilter(
subscriptionData, consumerFilterData, brokerController.getConsumerFilterManager());
} catch (Exception e) {
POP_LOGGER.warn("Parse the consumer's subscription[{}] error, group: {}", requestHeader.getExp(),
requestHeader.getConsumerGroup());
response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
response.setRemark("parse the consumer's subscription failed");
return response;
}
}

if (requestHeader.getQueueId() < 0) {
// read all queue
hasMsg = hasMsgFromTopic(topicConfig, randomQ, requestHeader);
hasMsg = hasMsgFromTopic(topicConfig, randomQ, requestHeader, subscriptionData, messageFilter);
} else {
int queueId = requestHeader.getQueueId();
hasMsg = hasMsgFromQueue(topicConfig.getTopicName(), requestHeader, queueId);
hasMsg = hasMsgFromQueue(topicConfig.getTopicName(), requestHeader, queueId, subscriptionData, messageFilter);
}
// if it doesn't have message, fetch retry
if (!hasMsg) {
String retryTopic = KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2());
hasMsg = hasMsgFromTopic(retryTopic, randomQ, requestHeader);
hasMsg = hasMsgFromTopic(retryTopic, randomQ, requestHeader, null, null);
if (!hasMsg && brokerConfig.isEnableRetryTopicV2() && brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
String retryTopicConfigV1 = KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(), requestHeader.getConsumerGroup());
hasMsg = hasMsgFromTopic(retryTopicConfigV1, randomQ, requestHeader);
hasMsg = hasMsgFromTopic(retryTopicConfigV1, randomQ, requestHeader, null, null);
}
}

if (!hasMsg) {
PollingResult pollingResult = popLongPollingService.polling(ctx, request, new PollingHeader(requestHeader));
PollingResult pollingResult = popLongPollingService.polling(ctx, request, new PollingHeader(requestHeader), subscriptionData, messageFilter);
if (pollingResult == PollingResult.POLLING_SUC) {
return null;
} else if (pollingResult == PollingResult.POLLING_FULL) {
Expand All @@ -166,19 +213,19 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx,
return response;
}

private boolean hasMsgFromTopic(String topicName, int randomQ, NotificationRequestHeader requestHeader)
private boolean hasMsgFromTopic(String topicName, int randomQ, NotificationRequestHeader requestHeader, SubscriptionData subscriptionData, MessageFilter messageFilter)
throws RemotingCommandException {
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topicName);
return hasMsgFromTopic(topicConfig, randomQ, requestHeader);
return hasMsgFromTopic(topicConfig, randomQ, requestHeader, subscriptionData, messageFilter);
}

private boolean hasMsgFromTopic(TopicConfig topicConfig, int randomQ, NotificationRequestHeader requestHeader)
private boolean hasMsgFromTopic(TopicConfig topicConfig, int randomQ, NotificationRequestHeader requestHeader, SubscriptionData subscriptionData, MessageFilter messageFilter)
throws RemotingCommandException {
boolean hasMsg;
if (topicConfig != null) {
for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
int queueId = (randomQ + i) % topicConfig.getReadQueueNums();
hasMsg = hasMsgFromQueue(topicConfig.getTopicName(), requestHeader, queueId);
hasMsg = hasMsgFromQueue(topicConfig.getTopicName(), requestHeader, queueId, subscriptionData, messageFilter);
if (hasMsg) {
return true;
}
Expand All @@ -187,7 +234,7 @@ private boolean hasMsgFromTopic(TopicConfig topicConfig, int randomQ, Notificati
return false;
}

private boolean hasMsgFromQueue(String targetTopic, NotificationRequestHeader requestHeader, int queueId) throws RemotingCommandException {
private boolean hasMsgFromQueue(String targetTopic, NotificationRequestHeader requestHeader, int queueId, SubscriptionData subscriptionData, MessageFilter messageFilter) throws RemotingCommandException {
if (Boolean.TRUE.equals(requestHeader.getOrder())) {
if (this.brokerController.getConsumerOrderInfoManager().checkBlock(requestHeader.getAttemptId(), requestHeader.getTopic(), requestHeader.getConsumerGroup(), queueId, 0)) {
return false;
Expand All @@ -196,9 +243,40 @@ private boolean hasMsgFromQueue(String targetTopic, NotificationRequestHeader re
long offset = getPopOffset(targetTopic, requestHeader.getConsumerGroup(), queueId);
try {
long restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(targetTopic, queueId) - offset;
int maxFilterMessageNum = this.brokerController.getBrokerConfig().getMaxMessageFilterNumForNotification();
boolean needFilter = restNum < maxFilterMessageNum &&
subscriptionData != null &&
messageFilter != null &&
ExpressionType.isTagType(subscriptionData.getExpressionType());
if (needFilter) {
ConsumeQueueInterface queue = this.brokerController.getMessageStore().getConsumeQueue(targetTopic, queueId);
// If the ConsumeQueue doesn't exist, it's not readable.
if (queue == null) {
return false;
}
ReferredIterator<CqUnit> iterator = null;
try {
// In order to take into account both the file CQ and the Rocksdb CQ,
// the count passed here is 32.
iterator = queue.iterateFrom(offset, 32);
if (iterator != null) {
while (iterator.hasNext()) {
CqUnit cqUnit = iterator.next();
if (messageFilter.isMatchedByConsumeQueue(cqUnit.getValidTagsCodeAsLong(), cqUnit.getCqExtUnit())) {
return true;
}
}
return false;
}
} finally {
if (iterator != null) {
iterator.release();
}
}
}
return restNum > 0;
} catch (ConsumeQueueException e) {
throw new RemotingCommandException("Failed tp get max offset in queue", e);
} catch (ConsumeQueueException | RocksDBException e) {
throw new RemotingCommandException("Failed to get max offset in queue or iterate in queue", e);
}
}

Expand Down
22 changes: 20 additions & 2 deletions common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.common;

import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.annotation.ImportantField;
import org.apache.rocketmq.common.config.ConfigManagerVersion;
import org.apache.rocketmq.common.constant.PermName;
Expand All @@ -24,8 +25,6 @@
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.utils.NetworkUtil;

import java.util.concurrent.TimeUnit;

public class BrokerConfig extends BrokerIdentity {

private String brokerConfigPath = null;
Expand Down Expand Up @@ -256,6 +255,9 @@ public class BrokerConfig extends BrokerIdentity {
private boolean useSeparateRetryQueue = false;
private boolean realTimeNotifyConsumerChange = true;

private boolean useMessageFilterForNotification = true;
private int maxMessageFilterNumForNotification = 64;

private boolean litePullMessageEnable = true;

// The period to sync broker member group from namesrv, default value is 1 second
Expand Down Expand Up @@ -2407,4 +2409,20 @@ public int getLiteLagLatencyTopK() {
public void setLiteLagLatencyTopK(int liteLagLatencyTopK) {
this.liteLagLatencyTopK = liteLagLatencyTopK;
}

public boolean isUseMessageFilterForNotification() {
return useMessageFilterForNotification;
}

public void setUseMessageFilterForNotification(boolean useMessageFilterForNotification) {
this.useMessageFilterForNotification = useMessageFilterForNotification;
}

public int getMaxMessageFilterNumForNotification() {
return maxMessageFilterNumForNotification;
}

public void setMaxMessageFilterNumForNotification(int maxMessageFilterNumForNotification) {
this.maxMessageFilterNumForNotification = maxMessageFilterNumForNotification;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ public class NotificationRequestHeader extends TopicQueueRequestHeader {
private Boolean order = Boolean.FALSE;
private String attemptId;

private String expType;
private String exp;

@CFNotNull
@Override
public void checkFields() throws RemotingCommandException {
Expand Down Expand Up @@ -108,6 +111,22 @@ public void setAttemptId(String attemptId) {
this.attemptId = attemptId;
}

public String getExpType() {
return expType;
}

public void setExpType(String expType) {
this.expType = expType;
}

public String getExp() {
return exp;
}

public void setExp(String exp) {
this.exp = exp;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,12 @@ public CompletableFuture<Boolean> notification(String brokerAddr, String topic,

public CompletableFuture<Boolean> notification(String brokerAddr, String topic,
String consumerGroup, int queueId, Boolean order, String attemptId, long pollTime, long bornTime, long timeoutMillis) {
return notification(brokerAddr, topic, consumerGroup, queueId, order, attemptId, pollTime, bornTime, timeoutMillis, null, null);
}


public CompletableFuture<Boolean> notification(String brokerAddr, String topic,
String consumerGroup, int queueId, Boolean order, String attemptId, long pollTime, long bornTime, long timeoutMillis, String expType, String exp) {
NotificationRequestHeader requestHeader = new NotificationRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
requestHeader.setTopic(topic);
Expand All @@ -207,6 +213,8 @@ public CompletableFuture<Boolean> notification(String brokerAddr, String topic,
requestHeader.setBornTime(bornTime);
requestHeader.setOrder(order);
requestHeader.setAttemptId(attemptId);
requestHeader.setExpType(expType);
requestHeader.setExp(exp);
return this.mqClientAPI.notification(brokerAddr, requestHeader, timeoutMillis);
}
}
Loading