Skip to content

Commit 69e2524

Browse files
RongtongJinRongtongJinShannonDing
authored
Fix resource leak in IndexStoreFile shutdown (#9840)
* Fix resource leak in IndexStoreFile shutdown Add cleanResources() calls after shutdown() for both mappedFile and compactMappedFile to ensure proper cleanup of memory-mapped buffers and file channels. Change-Id: I2716b4e3b0cd281e89a9d5a00a389dc6048de3e7 Co-developed-by: Cursor <noreply@cursor.com> * Add a FileNotFoundException check to allow breaking out of the loop after shutdown Change-Id: Icc0063544ef91de8b2bd96a80f3829a6922cb0e6 * Add a FileNotFoundException check to allow breaking out of the loop after shutdown. Change-Id: I53a7d2800775d80c1db8feb159c5c258b3517f09 --------- Co-authored-by: RongtongJin <user@example.com> Co-authored-by: ShannonDing <dinglei@apache.org>
1 parent 2017630 commit 69e2524

2 files changed

Lines changed: 24 additions & 14 deletions

File tree

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.apache.rocketmq.tieredstore.index;
1818

1919
import com.google.common.base.Stopwatch;
20+
import java.io.FileNotFoundException;
2021
import java.io.IOException;
2122
import java.nio.ByteBuffer;
2223
import java.nio.MappedByteBuffer;
@@ -243,13 +244,13 @@ public AppendResult putKey(
243244
topicId, queueId, offset, size, hashCode, timeDiff, slotOldValue);
244245
int itemIndex = this.indexItemCount.incrementAndGet();
245246
int itemPosition = this.getItemPosition(itemIndex);
246-
247+
247248
if (writeWithoutMmap && fileChannel != null) {
248249
// Use FileChannel for writing
249250
ByteBuffer itemBuffer = indexItem.getByteBuffer();
250251
fileChannel.position(itemPosition);
251252
fileChannel.write(itemBuffer);
252-
253+
253254
ByteBuffer slotBuffer = ByteBuffer.allocate(Integer.BYTES);
254255
slotBuffer.putInt(0, itemIndex);
255256
slotBuffer.position(0);
@@ -436,6 +437,8 @@ public ByteBuffer doCompaction() {
436437
buffer = compactToNewFile();
437438
log.debug("IndexStoreFile do compaction, timestamp: {}, file size: {}, cost: {}ms",
438439
this.getTimestamp(), buffer.capacity(), stopwatch.elapsed(TimeUnit.MICROSECONDS));
440+
} catch (FileNotFoundException e) {
441+
throw new RuntimeException(e);
439442
} catch (Throwable e) {
440443
log.error("IndexStoreFile do compaction, timestamp: {}, cost: {}ms",
441444
this.getTimestamp(), stopwatch.elapsed(TimeUnit.MICROSECONDS), e);
@@ -482,7 +485,7 @@ protected ByteBuffer compactToNewFile() throws IOException {
482485
buffer.get(payload);
483486
int newSlotValue = payloadBuffer.getInt(COMPACT_INDEX_ITEM_SIZE);
484487
buffer.limit(COMPACT_INDEX_ITEM_SIZE);
485-
488+
486489
if (writeWithoutMmap && compactFileChannel != null) {
487490
// Use FileChannel for writing
488491
ByteBuffer writeBuffer = ByteBuffer.wrap(payload, 0, COMPACT_INDEX_ITEM_SIZE);
@@ -537,11 +540,13 @@ public void shutdown() {
537540
}
538541
if (this.mappedFile != null) {
539542
this.mappedFile.shutdown(TimeUnit.SECONDS.toMillis(10));
543+
this.mappedFile.cleanResources();
540544
}
541545
if (this.compactMappedFile != null) {
542546
this.compactMappedFile.shutdown(TimeUnit.SECONDS.toMillis(10));
547+
this.compactMappedFile.cleanResources();
543548
}
544-
} catch (Exception e) {
549+
} catch (Throwable e) {
545550
log.error("IndexStoreFile shutdown failed, timestamp: {}, status: {}", this.getTimestamp(), fileStatus.get(), e);
546551
} finally {
547552
fileReadWriteLock.writeLock().unlock();

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -422,15 +422,19 @@ public void forceShutdown() {
422422
@Override
423423
public void run() {
424424
while (!this.isStopped()) {
425-
long expireTimestamp = System.currentTimeMillis()
426-
- TimeUnit.HOURS.toMillis(storeConfig.getTieredStoreFileReservedTime());
427-
this.destroyExpiredFile(expireTimestamp);
428-
IndexFile indexFile = this.getNextSealedFile();
429-
if (indexFile != null) {
430-
if (this.doCompactThenUploadFile(indexFile)) {
431-
this.setCompactTimestamp(indexFile.getTimestamp());
432-
continue;
425+
try {
426+
long expireTimestamp = System.currentTimeMillis()
427+
- TimeUnit.HOURS.toMillis(storeConfig.getTieredStoreFileReservedTime());
428+
this.destroyExpiredFile(expireTimestamp);
429+
IndexFile indexFile = this.getNextSealedFile();
430+
if (indexFile != null) {
431+
if (this.doCompactThenUploadFile(indexFile)) {
432+
this.setCompactTimestamp(indexFile.getTimestamp());
433+
continue;
434+
}
433435
}
436+
} catch (Throwable e) {
437+
log.error("IndexStoreService running error", e);
434438
}
435439
this.waitForRunning(TimeUnit.SECONDS.toMillis(10));
436440
}
@@ -439,13 +443,14 @@ public void run() {
439443
if (autoCreateNewFile) {
440444
this.forceUpload();
441445
}
442-
this.timeStoreTable.forEach((timestamp, file) -> file.shutdown());
443-
this.timeStoreTable.clear();
444446
} catch (Exception e) {
445447
log.error("IndexStoreService shutdown error", e);
446448
} finally {
449+
this.timeStoreTable.forEach((timestamp, file) -> file.shutdown());
450+
this.timeStoreTable.clear();
447451
readWriteLock.writeLock().unlock();
448452
}
453+
449454
log.info(this.getServiceName() + " service shutdown");
450455
}
451456
}

0 commit comments

Comments
 (0)