diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index 02f90cef1df..2a77ede32af 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -37,12 +37,11 @@ import org.apache.rocketmq.store.queue.ConsumeQueueInterface; import org.apache.rocketmq.store.queue.ConsumeQueueStore; import org.apache.rocketmq.store.queue.CqUnit; -import org.apache.rocketmq.store.queue.FileQueueLifeCycle; import org.apache.rocketmq.store.queue.MultiDispatchUtils; import org.apache.rocketmq.store.queue.QueueOffsetOperator; import org.apache.rocketmq.store.queue.ReferredIterator; -public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle { +public class ConsumeQueue implements ConsumeQueueInterface { private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); /** @@ -1236,4 +1235,10 @@ public void initializeWithOffset(long offset, long minPhyOffset) { flush(0); } + + @Override + public boolean shutdown() { + this.mappedFileQueue.cleanResourcesAll(); + return true; + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java index 3f1dc237d6b..7ad29ff538b 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java @@ -1200,4 +1200,9 @@ public long estimateMessageCount(long from, long to, MessageFilter filter) { public void initializeWithOffset(long offset, long minPhyOffset) { // not support now } + + @Override + public boolean shutdown() { + return true; + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java index e9b0312c01c..d5d096becd9 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java @@ -171,6 +171,7 @@ public boolean shutdown() { log.error("Failed to flush all consume queues", e); return false; } + return true; } @@ -864,4 +865,5 @@ public String getServiceName() { return messageStore.getBrokerConfig().getIdentifier() + CleanConsumeQueueService.class.getSimpleName(); } } + } diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/FileQueueLifeCycle.java b/store/src/main/java/org/apache/rocketmq/store/queue/FileQueueLifeCycle.java index 95cc0887f42..89cb0b58ab3 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/FileQueueLifeCycle.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/FileQueueLifeCycle.java @@ -78,7 +78,10 @@ public interface FileQueueLifeCycle extends Swappable { /** * Does the first file exist? + * * @return true if it exists */ boolean isFirstFileExist(); + + boolean shutdown(); } diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java index 03fa5ac9123..0d58d9a6934 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java @@ -507,4 +507,9 @@ public void initializeWithOffset(long offset, long minPhyOffset) { ERROR_LOG.error("RocksDBConsumeQueue initializeWithOffset Failed. topic={}, queueId={}, offset={}", topic, queueId, offset, e); } } + + @Override + public boolean shutdown() { + return true; + } }