From d82a3716e220f1d2b047978dda187ca467e7b90f Mon Sep 17 00:00:00 2001 From: RongtongJin Date: Tue, 11 Nov 2025 17:35:24 +0800 Subject: [PATCH 1/3] Fix resource leak in IndexStoreFile shutdown Add cleanResources() calls after shutdown() for both mappedFile and compactMappedFile to ensure proper cleanup of memory-mapped buffers and file channels. Change-Id: I2716b4e3b0cd281e89a9d5a00a389dc6048de3e7 Co-developed-by: Cursor --- .../org/apache/rocketmq/tieredstore/index/IndexStoreFile.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java index 528bce9bb8b..a40a865d505 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java @@ -468,9 +468,11 @@ public void shutdown() { } if (this.mappedFile != null) { this.mappedFile.shutdown(TimeUnit.SECONDS.toMillis(10)); + this.mappedFile.cleanResources(); } if (this.compactMappedFile != null) { this.compactMappedFile.shutdown(TimeUnit.SECONDS.toMillis(10)); + this.compactMappedFile.cleanResources(); } } catch (Exception e) { log.error("IndexStoreFile shutdown failed, timestamp: {}, status: {}", this.getTimestamp(), fileStatus.get(), e); From 84c924d803a8646ef426cb53ac053c7a795d93af Mon Sep 17 00:00:00 2001 From: RongtongJin Date: Tue, 11 Nov 2025 19:53:09 +0800 Subject: [PATCH 2/3] Add a FileNotFoundException check to allow breaking out of the loop after shutdown Change-Id: Icc0063544ef91de8b2bd96a80f3829a6922cb0e6 --- .../tieredstore/index/IndexStoreFile.java | 3 +++ .../tieredstore/index/IndexStoreService.java | 20 +++++++++++-------- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java index a40a865d505..9e05b3c0106 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.tieredstore.index; import com.google.common.base.Stopwatch; +import java.io.FileNotFoundException; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; @@ -392,6 +393,8 @@ public ByteBuffer doCompaction() { buffer = compactToNewFile(); log.debug("IndexStoreFile do compaction, timestamp: {}, file size: {}, cost: {}ms", this.getTimestamp(), buffer.capacity(), stopwatch.elapsed(TimeUnit.MICROSECONDS)); + } catch (FileNotFoundException e) { + throw new RuntimeException(e); } catch (Exception e) { log.error("IndexStoreFile do compaction, timestamp: {}, cost: {}ms", this.getTimestamp(), stopwatch.elapsed(TimeUnit.MICROSECONDS), e); diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java index 2385628ed45..797fe3168d3 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java @@ -417,15 +417,19 @@ public void shutdown() { @Override public void run() { while (!this.isStopped()) { - long expireTimestamp = System.currentTimeMillis() - - TimeUnit.HOURS.toMillis(storeConfig.getTieredStoreFileReservedTime()); - this.destroyExpiredFile(expireTimestamp); - IndexFile indexFile = this.getNextSealedFile(); - if (indexFile != null) { - if (this.doCompactThenUploadFile(indexFile)) { - this.setCompactTimestamp(indexFile.getTimestamp()); - continue; + try { + long expireTimestamp = System.currentTimeMillis() + - TimeUnit.HOURS.toMillis(storeConfig.getTieredStoreFileReservedTime()); + this.destroyExpiredFile(expireTimestamp); + IndexFile indexFile = this.getNextSealedFile(); + if (indexFile != null) { + if (this.doCompactThenUploadFile(indexFile)) { + this.setCompactTimestamp(indexFile.getTimestamp()); + continue; + } } + } catch (Throwable e) { + log.error("IndexStoreService running error", e); } this.waitForRunning(TimeUnit.SECONDS.toMillis(10)); } From db0cb9629f375fad62e11057b9aaeba41025e468 Mon Sep 17 00:00:00 2001 From: RongtongJin Date: Tue, 11 Nov 2025 20:08:08 +0800 Subject: [PATCH 3/3] Add a FileNotFoundException check to allow breaking out of the loop after shutdown. Change-Id: I53a7d2800775d80c1db8feb159c5c258b3517f09 --- .../apache/rocketmq/tieredstore/index/IndexStoreFile.java | 2 +- .../apache/rocketmq/tieredstore/index/IndexStoreService.java | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java index 9e05b3c0106..6faffe261b0 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java @@ -477,7 +477,7 @@ public void shutdown() { this.compactMappedFile.shutdown(TimeUnit.SECONDS.toMillis(10)); this.compactMappedFile.cleanResources(); } - } catch (Exception e) { + } catch (Throwable e) { log.error("IndexStoreFile shutdown failed, timestamp: {}, status: {}", this.getTimestamp(), fileStatus.get(), e); } finally { fileReadWriteLock.writeLock().unlock(); diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java index 797fe3168d3..afce4753b35 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java @@ -438,13 +438,14 @@ public void run() { if (autoCreateNewFile) { this.forceUpload(); } - this.timeStoreTable.forEach((timestamp, file) -> file.shutdown()); - this.timeStoreTable.clear(); } catch (Exception e) { log.error("IndexStoreService shutdown error", e); } finally { + this.timeStoreTable.forEach((timestamp, file) -> file.shutdown()); + this.timeStoreTable.clear(); readWriteLock.writeLock().unlock(); } + log.info(this.getServiceName() + " service shutdown"); } }