From 6bf661a237202a1815ac05fc584a85be3986a73e Mon Sep 17 00:00:00 2001 From: RongtongJin Date: Fri, 30 Jan 2026 17:24:37 +0800 Subject: [PATCH 1/2] fix(store): close all consume queue file handles on ConsumeQueueStore shutdown --- .../java/org/apache/rocketmq/store/ConsumeQueue.java | 9 +++++++-- .../apache/rocketmq/store/queue/BatchConsumeQueue.java | 5 +++++ .../apache/rocketmq/store/queue/ConsumeQueueStore.java | 8 ++++++++ .../apache/rocketmq/store/queue/FileQueueLifeCycle.java | 3 +++ .../apache/rocketmq/store/queue/RocksDBConsumeQueue.java | 5 +++++ 5 files changed, 28 insertions(+), 2 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index 02f90cef1df..2a77ede32af 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -37,12 +37,11 @@ import org.apache.rocketmq.store.queue.ConsumeQueueInterface; import org.apache.rocketmq.store.queue.ConsumeQueueStore; import org.apache.rocketmq.store.queue.CqUnit; -import org.apache.rocketmq.store.queue.FileQueueLifeCycle; import org.apache.rocketmq.store.queue.MultiDispatchUtils; import org.apache.rocketmq.store.queue.QueueOffsetOperator; import org.apache.rocketmq.store.queue.ReferredIterator; -public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle { +public class ConsumeQueue implements ConsumeQueueInterface { private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); /** @@ -1236,4 +1235,10 @@ public void initializeWithOffset(long offset, long minPhyOffset) { flush(0); } + + @Override + public boolean shutdown() { + this.mappedFileQueue.cleanResourcesAll(); + return true; + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java index 3f1dc237d6b..7ad29ff538b 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java @@ -1200,4 +1200,9 @@ public long estimateMessageCount(long from, long to, MessageFilter filter) { public void initializeWithOffset(long offset, long minPhyOffset) { // not support now } + + @Override + public boolean shutdown() { + return true; + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java index e9b0312c01c..22981be2e39 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java @@ -171,6 +171,13 @@ public boolean shutdown() { log.error("Failed to flush all consume queues", e); return false; } + + for (Map.Entry> topicEntry : this.consumeQueueTable.entrySet()) { + for (Map.Entry cqEntry : topicEntry.getValue().entrySet()) { + cqEntry.getValue().shutdown(); + } + } + return true; } @@ -864,4 +871,5 @@ public String getServiceName() { return messageStore.getBrokerConfig().getIdentifier() + CleanConsumeQueueService.class.getSimpleName(); } } + } diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/FileQueueLifeCycle.java b/store/src/main/java/org/apache/rocketmq/store/queue/FileQueueLifeCycle.java index 95cc0887f42..89cb0b58ab3 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/FileQueueLifeCycle.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/FileQueueLifeCycle.java @@ -78,7 +78,10 @@ public interface FileQueueLifeCycle extends Swappable { /** * Does the first file exist? + * * @return true if it exists */ boolean isFirstFileExist(); + + boolean shutdown(); } diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java index 03fa5ac9123..0d58d9a6934 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java @@ -507,4 +507,9 @@ public void initializeWithOffset(long offset, long minPhyOffset) { ERROR_LOG.error("RocksDBConsumeQueue initializeWithOffset Failed. topic={}, queueId={}, offset={}", topic, queueId, offset, e); } } + + @Override + public boolean shutdown() { + return true; + } } From f74565c48cd5d276189054c1b65f23175c69704d Mon Sep 17 00:00:00 2001 From: RongtongJin Date: Mon, 2 Feb 2026 10:03:42 +0800 Subject: [PATCH 2/2] remove implementation --- .../org/apache/rocketmq/store/queue/ConsumeQueueStore.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java index 22981be2e39..d5d096becd9 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java @@ -172,12 +172,6 @@ public boolean shutdown() { return false; } - for (Map.Entry> topicEntry : this.consumeQueueTable.entrySet()) { - for (Map.Entry cqEntry : topicEntry.getValue().entrySet()) { - cqEntry.getValue().shutdown(); - } - } - return true; }