|
40 | 40 | import java.util.concurrent.locks.Lock; |
41 | 41 | import java.util.concurrent.locks.ReentrantLock; |
42 | 42 | import java.util.function.Function; |
| 43 | +import java.util.function.Supplier; |
43 | 44 | import java.util.stream.Collectors; |
44 | 45 | import org.apache.rocketmq.acl.AccessValidator; |
45 | 46 | import org.apache.rocketmq.acl.plain.PlainAccessValidator; |
@@ -1284,16 +1285,36 @@ public long headSlowTimeMills4AckThreadPoolQueue() { |
1284 | 1285 | return this.headSlowTimeMills(this.ackThreadPoolQueue); |
1285 | 1286 | } |
1286 | 1287 |
|
| 1288 | + public long headSlowTimeMills4EndTransactionThreadPoolQueue() { |
| 1289 | + return this.headSlowTimeMills(this.endTransactionThreadPoolQueue); |
| 1290 | + } |
| 1291 | + |
| 1292 | + public long headSlowTimeMills4ClientManagerThreadPoolQueue() { |
| 1293 | + return this.headSlowTimeMills(this.clientManagerThreadPoolQueue); |
| 1294 | + } |
| 1295 | + |
| 1296 | + public long headSlowTimeMills4HeartbeatThreadPoolQueue() { |
| 1297 | + return this.headSlowTimeMills(this.heartbeatThreadPoolQueue); |
| 1298 | + } |
| 1299 | + |
| 1300 | + public long headSlowTimeMills4AdminBrokerThreadPoolQueue() { |
| 1301 | + return this.headSlowTimeMills(this.adminBrokerThreadPoolQueue); |
| 1302 | + } |
| 1303 | + |
1287 | 1304 | public void printWaterMark() { |
1288 | | - LOG_WATER_MARK.info("[WATERMARK] Send Queue Size: {} SlowTimeMills: {}", this.sendThreadPoolQueue.size(), headSlowTimeMills4SendThreadPoolQueue()); |
1289 | | - LOG_WATER_MARK.info("[WATERMARK] Pull Queue Size: {} SlowTimeMills: {}", this.pullThreadPoolQueue.size(), headSlowTimeMills4PullThreadPoolQueue()); |
1290 | | - LOG_WATER_MARK.info("[WATERMARK] Query Queue Size: {} SlowTimeMills: {}", this.queryThreadPoolQueue.size(), headSlowTimeMills4QueryThreadPoolQueue()); |
1291 | | - LOG_WATER_MARK.info("[WATERMARK] Lite Pull Queue Size: {} SlowTimeMills: {}", this.litePullThreadPoolQueue.size(), headSlowTimeMills4LitePullThreadPoolQueue()); |
1292 | | - LOG_WATER_MARK.info("[WATERMARK] Transaction Queue Size: {} SlowTimeMills: {}", this.endTransactionThreadPoolQueue.size(), headSlowTimeMills(this.endTransactionThreadPoolQueue)); |
1293 | | - LOG_WATER_MARK.info("[WATERMARK] ClientManager Queue Size: {} SlowTimeMills: {}", this.clientManagerThreadPoolQueue.size(), this.headSlowTimeMills(this.clientManagerThreadPoolQueue)); |
1294 | | - LOG_WATER_MARK.info("[WATERMARK] Heartbeat Queue Size: {} SlowTimeMills: {}", this.heartbeatThreadPoolQueue.size(), this.headSlowTimeMills(this.heartbeatThreadPoolQueue)); |
1295 | | - LOG_WATER_MARK.info("[WATERMARK] Ack Queue Size: {} SlowTimeMills: {}", this.ackThreadPoolQueue.size(), headSlowTimeMills(this.ackThreadPoolQueue)); |
1296 | | - LOG_WATER_MARK.info("[WATERMARK] Admin Queue Size: {} SlowTimeMills: {}", this.adminBrokerThreadPoolQueue.size(), headSlowTimeMills(this.adminBrokerThreadPoolQueue)); |
| 1305 | + logWaterMarkQueueInfo("Send", this.sendThreadPoolQueue, this::headSlowTimeMills4SendThreadPoolQueue); |
| 1306 | + logWaterMarkQueueInfo("Pull", this.pullThreadPoolQueue, this::headSlowTimeMills4PullThreadPoolQueue); |
| 1307 | + logWaterMarkQueueInfo("Query", this.queryThreadPoolQueue, this::headSlowTimeMills4QueryThreadPoolQueue); |
| 1308 | + logWaterMarkQueueInfo("Lite Pull", this.litePullThreadPoolQueue, this::headSlowTimeMills4LitePullThreadPoolQueue); |
| 1309 | + logWaterMarkQueueInfo("Transaction", this.endTransactionThreadPoolQueue, this::headSlowTimeMills4EndTransactionThreadPoolQueue); |
| 1310 | + logWaterMarkQueueInfo("ClientManager", this.clientManagerThreadPoolQueue, this::headSlowTimeMills4ClientManagerThreadPoolQueue); |
| 1311 | + logWaterMarkQueueInfo("Heartbeat", this.heartbeatThreadPoolQueue, this::headSlowTimeMills4HeartbeatThreadPoolQueue); |
| 1312 | + logWaterMarkQueueInfo("Ack", this.ackThreadPoolQueue, this::headSlowTimeMills4AckThreadPoolQueue); |
| 1313 | + logWaterMarkQueueInfo("Admin", this.adminBrokerThreadPoolQueue, this::headSlowTimeMills4AdminBrokerThreadPoolQueue); |
| 1314 | + } |
| 1315 | + |
| 1316 | + private void logWaterMarkQueueInfo(String queueName, BlockingQueue<?> queue, Supplier<Long> slowTimeSupplier) { |
| 1317 | + LOG_WATER_MARK.info("[WATERMARK] {} Queue Size: {} SlowTimeMills: {}", queueName, queue.size(), slowTimeSupplier.get()); |
1297 | 1318 | } |
1298 | 1319 |
|
1299 | 1320 | public MessageStore getMessageStore() { |
|
0 commit comments