|
20 | 20 | import io.netty.channel.ChannelHandlerContext; |
21 | 21 | import java.util.Map; |
22 | 22 | import java.util.Random; |
| 23 | +import org.apache.commons.lang3.StringUtils; |
23 | 24 | import org.apache.rocketmq.broker.BrokerController; |
24 | 25 | import org.apache.rocketmq.broker.filter.ConsumerFilterData; |
25 | 26 | import org.apache.rocketmq.broker.filter.ConsumerFilterManager; |
@@ -151,7 +152,9 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, |
151 | 152 |
|
152 | 153 | SubscriptionData subscriptionData = null; |
153 | 154 | ExpressionMessageFilter messageFilter = null; |
154 | | - if (brokerConfig.isUseMessageFilterForNotification() && requestHeader.getExp() != null && !requestHeader.getExp().isEmpty()) { |
| 155 | + if (brokerConfig.isUseMessageFilterForNotification() && |
| 156 | + StringUtils.isNotEmpty(requestHeader.getExpType()) && |
| 157 | + StringUtils.isNotEmpty(requestHeader.getExp())) { |
155 | 158 | try { |
156 | 159 | // origin topic |
157 | 160 | subscriptionData = FilterAPI.build( |
@@ -254,7 +257,9 @@ private boolean hasMsgFromQueue(String targetTopic, NotificationRequestHeader re |
254 | 257 | } |
255 | 258 | ReferredIterator<CqUnit> iterator = null; |
256 | 259 | try { |
257 | | - iterator = queue.iterateFrom(offset, maxFilterMessageNum); |
| 260 | + // In order to take into account both the file CQ and the Rocksdb CQ, |
| 261 | + // the count passed here is 32. |
| 262 | + iterator = queue.iterateFrom(offset, 32); |
258 | 263 | if (iterator != null) { |
259 | 264 | while (iterator.hasNext()) { |
260 | 265 | CqUnit cqUnit = iterator.next(); |
|
0 commit comments