Skip to content

Commit 73e8fdb

Browse files
authored
[ISSUE #9699] Optimize shutdown process and resource management (#9700)
* Optimize shutdown process and resource management - Improve BrokerController shutdown flow for graceful shutdown - Optimize BrokerStartup startup and shutdown logic - Enhance ClientHousekeepingService resource cleanup - Improve shutdown handling for various processors - Optimize resource management in storage layer components - Enhance lifecycle management for statistics manager - Improve shutdown flow for timer components * Fix this.popMessageProcessor.getPopLongPollingService() not shutdown * Fix test shutdown state transition issue - Add proper null checks and exception handling in test cleanup - Prevent IllegalStateException during test teardown - Ensure graceful test cleanup without state conflicts * Fix DefaultMessageStoreCleanFilesTest can not pass * Fix CombineConsumeQueueStoreTest can not pass * Polish the code * Polish the code * Ignore flaky test first
1 parent 93f60db commit 73e8fdb

30 files changed

Lines changed: 356 additions & 116 deletions

broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1460,6 +1460,18 @@ protected void shutdownBasicService() {
14601460
this.transactionalMessageService.close();
14611461
}
14621462

1463+
if (this.transactionalMessageCheckListener != null) {
1464+
this.transactionalMessageCheckListener.shutdown();
1465+
}
1466+
1467+
if (transactionalMessageCheckService != null) {
1468+
this.transactionalMessageCheckService.shutdown();
1469+
}
1470+
1471+
if (transactionMetricsFlushService != null) {
1472+
this.transactionMetricsFlushService.shutdown();
1473+
}
1474+
14631475
if (this.notificationProcessor != null) {
14641476
this.notificationProcessor.getPopLongPollingService().shutdown();
14651477
}
@@ -1483,10 +1495,6 @@ protected void shutdownBasicService() {
14831495
this.broadcastOffsetManager.shutdown();
14841496
}
14851497

1486-
if (this.messageStore != null) {
1487-
this.messageStore.shutdown();
1488-
}
1489-
14901498
if (this.replicasManager != null) {
14911499
this.replicasManager.shutdown();
14921500
}
@@ -1626,6 +1634,10 @@ protected void shutdownBasicService() {
16261634
brokerAttachedPlugin.shutdown();
16271635
}
16281636
}
1637+
1638+
if (this.messageStore != null) {
1639+
this.messageStore.shutdown();
1640+
}
16291641
}
16301642

16311643
public void shutdown() {

broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,13 +87,14 @@ public static ConfigContext parseCmdLine(String[] args) throws Exception {
8787
System.exit(-1);
8888
}
8989

90-
ConfigContext configContext = null;
91-
String filePath;
90+
ConfigContext configContext;
91+
String filePath = null;
9292
if (commandLine.hasOption('c')) {
9393
filePath = commandLine.getOptionValue('c');
94-
configContext = configFileToConfigContext(filePath);
9594
}
9695

96+
configContext = configFileToConfigContext(filePath);
97+
9798
if (commandLine.hasOption('p') && configContext != null) {
9899
Logger console = LoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
99100
MixAll.printObjectProperties(console, configContext.getBrokerConfig());

broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,11 @@ public ClientHousekeepingService(final BrokerController brokerController) {
4141

4242
public void start() {
4343

44-
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
45-
@Override
46-
public void run() {
47-
try {
48-
ClientHousekeepingService.this.scanExceptionChannel();
49-
} catch (Throwable e) {
50-
log.error("Error occurred when scan not active client channels.", e);
51-
}
44+
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
45+
try {
46+
ClientHousekeepingService.this.scanExceptionChannel();
47+
} catch (Throwable e) {
48+
log.error("Error occurred when scan not active client channels.", e);
5249
}
5350
}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
5451
}

broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,12 @@ public PopReviveService[] getPopReviveServices() {
7474
return popReviveServices;
7575
}
7676

77+
public void shutdown() throws Exception {
78+
for (PopReviveService popReviveService : popReviveServices) {
79+
popReviveService.shutdown();
80+
}
81+
}
82+
7783
public void startPopReviveService() {
7884
for (PopReviveService popReviveService : popReviveServices) {
7985
popReviveService.start();

broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ public NotificationProcessor(final BrokerController brokerController) {
5555
this.popLongPollingService = new PopLongPollingService(brokerController, this, true);
5656
}
5757

58+
public void shutdown() throws Exception {
59+
this.popLongPollingService.shutdown();
60+
}
61+
5862
@Override
5963
public boolean rejectRequest() {
6064
return false;

broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,10 @@ public void run() {
126126
if (!isShouldRunning()) {
127127
return;
128128
}
129-
while (this.buffer.size() > 0 || getOffsetTotalSize() > 0) {
130-
scan();
129+
if (!brokerController.getBrokerConfig().isInBrokerContainer()) {
130+
while (this.buffer.size() > 0 || getOffsetTotalSize() > 0) {
131+
scan();
132+
}
131133
}
132134
}
133135

broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,12 @@ public PopMessageProcessor(final BrokerController brokerController) {
123123
this.ckMessageNumber = new AtomicLong();
124124
}
125125

126+
public void shutdown() throws Exception {
127+
popLongPollingService.shutdown();
128+
queueLockManager.shutdown();
129+
popBufferMergeService.shutdown();
130+
}
131+
126132
protected String getReviveTopic() {
127133
return reviveTopic;
128134
}

broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public BrokerController getBrokerController() {
9090
return brokerController;
9191
}
9292

93-
public void shutDown() {
93+
public void shutdown() {
9494
if (executorService != null) {
9595
executorService.shutdown();
9696
}

common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -487,10 +487,16 @@ public synchronized boolean shutdown() {
487487
return true;
488488
}
489489

490+
manualCompactionThread.shutdownNow();
491+
492+
manualCompactionThread.awaitTermination(30, TimeUnit.SECONDS);
493+
490494
final FlushOptions flushOptions = new FlushOptions();
491495
flushOptions.setWaitForFlush(true);
492496
try {
493497
flush(flushOptions);
498+
} catch (Throwable e) {
499+
LOGGER.error("flush rocksdb wal failed when shutdown", e);
494500
} finally {
495501
flushOptions.close();
496502
}
@@ -521,10 +527,22 @@ public synchronized boolean shutdown() {
521527
}
522528
//4. close db.
523529
if (db != null && !this.readOnly) {
524-
this.db.syncWal();
530+
try {
531+
this.db.syncWal();
532+
} catch (Throwable e) {
533+
LOGGER.error("rocksdb sync wal failed when shutdown", e);
534+
} finally {
535+
flushOptions.close();
536+
}
537+
525538
}
526539
if (db != null) {
527-
this.db.closeE();
540+
try {
541+
this.db.closeE();
542+
} catch (Throwable e) {
543+
LOGGER.error("rocksdb db closeE failed when shutdown", e);
544+
}
545+
528546
}
529547
// Close DBOptions after RocksDB instance is closed.
530548
if (this.options != null) {

common/src/main/java/org/apache/rocketmq/common/statistics/StatisticsManager.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,4 +154,8 @@ public StatisticsItemStateGetter getStatisticsItemStateGetter() {
154154
public void setStatisticsItemStateGetter(StatisticsItemStateGetter statisticsItemStateGetter) {
155155
this.statisticsItemStateGetter = statisticsItemStateGetter;
156156
}
157+
158+
public void shutdown() {
159+
executor.shutdown();
160+
}
157161
}

0 commit comments

Comments
 (0)