From 477d663c1d48ac86bf65560e9669f2fef9042c1d Mon Sep 17 00:00:00 2001 From: Kris20030907 <3185633428@qq.com> Date: Thu, 22 Jan 2026 17:28:09 +0800 Subject: [PATCH] feat: make TimerMessageReputService thread pool configurable and shutdown gracefully. --- .../TransactionalMessageRocksDBService.java | 6 ++-- .../apache/rocketmq/common/BrokerConfig.java | 30 +++++++++++++++++++ .../store/config/MessageStoreConfig.java | 27 +++++++++++++++++ .../rocksdb/TimerMessageRocksDBStore.java | 25 +++++++++++----- 4 files changed, 77 insertions(+), 11 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/rocksdb/TransactionalMessageRocksDBService.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/rocksdb/TransactionalMessageRocksDBService.java index 1fc38eb3d6d..dbd3575d69c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/rocksdb/TransactionalMessageRocksDBService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/rocksdb/TransactionalMessageRocksDBService.java @@ -76,11 +76,11 @@ public void start() { private void initService() { this.transStatusService = new TransStatusCheckService(); this.checkTranStatusTaskExecutor = ThreadUtils.newThreadPoolExecutor( - 2, - 5, + brokerController.getBrokerConfig().getTransactionCheckRocksdbCoreThreads(), + brokerController.getBrokerConfig().getTransactionCheckRocksdbMaxThreads(), 100, TimeUnit.SECONDS, - new ArrayBlockingQueue<>(2000), + new ArrayBlockingQueue<>(brokerController.getBrokerConfig().getTransactionCheckRocksdbQueueCapacity()), new ThreadFactoryImpl("Transaction-rocksdb-msg-check-thread", brokerController.getBrokerIdentity()), new CallerRunsPolicy()); } diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index e9c588e9d1b..caee5e45f26 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -298,6 +298,12 @@ public class BrokerConfig extends BrokerIdentity { private long transactionMetricFlushInterval = 10 * 1000; + private int transactionCheckRocksdbCoreThreads = 2; + + private int transactionCheckRocksdbMaxThreads = 5; + + private int transactionCheckRocksdbQueueCapacity = 2000; + /** * transaction batch op message */ @@ -2106,6 +2112,30 @@ public void setTransactionMetricFlushInterval(long transactionMetricFlushInterva this.transactionMetricFlushInterval = transactionMetricFlushInterval; } + public void setTransactionCheckRocksdbCoreThreads(int transactionCheckRocksdbCoreThreads) { + this.transactionCheckRocksdbCoreThreads = transactionCheckRocksdbCoreThreads; + } + + public int getTransactionCheckRocksdbCoreThreads() { + return transactionCheckRocksdbCoreThreads; + } + + public int getTransactionCheckRocksdbMaxThreads() { + return transactionCheckRocksdbMaxThreads; + } + + public void setTransactionCheckRocksdbMaxThreads(int transactionCheckRocksdbMaxThreads) { + this.transactionCheckRocksdbMaxThreads = transactionCheckRocksdbMaxThreads; + } + + public int getTransactionCheckRocksdbQueueCapacity() { + return transactionCheckRocksdbQueueCapacity; + } + + public void setTransactionCheckRocksdbQueueCapacity(int transactionCheckRocksdbQueueCapacity) { + this.transactionCheckRocksdbQueueCapacity = transactionCheckRocksdbQueueCapacity; + } + public long getPopInflightMessageThreshold() { return popInflightMessageThreshold; } diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index d7f17efd64a..ffc261aa178 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -116,6 +116,9 @@ public class MessageStoreConfig { private int timerRocksDBRollRangeHours = 2; private boolean timerRecallToTimeWheelEnable = true; private boolean timerRecallToTimelineEnable = true; + private int timerReputServiceCorePoolSize = 6; + private int timerReputServiceMaxPoolSize = 6; + private int timerReputServiceQueueCapacity = 10000; private boolean transRocksDBEnable = false; private boolean transWriteOriginTransHalfEnable = true; @@ -2227,6 +2230,30 @@ public void setTimerRecallToTimelineEnable(boolean timerRecallToTimelineEnable) this.timerRecallToTimelineEnable = timerRecallToTimelineEnable; } + public void setTimerReputServiceCorePoolSize(int timerReputServiceCorePoolSize) { + this.timerReputServiceCorePoolSize = timerReputServiceCorePoolSize; + } + + public int getTimerReputServiceCorePoolSize() { + return timerReputServiceCorePoolSize; + } + + public void setTimerReputServiceMaxPoolSize(int timerReputServiceMaxPoolSize) { + this.timerReputServiceMaxPoolSize = timerReputServiceMaxPoolSize; + } + + public int getTimerReputServiceMaxPoolSize() { + return timerReputServiceMaxPoolSize; + } + + public void setTimerReputServiceQueueCapacity(int timerReputServiceQueueCapacity) { + this.timerReputServiceQueueCapacity = timerReputServiceQueueCapacity; + } + + public int getTimerReputServiceQueueCapacity() { + return timerReputServiceQueueCapacity; + } + public int getTimerRocksDBRollIntervalHours() { return timerRocksDBRollIntervalHours; } diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageRocksDBStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageRocksDBStore.java index ec13971d922..c48e177c9d2 100644 --- a/store/src/main/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageRocksDBStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageRocksDBStore.java @@ -42,6 +42,7 @@ import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExtBrokerInner; +import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.store.DefaultMessageStore; @@ -506,14 +507,16 @@ private class TimerMessageReputService extends ServiceThread { private final BlockingQueue> queue; private final RateLimiter rateLimiter; private final boolean writeCheckPoint; - ExecutorService executor = new ThreadPoolExecutor( - 6, - 6, - 60, - TimeUnit.SECONDS, - new LinkedBlockingQueue<>(10000), - new ThreadPoolExecutor.CallerRunsPolicy() - ); + private final ExecutorService executor = + ThreadUtils.newThreadPoolExecutor( + storeConfig.getTimerReputServiceCorePoolSize(), + storeConfig.getTimerReputServiceMaxPoolSize(), + 60L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(storeConfig.getTimerReputServiceQueueCapacity()), + ThreadUtils.newGenericThreadFactory("TimerMessageReputService", false), + new ThreadPoolExecutor.CallerRunsPolicy() + ); public TimerMessageReputService(BlockingQueue> queue, double maxTps, boolean writeCheckPoint) { this.queue = queue; @@ -614,6 +617,12 @@ public Void call() throws Exception { return null; } } + + @Override + public void shutdown() { + super.shutdown(); + ThreadUtils.shutdownGracefully(executor, 5, TimeUnit.SECONDS); + } } }