Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ public void start() {
private void initService() {
this.transStatusService = new TransStatusCheckService();
this.checkTranStatusTaskExecutor = ThreadUtils.newThreadPoolExecutor(
2,
5,
brokerController.getBrokerConfig().getTransactionCheckRocksdbCoreThreads(),
brokerController.getBrokerConfig().getTransactionCheckRocksdbMaxThreads(),
100,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2000),
new ArrayBlockingQueue<>(brokerController.getBrokerConfig().getTransactionCheckRocksdbQueueCapacity()),
new ThreadFactoryImpl("Transaction-rocksdb-msg-check-thread", brokerController.getBrokerIdentity()),
new CallerRunsPolicy());
}
Expand Down
30 changes: 30 additions & 0 deletions common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,12 @@ public class BrokerConfig extends BrokerIdentity {

private long transactionMetricFlushInterval = 10 * 1000;

private int transactionCheckRocksdbCoreThreads = 2;

private int transactionCheckRocksdbMaxThreads = 5;

private int transactionCheckRocksdbQueueCapacity = 2000;

/**
* transaction batch op message
*/
Expand Down Expand Up @@ -2106,6 +2112,30 @@ public void setTransactionMetricFlushInterval(long transactionMetricFlushInterva
this.transactionMetricFlushInterval = transactionMetricFlushInterval;
}

public void setTransactionCheckRocksdbCoreThreads(int transactionCheckRocksdbCoreThreads) {
this.transactionCheckRocksdbCoreThreads = transactionCheckRocksdbCoreThreads;
}

public int getTransactionCheckRocksdbCoreThreads() {
return transactionCheckRocksdbCoreThreads;
}

public int getTransactionCheckRocksdbMaxThreads() {
return transactionCheckRocksdbMaxThreads;
}

public void setTransactionCheckRocksdbMaxThreads(int transactionCheckRocksdbMaxThreads) {
this.transactionCheckRocksdbMaxThreads = transactionCheckRocksdbMaxThreads;
}

public int getTransactionCheckRocksdbQueueCapacity() {
return transactionCheckRocksdbQueueCapacity;
}

public void setTransactionCheckRocksdbQueueCapacity(int transactionCheckRocksdbQueueCapacity) {
this.transactionCheckRocksdbQueueCapacity = transactionCheckRocksdbQueueCapacity;
}

public long getPopInflightMessageThreshold() {
return popInflightMessageThreshold;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ public class MessageStoreConfig {
private int timerRocksDBRollRangeHours = 2;
private boolean timerRecallToTimeWheelEnable = true;
private boolean timerRecallToTimelineEnable = true;
private int timerReputServiceCorePoolSize = 6;
private int timerReputServiceMaxPoolSize = 6;
private int timerReputServiceQueueCapacity = 10000;

private boolean transRocksDBEnable = false;
private boolean transWriteOriginTransHalfEnable = true;
Expand Down Expand Up @@ -2227,6 +2230,30 @@ public void setTimerRecallToTimelineEnable(boolean timerRecallToTimelineEnable)
this.timerRecallToTimelineEnable = timerRecallToTimelineEnable;
}

public void setTimerReputServiceCorePoolSize(int timerReputServiceCorePoolSize) {
this.timerReputServiceCorePoolSize = timerReputServiceCorePoolSize;
}

public int getTimerReputServiceCorePoolSize() {
return timerReputServiceCorePoolSize;
}

public void setTimerReputServiceMaxPoolSize(int timerReputServiceMaxPoolSize) {
this.timerReputServiceMaxPoolSize = timerReputServiceMaxPoolSize;
}

public int getTimerReputServiceMaxPoolSize() {
return timerReputServiceMaxPoolSize;
}

public void setTimerReputServiceQueueCapacity(int timerReputServiceQueueCapacity) {
this.timerReputServiceQueueCapacity = timerReputServiceQueueCapacity;
}

public int getTimerReputServiceQueueCapacity() {
return timerReputServiceQueueCapacity;
}

public int getTimerRocksDBRollIntervalHours() {
return timerRocksDBRollIntervalHours;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.DefaultMessageStore;
Expand Down Expand Up @@ -506,14 +507,16 @@ private class TimerMessageReputService extends ServiceThread {
private final BlockingQueue<List<TimerRocksDBRecord>> queue;
private final RateLimiter rateLimiter;
private final boolean writeCheckPoint;
ExecutorService executor = new ThreadPoolExecutor(
6,
6,
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10000),
new ThreadPoolExecutor.CallerRunsPolicy()
);
private final ExecutorService executor =
ThreadUtils.newThreadPoolExecutor(
storeConfig.getTimerReputServiceCorePoolSize(),
storeConfig.getTimerReputServiceMaxPoolSize(),
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(storeConfig.getTimerReputServiceQueueCapacity()),
ThreadUtils.newGenericThreadFactory("TimerMessageReputService", false),
new ThreadPoolExecutor.CallerRunsPolicy()
);

public TimerMessageReputService(BlockingQueue<List<TimerRocksDBRecord>> queue, double maxTps, boolean writeCheckPoint) {
this.queue = queue;
Expand Down Expand Up @@ -614,6 +617,12 @@ public Void call() throws Exception {
return null;
}
}

@Override
public void shutdown() {
super.shutdown();
ThreadUtils.shutdownGracefully(executor, 5, TimeUnit.SECONDS);
}
}

}
Loading