Skip to content

Commit a955ce4

Browse files
committed
[ISSUE #8340] RuntimeInfo and ClusterListSubCommand show ackThreadPoolQueueSize and ackThreadPoolQueueHeadWaitTimeMills (#8339)
* RuntimeInfo and ClusterListSubCommand show ackThreadPoolQueueSize and ackThreadPoolQueueHeadWaitTimeMills * RuntimeInfo and ClusterListSubCommand show ackThreadPoolQueueSize and ackThreadPoolQueueHeadWaitTimeMills
1 parent e0580bc commit a955ce4

3 files changed

Lines changed: 18 additions & 7 deletions

File tree

broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1280,6 +1280,10 @@ public long headSlowTimeMills4QueryThreadPoolQueue() {
12801280
return this.headSlowTimeMills(this.queryThreadPoolQueue);
12811281
}
12821282

1283+
public long headSlowTimeMills4AckThreadPoolQueue() {
1284+
return this.headSlowTimeMills(this.ackThreadPoolQueue);
1285+
}
1286+
12831287
public void printWaterMark() {
12841288
LOG_WATER_MARK.info("[WATERMARK] Send Queue Size: {} SlowTimeMills: {}", this.sendThreadPoolQueue.size(), headSlowTimeMills4SendThreadPoolQueue());
12851289
LOG_WATER_MARK.info("[WATERMARK] Pull Queue Size: {} SlowTimeMills: {}", this.pullThreadPoolQueue.size(), headSlowTimeMills4PullThreadPoolQueue());

broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2801,10 +2801,15 @@ private HashMap<String, String> prepareRuntimeInfo() throws RemotingCommandExcep
28012801
runtimeInfo.put("queryThreadPoolQueueCapacity",
28022802
String.valueOf(this.brokerController.getBrokerConfig().getQueryThreadPoolQueueCapacity()));
28032803

2804+
runtimeInfo.put("ackThreadPoolQueueSize", String.valueOf(this.brokerController.getAckThreadPoolQueue().size()));
2805+
runtimeInfo.put("ackThreadPoolQueueCapacity",
2806+
String.valueOf(this.brokerController.getBrokerConfig().getAckThreadPoolQueueCapacity()));
2807+
28042808
runtimeInfo.put("sendThreadPoolQueueHeadWaitTimeMills", String.valueOf(this.brokerController.headSlowTimeMills4SendThreadPoolQueue()));
28052809
runtimeInfo.put("pullThreadPoolQueueHeadWaitTimeMills", String.valueOf(brokerController.headSlowTimeMills4PullThreadPoolQueue()));
28062810
runtimeInfo.put("queryThreadPoolQueueHeadWaitTimeMills", String.valueOf(this.brokerController.headSlowTimeMills4QueryThreadPoolQueue()));
28072811
runtimeInfo.put("litePullThreadPoolQueueHeadWaitTimeMills", String.valueOf(brokerController.headSlowTimeMills4LitePullThreadPoolQueue()));
2812+
runtimeInfo.put("ackThreadPoolQueueHeadWaitTimeMills", String.valueOf(brokerController.headSlowTimeMills4AckThreadPoolQueue()));
28082813

28092814
runtimeInfo.put("EndTransactionQueueSize", String.valueOf(this.brokerController.getEndTransactionThreadPoolQueue().size()));
28102815
runtimeInfo.put("EndTransactionThreadPoolQueueCapacity",

tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -177,9 +177,9 @@ private void printClusterMoreStats(final Set<String> clusterNames,
177177
}
178178

179179
private void printClusterBaseInfo(final Set<String> clusterNames,
180-
final DefaultMQAdminExt defaultMQAdminExt,
181-
final ClusterInfo clusterInfo) {
182-
System.out.printf("%-22s %-22s %-4s %-22s %-16s %16s %16s %-22s %-11s %-12s %-8s %-10s%n",
180+
final DefaultMQAdminExt defaultMQAdminExt,
181+
final ClusterInfo clusterInfo) {
182+
System.out.printf("%-22s %-22s %-4s %-22s %-16s %16s %30s %-22s %-11s %-12s %-8s %-10s%n",
183183
"#Cluster Name",
184184
"#Broker Name",
185185
"#BID",
@@ -212,8 +212,10 @@ private void printClusterBaseInfo(final Set<String> clusterNames,
212212
String version = "";
213213
String sendThreadPoolQueueSize = "";
214214
String pullThreadPoolQueueSize = "";
215+
String ackThreadPoolQueueSize = "";
215216
String sendThreadPoolQueueHeadWaitTimeMills = "";
216217
String pullThreadPoolQueueHeadWaitTimeMills = "";
218+
String ackThreadPoolQueueHeadWaitTimeMills = "";
217219
String pageCacheLockTimeMills = "";
218220
String earliestMessageTimeStamp = "";
219221
String commitLogDiskRatio = "";
@@ -228,14 +230,14 @@ private void printClusterBaseInfo(final Set<String> clusterNames,
228230
isBrokerActive = Boolean.parseBoolean(kvTable.getTable().get("brokerActive"));
229231
String putTps = kvTable.getTable().get("putTps");
230232
String getTransferredTps = kvTable.getTable().get("getTransferredTps");
231-
sendThreadPoolQueueSize = kvTable.getTable().get("sendThreadPoolQueueSize");
232-
pullThreadPoolQueueSize = kvTable.getTable().get("pullThreadPoolQueueSize");
233233

234234
sendThreadPoolQueueSize = kvTable.getTable().get("sendThreadPoolQueueSize");
235235
pullThreadPoolQueueSize = kvTable.getTable().get("pullThreadPoolQueueSize");
236+
ackThreadPoolQueueSize = kvTable.getTable().getOrDefault("ackThreadPoolQueueSize", "N");
236237

237238
sendThreadPoolQueueHeadWaitTimeMills = kvTable.getTable().get("sendThreadPoolQueueHeadWaitTimeMills");
238239
pullThreadPoolQueueHeadWaitTimeMills = kvTable.getTable().get("pullThreadPoolQueueHeadWaitTimeMills");
240+
ackThreadPoolQueueHeadWaitTimeMills = kvTable.getTable().getOrDefault("ackThreadPoolQueueHeadWaitTimeMills", "N");
239241
pageCacheLockTimeMills = kvTable.getTable().get("pageCacheLockTimeMills");
240242
earliestMessageTimeStamp = kvTable.getTable().get("earliestMessageTimeStamp");
241243
commitLogDiskRatio = kvTable.getTable().get("commitLogDiskRatio");
@@ -280,14 +282,14 @@ private void printClusterBaseInfo(final Set<String> clusterNames,
280282
space = Double.parseDouble(commitLogDiskRatio);
281283
}
282284

283-
System.out.printf("%-22s %-22s %-4s %-22s %-16s %16s %16s %-22s %11s %-12s %-8s %10s%n",
285+
System.out.printf("%-22s %-22s %-4s %-22s %-16s %16s %30s %-22s %11s %-12s %-8s %10s%n",
284286
clusterName,
285287
brokerName,
286288
next1.getKey(),
287289
next1.getValue(),
288290
version,
289291
String.format("%9.2f(%s,%sms)", in, sendThreadPoolQueueSize, sendThreadPoolQueueHeadWaitTimeMills),
290-
String.format("%9.2f(%s,%sms)", out, pullThreadPoolQueueSize, pullThreadPoolQueueHeadWaitTimeMills),
292+
String.format("%9.2f(%s,%sms|%s,%sms)", out, pullThreadPoolQueueSize, pullThreadPoolQueueHeadWaitTimeMills, ackThreadPoolQueueSize, ackThreadPoolQueueHeadWaitTimeMills),
291293
String.format("%d-%d(%.1fw, %.1f, %.1f)", timerReadBehind, timerOffsetBehind, timerCongestNum / 10000.0f, timerEnqueueTps, timerDequeueTps),
292294
pageCacheLockTimeMills,
293295
String.format("%2.2f", hour),

0 commit comments

Comments
 (0)