Skip to content

Commit 24ca9e4

Browse files
authored
[ISSUE #10043] Make TimerMessageReputService thread pool configurable and shutdown gracefully (#10044)
1 parent 9ad4a1b commit 24ca9e4

File tree

4 files changed

+77
-11
lines changed

4 files changed

+77
-11
lines changed

broker/src/main/java/org/apache/rocketmq/broker/transaction/rocksdb/TransactionalMessageRocksDBService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,11 @@ public void start() {
7676
private void initService() {
7777
this.transStatusService = new TransStatusCheckService();
7878
this.checkTranStatusTaskExecutor = ThreadUtils.newThreadPoolExecutor(
79-
2,
80-
5,
79+
brokerController.getBrokerConfig().getTransactionCheckRocksdbCoreThreads(),
80+
brokerController.getBrokerConfig().getTransactionCheckRocksdbMaxThreads(),
8181
100,
8282
TimeUnit.SECONDS,
83-
new ArrayBlockingQueue<>(2000),
83+
new ArrayBlockingQueue<>(brokerController.getBrokerConfig().getTransactionCheckRocksdbQueueCapacity()),
8484
new ThreadFactoryImpl("Transaction-rocksdb-msg-check-thread", brokerController.getBrokerIdentity()),
8585
new CallerRunsPolicy());
8686
}

common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,12 @@ public class BrokerConfig extends BrokerIdentity {
298298

299299
private long transactionMetricFlushInterval = 10 * 1000;
300300

301+
private int transactionCheckRocksdbCoreThreads = 2;
302+
303+
private int transactionCheckRocksdbMaxThreads = 5;
304+
305+
private int transactionCheckRocksdbQueueCapacity = 2000;
306+
301307
/**
302308
* transaction batch op message
303309
*/
@@ -2106,6 +2112,30 @@ public void setTransactionMetricFlushInterval(long transactionMetricFlushInterva
21062112
this.transactionMetricFlushInterval = transactionMetricFlushInterval;
21072113
}
21082114

2115+
public void setTransactionCheckRocksdbCoreThreads(int transactionCheckRocksdbCoreThreads) {
2116+
this.transactionCheckRocksdbCoreThreads = transactionCheckRocksdbCoreThreads;
2117+
}
2118+
2119+
public int getTransactionCheckRocksdbCoreThreads() {
2120+
return transactionCheckRocksdbCoreThreads;
2121+
}
2122+
2123+
public int getTransactionCheckRocksdbMaxThreads() {
2124+
return transactionCheckRocksdbMaxThreads;
2125+
}
2126+
2127+
public void setTransactionCheckRocksdbMaxThreads(int transactionCheckRocksdbMaxThreads) {
2128+
this.transactionCheckRocksdbMaxThreads = transactionCheckRocksdbMaxThreads;
2129+
}
2130+
2131+
public int getTransactionCheckRocksdbQueueCapacity() {
2132+
return transactionCheckRocksdbQueueCapacity;
2133+
}
2134+
2135+
public void setTransactionCheckRocksdbQueueCapacity(int transactionCheckRocksdbQueueCapacity) {
2136+
this.transactionCheckRocksdbQueueCapacity = transactionCheckRocksdbQueueCapacity;
2137+
}
2138+
21092139
public long getPopInflightMessageThreshold() {
21102140
return popInflightMessageThreshold;
21112141
}

store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,9 @@ public class MessageStoreConfig {
116116
private int timerRocksDBRollRangeHours = 2;
117117
private boolean timerRecallToTimeWheelEnable = true;
118118
private boolean timerRecallToTimelineEnable = true;
119+
private int timerReputServiceCorePoolSize = 6;
120+
private int timerReputServiceMaxPoolSize = 6;
121+
private int timerReputServiceQueueCapacity = 10000;
119122

120123
private boolean transRocksDBEnable = false;
121124
private boolean transWriteOriginTransHalfEnable = true;
@@ -2227,6 +2230,30 @@ public void setTimerRecallToTimelineEnable(boolean timerRecallToTimelineEnable)
22272230
this.timerRecallToTimelineEnable = timerRecallToTimelineEnable;
22282231
}
22292232

2233+
public void setTimerReputServiceCorePoolSize(int timerReputServiceCorePoolSize) {
2234+
this.timerReputServiceCorePoolSize = timerReputServiceCorePoolSize;
2235+
}
2236+
2237+
public int getTimerReputServiceCorePoolSize() {
2238+
return timerReputServiceCorePoolSize;
2239+
}
2240+
2241+
public void setTimerReputServiceMaxPoolSize(int timerReputServiceMaxPoolSize) {
2242+
this.timerReputServiceMaxPoolSize = timerReputServiceMaxPoolSize;
2243+
}
2244+
2245+
public int getTimerReputServiceMaxPoolSize() {
2246+
return timerReputServiceMaxPoolSize;
2247+
}
2248+
2249+
public void setTimerReputServiceQueueCapacity(int timerReputServiceQueueCapacity) {
2250+
this.timerReputServiceQueueCapacity = timerReputServiceQueueCapacity;
2251+
}
2252+
2253+
public int getTimerReputServiceQueueCapacity() {
2254+
return timerReputServiceQueueCapacity;
2255+
}
2256+
22302257
public int getTimerRocksDBRollIntervalHours() {
22312258
return timerRocksDBRollIntervalHours;
22322259
}

store/src/main/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageRocksDBStore.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.apache.rocketmq.common.message.MessageDecoder;
4343
import org.apache.rocketmq.common.message.MessageExt;
4444
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
45+
import org.apache.rocketmq.common.utils.ThreadUtils;
4546
import org.apache.rocketmq.logging.org.slf4j.Logger;
4647
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
4748
import org.apache.rocketmq.store.DefaultMessageStore;
@@ -506,14 +507,16 @@ private class TimerMessageReputService extends ServiceThread {
506507
private final BlockingQueue<List<TimerRocksDBRecord>> queue;
507508
private final RateLimiter rateLimiter;
508509
private final boolean writeCheckPoint;
509-
ExecutorService executor = new ThreadPoolExecutor(
510-
6,
511-
6,
512-
60,
513-
TimeUnit.SECONDS,
514-
new LinkedBlockingQueue<>(10000),
515-
new ThreadPoolExecutor.CallerRunsPolicy()
516-
);
510+
private final ExecutorService executor =
511+
ThreadUtils.newThreadPoolExecutor(
512+
storeConfig.getTimerReputServiceCorePoolSize(),
513+
storeConfig.getTimerReputServiceMaxPoolSize(),
514+
60L,
515+
TimeUnit.SECONDS,
516+
new LinkedBlockingQueue<>(storeConfig.getTimerReputServiceQueueCapacity()),
517+
ThreadUtils.newGenericThreadFactory("TimerMessageReputService", false),
518+
new ThreadPoolExecutor.CallerRunsPolicy()
519+
);
517520

518521
public TimerMessageReputService(BlockingQueue<List<TimerRocksDBRecord>> queue, double maxTps, boolean writeCheckPoint) {
519522
this.queue = queue;
@@ -614,6 +617,12 @@ public Void call() throws Exception {
614617
return null;
615618
}
616619
}
620+
621+
@Override
622+
public void shutdown() {
623+
super.shutdown();
624+
ThreadUtils.shutdownGracefully(executor, 5, TimeUnit.SECONDS);
625+
}
617626
}
618627

619628
}

0 commit comments

Comments
 (0)