Skip to content

Commit 35c88e3

Browse files
authored
[ISSUE #10225] Add RejectedExecutionHandler support for ThreadPoolMonitor (#10222)
1 parent b14f926 commit 35c88e3

1 file changed

Lines changed: 69 additions & 32 deletions

File tree

common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java

Lines changed: 69 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,16 @@
1919

2020
import com.google.common.collect.Lists;
2121
import com.google.common.util.concurrent.ThreadFactoryBuilder;
22+
2223
import java.util.Collections;
2324
import java.util.List;
2425
import java.util.concurrent.CopyOnWriteArrayList;
2526
import java.util.concurrent.LinkedBlockingQueue;
27+
import java.util.concurrent.RejectedExecutionHandler;
2628
import java.util.concurrent.ScheduledExecutorService;
2729
import java.util.concurrent.ThreadPoolExecutor;
2830
import java.util.concurrent.TimeUnit;
31+
2932
import org.apache.rocketmq.common.UtilAll;
3033
import org.apache.rocketmq.common.utils.ThreadUtils;
3134
import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -37,7 +40,7 @@ public class ThreadPoolMonitor {
3740

3841
private static final List<ThreadPoolWrapper> MONITOR_EXECUTOR = new CopyOnWriteArrayList<>();
3942
private static final ScheduledExecutorService MONITOR_SCHEDULED = ThreadUtils.newSingleThreadScheduledExecutor(
40-
new ThreadFactoryBuilder().setNameFormat("ThreadPoolMonitor-%d").build()
43+
new ThreadFactoryBuilder().setNameFormat("ThreadPoolMonitor-%d").build()
4144
);
4245

4346
private static volatile long threadPoolStatusPeriodTime = TimeUnit.SECONDS.toMillis(3);
@@ -55,48 +58,82 @@ public static void config(Logger jstackLoggerConfig, Logger waterMarkLoggerConfi
5558
}
5659

5760
public static ThreadPoolExecutor createAndMonitor(int corePoolSize,
58-
int maximumPoolSize,
59-
long keepAliveTime,
60-
TimeUnit unit,
61-
String name,
62-
int queueCapacity) {
61+
int maximumPoolSize,
62+
long keepAliveTime,
63+
TimeUnit unit,
64+
String name,
65+
int queueCapacity) {
6366
return createAndMonitor(corePoolSize, maximumPoolSize, keepAliveTime, unit, name, queueCapacity, Collections.emptyList());
6467
}
6568

6669
public static ThreadPoolExecutor createAndMonitor(int corePoolSize,
67-
int maximumPoolSize,
68-
long keepAliveTime,
69-
TimeUnit unit,
70-
String name,
71-
int queueCapacity,
72-
ThreadPoolStatusMonitor... threadPoolStatusMonitors) {
70+
int maximumPoolSize,
71+
long keepAliveTime,
72+
TimeUnit unit,
73+
String name,
74+
int queueCapacity,
75+
RejectedExecutionHandler handler) {
76+
return createAndMonitor(corePoolSize, maximumPoolSize, keepAliveTime, unit, name, queueCapacity, handler, Collections.emptyList());
77+
}
78+
79+
public static ThreadPoolExecutor createAndMonitor(int corePoolSize,
80+
int maximumPoolSize,
81+
long keepAliveTime,
82+
TimeUnit unit,
83+
String name,
84+
int queueCapacity,
85+
ThreadPoolStatusMonitor... threadPoolStatusMonitors) {
86+
return createAndMonitor(corePoolSize, maximumPoolSize, keepAliveTime, unit, name, queueCapacity,
87+
Lists.newArrayList(threadPoolStatusMonitors));
88+
}
89+
90+
public static ThreadPoolExecutor createAndMonitor(int corePoolSize,
91+
int maximumPoolSize,
92+
long keepAliveTime,
93+
TimeUnit unit,
94+
String name,
95+
int queueCapacity,
96+
RejectedExecutionHandler handler,
97+
ThreadPoolStatusMonitor... threadPoolStatusMonitors) {
98+
return createAndMonitor(corePoolSize, maximumPoolSize, keepAliveTime, unit, name, queueCapacity, handler,
99+
Lists.newArrayList(threadPoolStatusMonitors));
100+
}
101+
102+
public static ThreadPoolExecutor createAndMonitor(int corePoolSize,
103+
int maximumPoolSize,
104+
long keepAliveTime,
105+
TimeUnit unit,
106+
String name,
107+
int queueCapacity,
108+
List<ThreadPoolStatusMonitor> threadPoolStatusMonitors) {
73109
return createAndMonitor(corePoolSize, maximumPoolSize, keepAliveTime, unit, name, queueCapacity,
74-
Lists.newArrayList(threadPoolStatusMonitors));
110+
new ThreadPoolExecutor.DiscardOldestPolicy(), threadPoolStatusMonitors);
75111
}
76112

77113
public static ThreadPoolExecutor createAndMonitor(int corePoolSize,
78-
int maximumPoolSize,
79-
long keepAliveTime,
80-
TimeUnit unit,
81-
String name,
82-
int queueCapacity,
83-
List<ThreadPoolStatusMonitor> threadPoolStatusMonitors) {
114+
int maximumPoolSize,
115+
long keepAliveTime,
116+
TimeUnit unit,
117+
String name,
118+
int queueCapacity,
119+
RejectedExecutionHandler handler,
120+
List<ThreadPoolStatusMonitor> threadPoolStatusMonitors) {
84121
ThreadPoolExecutor executor = (ThreadPoolExecutor) ThreadUtils.newThreadPoolExecutor(
85-
corePoolSize,
86-
maximumPoolSize,
87-
keepAliveTime,
88-
unit,
89-
new LinkedBlockingQueue<>(queueCapacity),
90-
new ThreadFactoryBuilder().setNameFormat(name + "-%d").build(),
91-
new ThreadPoolExecutor.DiscardOldestPolicy());
122+
corePoolSize,
123+
maximumPoolSize,
124+
keepAliveTime,
125+
unit,
126+
new LinkedBlockingQueue<>(queueCapacity),
127+
new ThreadFactoryBuilder().setNameFormat(name + "-%d").build(),
128+
handler);
92129
List<ThreadPoolStatusMonitor> printers = Lists.newArrayList(new ThreadPoolQueueSizeMonitor(queueCapacity));
93130
printers.addAll(threadPoolStatusMonitors);
94131

95132
MONITOR_EXECUTOR.add(ThreadPoolWrapper.builder()
96-
.name(name)
97-
.threadPoolExecutor(executor)
98-
.statusPrinters(printers)
99-
.build());
133+
.name(name)
134+
.threadPoolExecutor(executor)
135+
.statusPrinters(printers)
136+
.build());
100137
return executor;
101138
}
102139

@@ -110,7 +147,7 @@ public static void logThreadPoolStatus() {
110147
waterMarkLogger.info("{}{}{}", nameFormatted, descFormatted, value);
111148
if (enablePrintJstack) {
112149
if (monitor.needPrintJstack(threadPoolWrapper.getThreadPoolExecutor(), value) &&
113-
System.currentTimeMillis() - jstackTime > jstackPeriodTime) {
150+
System.currentTimeMillis() - jstackTime > jstackPeriodTime) {
114151
jstackTime = System.currentTimeMillis();
115152
jstackLogger.warn("jstack start\n{}", UtilAll.jstack());
116153
}
@@ -121,7 +158,7 @@ public static void logThreadPoolStatus() {
121158

122159
public static void init() {
123160
MONITOR_SCHEDULED.scheduleAtFixedRate(ThreadPoolMonitor::logThreadPoolStatus, 20,
124-
threadPoolStatusPeriodTime, TimeUnit.MILLISECONDS);
161+
threadPoolStatusPeriodTime, TimeUnit.MILLISECONDS);
125162
}
126163

127164
public static void shutdown() {

0 commit comments

Comments
 (0)