diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 4ff4bed814d..4200f34bdee 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -1837,6 +1837,7 @@ private RemotingCommand getTopicStatsInfo(ChannelHandlerContext ctx, topicStatsTable.getOffsetTable().put(mq, topicOffset); } + topicStatsTable.setTopicPutTps(this.brokerController.getBrokerStatsManager().tpsTopicPutNums(requestHeader.getTopic())); byte[] body = topicStatsTable.encode(); response.setBody(body); response.setCode(ResponseCode.SUCCESS); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/admin/TopicStatsTable.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/admin/TopicStatsTable.java index 9f467e7449e..5cb2af6373f 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/admin/TopicStatsTable.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/admin/TopicStatsTable.java @@ -22,6 +22,8 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable; public class TopicStatsTable extends RemotingSerializable { + private double topicPutTps; + private Map offsetTable = new ConcurrentHashMap<>(); public Map getOffsetTable() { @@ -31,4 +33,12 @@ public Map getOffsetTable() { public void setOffsetTable(Map offsetTable) { this.offsetTable = offsetTable; } + + public double getTopicPutTps() { + return topicPutTps; + } + + public void setTopicPutTps(double topicPutTps) { + this.topicPutTps = topicPutTps; + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java index a6c33f61311..f2b94422c35 100644 --- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java +++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java @@ -581,6 +581,10 @@ public void incSendBackNums(final String group, final String topic) { this.statsTable.get(Stats.SNDBCK_PUT_NUMS).addValue(statsKey, 1, 1); } + public double tpsTopicPutNums(final String topic) { + return this.statsTable.get(TOPIC_PUT_NUMS).getStatsDataInMinute(topic).getTps(); + } + public double tpsGroupGetNums(final String group, final String topic) { final String statsKey = buildStatsKey(topic, group); return this.statsTable.get(Stats.GROUP_GET_NUMS).getStatsDataInMinute(statsKey).getTps(); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 9afd37f7840..e6405cb2d90 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -367,6 +367,7 @@ public void run() { if (addr != null) { TopicStatsTable tst = mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(addr, topic, timeoutMillis); topicStatsTable.getOffsetTable().putAll(tst.getOffsetTable()); + topicStatsTable.setTopicPutTps(topicStatsTable.getTopicPutTps() + tst.getTopicPutTps()); } } catch (Exception e) { logger.error("getTopicStatsInfo error. topic={}", topic, e); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java index 47ca761d1f6..ff91399f1c1 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java @@ -113,6 +113,8 @@ public void execute(final CommandLine commandLine, final Options options, humanTimestamp ); } + System.out.printf("%n"); + System.out.printf("Topic Put TPS: %s%n", topicStatsTable.getTopicPutTps()); } catch (Exception e) { throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); } finally {