diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java index c58e91e9c0c..e0a3c5cd0af 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.tieredstore.index; import com.google.common.base.Stopwatch; +import java.io.FileNotFoundException; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; @@ -243,13 +244,13 @@ public AppendResult putKey( topicId, queueId, offset, size, hashCode, timeDiff, slotOldValue); int itemIndex = this.indexItemCount.incrementAndGet(); int itemPosition = this.getItemPosition(itemIndex); - + if (writeWithoutMmap && fileChannel != null) { // Use FileChannel for writing ByteBuffer itemBuffer = indexItem.getByteBuffer(); fileChannel.position(itemPosition); fileChannel.write(itemBuffer); - + ByteBuffer slotBuffer = ByteBuffer.allocate(Integer.BYTES); slotBuffer.putInt(0, itemIndex); slotBuffer.position(0); @@ -436,6 +437,8 @@ public ByteBuffer doCompaction() { buffer = compactToNewFile(); log.debug("IndexStoreFile do compaction, timestamp: {}, file size: {}, cost: {}ms", this.getTimestamp(), buffer.capacity(), stopwatch.elapsed(TimeUnit.MICROSECONDS)); + } catch (FileNotFoundException e) { + throw new RuntimeException(e); } catch (Throwable e) { log.error("IndexStoreFile do compaction, timestamp: {}, cost: {}ms", this.getTimestamp(), stopwatch.elapsed(TimeUnit.MICROSECONDS), e); @@ -482,7 +485,7 @@ protected ByteBuffer compactToNewFile() throws IOException { buffer.get(payload); int newSlotValue = payloadBuffer.getInt(COMPACT_INDEX_ITEM_SIZE); buffer.limit(COMPACT_INDEX_ITEM_SIZE); - + if (writeWithoutMmap && compactFileChannel != null) { // Use FileChannel for writing ByteBuffer writeBuffer = ByteBuffer.wrap(payload, 0, COMPACT_INDEX_ITEM_SIZE); @@ -537,11 +540,13 @@ public void shutdown() { } if (this.mappedFile != null) { this.mappedFile.shutdown(TimeUnit.SECONDS.toMillis(10)); + this.mappedFile.cleanResources(); } if (this.compactMappedFile != null) { this.compactMappedFile.shutdown(TimeUnit.SECONDS.toMillis(10)); + this.compactMappedFile.cleanResources(); } - } catch (Exception e) { + } catch (Throwable e) { log.error("IndexStoreFile shutdown failed, timestamp: {}, status: {}", this.getTimestamp(), fileStatus.get(), e); } finally { fileReadWriteLock.writeLock().unlock(); diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java index 07609bbab96..132d2162f99 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java @@ -422,15 +422,19 @@ public void forceShutdown() { @Override public void run() { while (!this.isStopped()) { - long expireTimestamp = System.currentTimeMillis() - - TimeUnit.HOURS.toMillis(storeConfig.getTieredStoreFileReservedTime()); - this.destroyExpiredFile(expireTimestamp); - IndexFile indexFile = this.getNextSealedFile(); - if (indexFile != null) { - if (this.doCompactThenUploadFile(indexFile)) { - this.setCompactTimestamp(indexFile.getTimestamp()); - continue; + try { + long expireTimestamp = System.currentTimeMillis() + - TimeUnit.HOURS.toMillis(storeConfig.getTieredStoreFileReservedTime()); + this.destroyExpiredFile(expireTimestamp); + IndexFile indexFile = this.getNextSealedFile(); + if (indexFile != null) { + if (this.doCompactThenUploadFile(indexFile)) { + this.setCompactTimestamp(indexFile.getTimestamp()); + continue; + } } + } catch (Throwable e) { + log.error("IndexStoreService running error", e); } this.waitForRunning(TimeUnit.SECONDS.toMillis(10)); } @@ -439,13 +443,14 @@ public void run() { if (autoCreateNewFile) { this.forceUpload(); } - this.timeStoreTable.forEach((timestamp, file) -> file.shutdown()); - this.timeStoreTable.clear(); } catch (Exception e) { log.error("IndexStoreService shutdown error", e); } finally { + this.timeStoreTable.forEach((timestamp, file) -> file.shutdown()); + this.timeStoreTable.clear(); readWriteLock.writeLock().unlock(); } + log.info(this.getServiceName() + " service shutdown"); } }