Skip to content

Commit d5b474d

Browse files
authored
[ISSUE #9297] Supports outputting topic put TPS in TopicStatusSubCommand (#9298)
1 parent 445b945 commit d5b474d

File tree

5 files changed

+18
-0
lines changed

5 files changed

+18
-0
lines changed

broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1837,6 +1837,7 @@ private RemotingCommand getTopicStatsInfo(ChannelHandlerContext ctx,
18371837
topicStatsTable.getOffsetTable().put(mq, topicOffset);
18381838
}
18391839

1840+
topicStatsTable.setTopicPutTps(this.brokerController.getBrokerStatsManager().tpsTopicPutNums(requestHeader.getTopic()));
18401841
byte[] body = topicStatsTable.encode();
18411842
response.setBody(body);
18421843
response.setCode(ResponseCode.SUCCESS);

remoting/src/main/java/org/apache/rocketmq/remoting/protocol/admin/TopicStatsTable.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
2323

2424
public class TopicStatsTable extends RemotingSerializable {
25+
private double topicPutTps;
26+
2527
private Map<MessageQueue, TopicOffset> offsetTable = new ConcurrentHashMap<>();
2628

2729
public Map<MessageQueue, TopicOffset> getOffsetTable() {
@@ -31,4 +33,12 @@ public Map<MessageQueue, TopicOffset> getOffsetTable() {
3133
public void setOffsetTable(Map<MessageQueue, TopicOffset> offsetTable) {
3234
this.offsetTable = offsetTable;
3335
}
36+
37+
public double getTopicPutTps() {
38+
return topicPutTps;
39+
}
40+
41+
public void setTopicPutTps(double topicPutTps) {
42+
this.topicPutTps = topicPutTps;
43+
}
3444
}

store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -595,6 +595,10 @@ public void incSendBackNums(final String group, final String topic) {
595595
this.statsTable.get(Stats.SNDBCK_PUT_NUMS).addValue(statsKey, 1, 1);
596596
}
597597

598+
public double tpsTopicPutNums(final String topic) {
599+
return this.statsTable.get(TOPIC_PUT_NUMS).getStatsDataInMinute(topic).getTps();
600+
}
601+
598602
public double tpsGroupGetNums(final String group, final String topic) {
599603
final String statsKey = buildStatsKey(topic, group);
600604
return this.statsTable.get(Stats.GROUP_GET_NUMS).getStatsDataInMinute(statsKey).getTps();

tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,7 @@ public void run() {
367367
if (addr != null) {
368368
TopicStatsTable tst = mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(addr, topic, timeoutMillis);
369369
topicStatsTable.getOffsetTable().putAll(tst.getOffsetTable());
370+
topicStatsTable.setTopicPutTps(topicStatsTable.getTopicPutTps() + tst.getTopicPutTps());
370371
}
371372
} catch (Exception e) {
372373
logger.error("getTopicStatsInfo error. topic={}", topic, e);

tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ public void execute(final CommandLine commandLine, final Options options,
113113
humanTimestamp
114114
);
115115
}
116+
System.out.printf("%n");
117+
System.out.printf("Topic Put TPS: %s%n", topicStatsTable.getTopicPutTps());
116118
} catch (Exception e) {
117119
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
118120
} finally {

0 commit comments

Comments
 (0)