Skip to content

Commit 9399d7e

Browse files
committed
fix
Change-Id: I309419c125475aec5ceeac2ce95f47cb9f45693d
1 parent 89d331c commit 9399d7e

3 files changed

Lines changed: 76 additions & 1 deletion

File tree

broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121
import java.util.Map;
2222
import java.util.Random;
2323
import org.apache.rocketmq.broker.BrokerController;
24+
import org.apache.rocketmq.broker.filter.ConsumerFilterData;
25+
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
26+
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
2427
import org.apache.rocketmq.broker.longpolling.PollingHeader;
2528
import org.apache.rocketmq.broker.longpolling.PollingResult;
2629
import org.apache.rocketmq.broker.longpolling.PopLongPollingService;
@@ -29,6 +32,7 @@
2932
import org.apache.rocketmq.common.TopicConfig;
3033
import org.apache.rocketmq.common.constant.LoggerName;
3134
import org.apache.rocketmq.common.constant.PermName;
35+
import org.apache.rocketmq.common.filter.ExpressionType;
3236
import org.apache.rocketmq.common.help.FAQUrl;
3337
import org.apache.rocketmq.logging.org.slf4j.Logger;
3438
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -37,8 +41,10 @@
3741
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
3842
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
3943
import org.apache.rocketmq.remoting.protocol.ResponseCode;
44+
import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
4045
import org.apache.rocketmq.remoting.protocol.header.NotificationRequestHeader;
4146
import org.apache.rocketmq.remoting.protocol.header.NotificationResponseHeader;
47+
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
4248
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
4349
import org.apache.rocketmq.store.exception.ConsumeQueueException;
4450

@@ -154,7 +160,49 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx,
154160
}
155161

156162
if (!hasMsg) {
157-
PollingResult pollingResult = popLongPollingService.polling(ctx, request, new PollingHeader(requestHeader));
163+
SubscriptionData subscriptionData = null;
164+
ExpressionMessageFilter messageFilter = null;
165+
if (requestHeader.getExp() != null && !requestHeader.getExp().isEmpty()) {
166+
try {
167+
// origin topic
168+
subscriptionData = FilterAPI.build(
169+
requestHeader.getTopic(), requestHeader.getExp(), requestHeader.getExpType());
170+
171+
ConsumerFilterData consumerFilterData = null;
172+
if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
173+
consumerFilterData = ConsumerFilterManager.build(
174+
requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getExp(),
175+
requestHeader.getExpType(), System.currentTimeMillis());
176+
if (consumerFilterData == null) {
177+
POP_LOGGER.warn("Parse the consumer's subscription[{}] failed, group: {}",
178+
requestHeader.getExp(), requestHeader.getConsumerGroup());
179+
response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
180+
response.setRemark("parse the consumer's subscription failed");
181+
return response;
182+
}
183+
}
184+
messageFilter = new ExpressionMessageFilter(
185+
subscriptionData, consumerFilterData, brokerController.getConsumerFilterManager());
186+
} catch (Exception e) {
187+
POP_LOGGER.warn("Parse the consumer's subscription[{}] error, group: {}", requestHeader.getExp(),
188+
requestHeader.getConsumerGroup());
189+
response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
190+
response.setRemark("parse the consumer's subscription failed");
191+
return response;
192+
}
193+
} else {
194+
try {
195+
// origin topic
196+
subscriptionData = FilterAPI.build(requestHeader.getTopic(), "*", ExpressionType.TAG);
197+
} catch (Exception e) {
198+
POP_LOGGER.warn("Build default subscription error, group: {}", requestHeader.getConsumerGroup());
199+
}
200+
}
201+
202+
ExpressionMessageFilter finalMessageFilter = messageFilter;
203+
SubscriptionData finalSubscriptionData = subscriptionData;
204+
205+
PollingResult pollingResult = popLongPollingService.polling(ctx, request, new PollingHeader(requestHeader), finalSubscriptionData, finalMessageFilter);
158206
if (pollingResult == PollingResult.POLLING_SUC) {
159207
return null;
160208
} else if (pollingResult == PollingResult.POLLING_FULL) {

remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ public class NotificationRequestHeader extends TopicQueueRequestHeader {
4444
private Boolean order = Boolean.FALSE;
4545
private String attemptId;
4646

47+
private String expType;
48+
private String exp;
49+
4750
@CFNotNull
4851
@Override
4952
public void checkFields() throws RemotingCommandException {
@@ -108,6 +111,22 @@ public void setAttemptId(String attemptId) {
108111
this.attemptId = attemptId;
109112
}
110113

114+
public String getExpType() {
115+
return expType;
116+
}
117+
118+
public void setExpType(String expType) {
119+
this.expType = expType;
120+
}
121+
122+
public String getExp() {
123+
return exp;
124+
}
125+
126+
public void setExp(String exp) {
127+
this.exp = exp;
128+
}
129+
111130
@Override
112131
public String toString() {
113132
return MoreObjects.toStringHelper(this)

test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,12 @@ public CompletableFuture<Boolean> notification(String brokerAddr, String topic,
199199

200200
public CompletableFuture<Boolean> notification(String brokerAddr, String topic,
201201
String consumerGroup, int queueId, Boolean order, String attemptId, long pollTime, long bornTime, long timeoutMillis) {
202+
return notification(brokerAddr, topic, consumerGroup, queueId, order, attemptId, pollTime, bornTime, timeoutMillis, null, null);
203+
}
204+
205+
206+
public CompletableFuture<Boolean> notification(String brokerAddr, String topic,
207+
String consumerGroup, int queueId, Boolean order, String attemptId, long pollTime, long bornTime, long timeoutMillis, String expType, String exp) {
202208
NotificationRequestHeader requestHeader = new NotificationRequestHeader();
203209
requestHeader.setConsumerGroup(consumerGroup);
204210
requestHeader.setTopic(topic);
@@ -207,6 +213,8 @@ public CompletableFuture<Boolean> notification(String brokerAddr, String topic,
207213
requestHeader.setBornTime(bornTime);
208214
requestHeader.setOrder(order);
209215
requestHeader.setAttemptId(attemptId);
216+
requestHeader.setExpType(expType);
217+
requestHeader.setExp(exp);
210218
return this.mqClientAPI.notification(brokerAddr, requestHeader, timeoutMillis);
211219
}
212220
}

0 commit comments

Comments
 (0)