Skip to content

Commit b5ab93c

Browse files
authored
[ISSUE #9286] Counting the filtered message when filter by SQL92 (#9287)
1 parent 4a15256 commit b5ab93c

4 files changed

Lines changed: 24 additions & 1 deletion

File tree

broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageContext.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public class ConsumeMessageContext {
4646
private BrokerStatsManager.StatsType commercialRcvStats;
4747
private int commercialRcvTimes;
4848
private int commercialRcvSize;
49+
private int filterMessageCount;
4950

5051
private String namespace;
5152
public String getConsumerGroup() {
@@ -231,4 +232,12 @@ public String getNamespace() {
231232
public void setNamespace(String namespace) {
232233
this.namespace = namespace;
233234
}
235+
236+
public int getFilterMessageCount() {
237+
return filterMessageCount;
238+
}
239+
240+
public void setFilterMessageCount(int filterMessageCount) {
241+
this.filterMessageCount = filterMessageCount;
242+
}
234243
}

broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -713,6 +713,7 @@ protected void executeConsumeMessageHookBefore(RemotingCommand request, PullMess
713713
context.setAccountOwnerParent(ownerParent);
714714
context.setAccountOwnerSelf(ownerSelf);
715715
context.setNamespace(NamespaceUtil.getNamespaceFromResource(requestHeader.getTopic()));
716+
context.setFilterMessageCount(getMessageResult.getFilterMessageCount());
716717

717718
switch (responseCode) {
718719
case ResponseCode.SUCCESS:

store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -816,6 +816,7 @@ public GetMessageResult getMessage(final String group, final String topic, final
816816
long maxOffset = 0;
817817

818818
GetMessageResult getResult = new GetMessageResult();
819+
int filterMessageCount = 0;
819820

820821
final long maxOffsetPy = this.commitLog.getMaxOffset();
821822

@@ -927,6 +928,7 @@ public GetMessageResult getMessage(final String group, final String topic, final
927928
}
928929
// release...
929930
selectResult.release();
931+
filterMessageCount++;
930932
continue;
931933
}
932934
this.storeStatsService.getGetMessageTransferredMsgCount().add(cqUnit.getBatchNum());
@@ -976,6 +978,7 @@ public GetMessageResult getMessage(final String group, final String topic, final
976978
getResult.setNextBeginOffset(nextBeginOffset);
977979
getResult.setMaxOffset(maxOffset);
978980
getResult.setMinOffset(minOffset);
981+
getResult.setFilterMessageCount(filterMessageCount);
979982
return getResult;
980983
}
981984

store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ public class GetMessageResult {
4343

4444
private long coldDataSum = 0L;
4545

46+
private int filterMessageCount;
47+
4648
public static final GetMessageResult NO_MATCH_LOGIC_QUEUE =
4749
new GetMessageResult(GetMessageStatus.NO_MATCHED_LOGIC_QUEUE, 0, 0, 0, Collections.emptyList(),
4850
Collections.emptyList(), Collections.emptyList());
@@ -177,10 +179,18 @@ public void setColdDataSum(long coldDataSum) {
177179
this.coldDataSum = coldDataSum;
178180
}
179181

182+
public int getFilterMessageCount() {
183+
return filterMessageCount;
184+
}
185+
186+
public void setFilterMessageCount(int filterMessageCount) {
187+
this.filterMessageCount = filterMessageCount;
188+
}
189+
180190
@Override
181191
public String toString() {
182192
return "GetMessageResult [status=" + status + ", nextBeginOffset=" + nextBeginOffset + ", minOffset="
183193
+ minOffset + ", maxOffset=" + maxOffset + ", bufferTotalSize=" + bufferTotalSize + ", messageCount=" + messageCount
184-
+ ", suggestPullingFromSlave=" + suggestPullingFromSlave + "]";
194+
+ ", filterMessageCount=" + filterMessageCount + ", suggestPullingFromSlave=" + suggestPullingFromSlave + "]";
185195
}
186196
}

0 commit comments

Comments
 (0)