Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class ConsumeMessageContext {
private BrokerStatsManager.StatsType commercialRcvStats;
private int commercialRcvTimes;
private int commercialRcvSize;
private int filterMessageCount;

private String namespace;
public String getConsumerGroup() {
Expand Down Expand Up @@ -231,4 +232,12 @@ public String getNamespace() {
public void setNamespace(String namespace) {
this.namespace = namespace;
}

public int getFilterMessageCount() {
return filterMessageCount;
}

public void setFilterMessageCount(int filterMessageCount) {
this.filterMessageCount = filterMessageCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,7 @@ protected void executeConsumeMessageHookBefore(RemotingCommand request, PullMess
context.setAccountOwnerParent(ownerParent);
context.setAccountOwnerSelf(ownerSelf);
context.setNamespace(NamespaceUtil.getNamespaceFromResource(requestHeader.getTopic()));
context.setFilterMessageCount(getMessageResult.getFilterMessageCount());

switch (responseCode) {
case ResponseCode.SUCCESS:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,7 @@ public GetMessageResult getMessage(final String group, final String topic, final
long maxOffset = 0;

GetMessageResult getResult = new GetMessageResult();
int filterMessageCount = 0;

final long maxOffsetPy = this.commitLog.getMaxOffset();

Expand Down Expand Up @@ -927,6 +928,7 @@ public GetMessageResult getMessage(final String group, final String topic, final
}
// release...
selectResult.release();
filterMessageCount++;
continue;
}
this.storeStatsService.getGetMessageTransferredMsgCount().add(cqUnit.getBatchNum());
Expand Down Expand Up @@ -976,6 +978,7 @@ public GetMessageResult getMessage(final String group, final String topic, final
getResult.setNextBeginOffset(nextBeginOffset);
getResult.setMaxOffset(maxOffset);
getResult.setMinOffset(minOffset);
getResult.setFilterMessageCount(filterMessageCount);
return getResult;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public class GetMessageResult {

private long coldDataSum = 0L;

private int filterMessageCount;

public static final GetMessageResult NO_MATCH_LOGIC_QUEUE =
new GetMessageResult(GetMessageStatus.NO_MATCHED_LOGIC_QUEUE, 0, 0, 0, Collections.emptyList(),
Collections.emptyList(), Collections.emptyList());
Expand Down Expand Up @@ -177,10 +179,18 @@ public void setColdDataSum(long coldDataSum) {
this.coldDataSum = coldDataSum;
}

public int getFilterMessageCount() {
return filterMessageCount;
}

public void setFilterMessageCount(int filterMessageCount) {
this.filterMessageCount = filterMessageCount;
}

@Override
public String toString() {
return "GetMessageResult [status=" + status + ", nextBeginOffset=" + nextBeginOffset + ", minOffset="
+ minOffset + ", maxOffset=" + maxOffset + ", bufferTotalSize=" + bufferTotalSize + ", messageCount=" + messageCount
+ ", suggestPullingFromSlave=" + suggestPullingFromSlave + "]";
+ ", filterMessageCount=" + filterMessageCount + ", suggestPullingFromSlave=" + suggestPullingFromSlave + "]";
}
}
Loading