diff --git a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageContext.java b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageContext.java index ed7bfba06d6..e45f48fe5ac 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageContext.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageContext.java @@ -46,6 +46,7 @@ public class ConsumeMessageContext { private BrokerStatsManager.StatsType commercialRcvStats; private int commercialRcvTimes; private int commercialRcvSize; + private int filterMessageCount; private String namespace; public String getConsumerGroup() { @@ -231,4 +232,12 @@ public String getNamespace() { public void setNamespace(String namespace) { this.namespace = namespace; } + + public int getFilterMessageCount() { + return filterMessageCount; + } + + public void setFilterMessageCount(int filterMessageCount) { + this.filterMessageCount = filterMessageCount; + } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index 5b11bc2fef4..5f0735e74cf 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -713,6 +713,7 @@ protected void executeConsumeMessageHookBefore(RemotingCommand request, PullMess context.setAccountOwnerParent(ownerParent); context.setAccountOwnerSelf(ownerSelf); context.setNamespace(NamespaceUtil.getNamespaceFromResource(requestHeader.getTopic())); + context.setFilterMessageCount(getMessageResult.getFilterMessageCount()); switch (responseCode) { case ResponseCode.SUCCESS: diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index d6134683861..f8caf7beacd 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -816,6 +816,7 @@ public GetMessageResult getMessage(final String group, final String topic, final long maxOffset = 0; GetMessageResult getResult = new GetMessageResult(); + int filterMessageCount = 0; final long maxOffsetPy = this.commitLog.getMaxOffset(); @@ -927,6 +928,7 @@ public GetMessageResult getMessage(final String group, final String topic, final } // release... selectResult.release(); + filterMessageCount++; continue; } this.storeStatsService.getGetMessageTransferredMsgCount().add(cqUnit.getBatchNum()); @@ -976,6 +978,7 @@ public GetMessageResult getMessage(final String group, final String topic, final getResult.setNextBeginOffset(nextBeginOffset); getResult.setMaxOffset(maxOffset); getResult.setMinOffset(minOffset); + getResult.setFilterMessageCount(filterMessageCount); return getResult; } diff --git a/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java b/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java index a7556dfb855..6f322a19e19 100644 --- a/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java +++ b/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java @@ -43,6 +43,8 @@ public class GetMessageResult { private long coldDataSum = 0L; + private int filterMessageCount; + public static final GetMessageResult NO_MATCH_LOGIC_QUEUE = new GetMessageResult(GetMessageStatus.NO_MATCHED_LOGIC_QUEUE, 0, 0, 0, Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); @@ -177,10 +179,18 @@ public void setColdDataSum(long coldDataSum) { this.coldDataSum = coldDataSum; } + public int getFilterMessageCount() { + return filterMessageCount; + } + + public void setFilterMessageCount(int filterMessageCount) { + this.filterMessageCount = filterMessageCount; + } + @Override public String toString() { return "GetMessageResult [status=" + status + ", nextBeginOffset=" + nextBeginOffset + ", minOffset=" + minOffset + ", maxOffset=" + maxOffset + ", bufferTotalSize=" + bufferTotalSize + ", messageCount=" + messageCount - + ", suggestPullingFromSlave=" + suggestPullingFromSlave + "]"; + + ", filterMessageCount=" + filterMessageCount + ", suggestPullingFromSlave=" + suggestPullingFromSlave + "]"; } }