Skip to content

Commit 3de0e75

Browse files
committed
fix
Change-Id: I19b1c387bb3f03f5acade1bef28daaea0369613a
1 parent 2ad07d9 commit 3de0e75

2 files changed

Lines changed: 11 additions & 2 deletions

File tree

broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import org.apache.rocketmq.remoting.protocol.header.NotificationResponseHeader;
4848
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
4949
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
50-
import org.apache.rocketmq.store.ConsumeQueue;
5150
import org.apache.rocketmq.store.MessageFilter;
5251
import org.apache.rocketmq.store.exception.ConsumeQueueException;
5352
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
@@ -244,7 +243,7 @@ private boolean hasMsgFromQueue(String targetTopic, NotificationRequestHeader re
244243
long offset = getPopOffset(targetTopic, requestHeader.getConsumerGroup(), queueId);
245244
try {
246245
long restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(targetTopic, queueId) - offset;
247-
int maxFilterMessageNum = brokerController.getMessageStoreConfig().getMaxFilterMessageSize() / ConsumeQueue.CQ_STORE_UNIT_SIZE;
246+
int maxFilterMessageNum = this.brokerController.getBrokerConfig().getMaxMessageFilterNumForNotification();
248247
boolean needFilter = restNum < maxFilterMessageNum &&
249248
subscriptionData != null &&
250249
messageFilter != null &&
@@ -267,6 +266,7 @@ private boolean hasMsgFromQueue(String targetTopic, NotificationRequestHeader re
267266
return true;
268267
}
269268
}
269+
return false;
270270
}
271271
} finally {
272272
if (iterator != null) {

common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,7 @@ public class BrokerConfig extends BrokerIdentity {
256256
private boolean realTimeNotifyConsumerChange = true;
257257

258258
private boolean useMessageFilterForNotification = true;
259+
private int maxMessageFilterNumForNotification = 64;
259260

260261
private boolean litePullMessageEnable = true;
261262

@@ -2416,4 +2417,12 @@ public boolean isUseMessageFilterForNotification() {
24162417
public void setUseMessageFilterForNotification(boolean useMessageFilterForNotification) {
24172418
this.useMessageFilterForNotification = useMessageFilterForNotification;
24182419
}
2420+
2421+
public int getMaxMessageFilterNumForNotification() {
2422+
return maxMessageFilterNumForNotification;
2423+
}
2424+
2425+
public void setMaxMessageFilterNumForNotification(int maxMessageFilterNumForNotification) {
2426+
this.maxMessageFilterNumForNotification = maxMessageFilterNumForNotification;
2427+
}
24192428
}

0 commit comments

Comments
 (0)