Skip to content

Commit 7583fda

Browse files
RongtongJinRongtongJin
andauthored
fix(store): close all consume queue file handles on ConsumeQueueStore shutdown (#10060)
* fix(store): close all consume queue file handles on ConsumeQueueStore shutdown * remove implementation --------- Co-authored-by: RongtongJin <user@example.com>
1 parent 89d331c commit 7583fda

5 files changed

Lines changed: 22 additions & 2 deletions

File tree

store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,11 @@
3737
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
3838
import org.apache.rocketmq.store.queue.ConsumeQueueStore;
3939
import org.apache.rocketmq.store.queue.CqUnit;
40-
import org.apache.rocketmq.store.queue.FileQueueLifeCycle;
4140
import org.apache.rocketmq.store.queue.MultiDispatchUtils;
4241
import org.apache.rocketmq.store.queue.QueueOffsetOperator;
4342
import org.apache.rocketmq.store.queue.ReferredIterator;
4443

45-
public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
44+
public class ConsumeQueue implements ConsumeQueueInterface {
4645
private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
4746

4847
/**
@@ -1236,4 +1235,10 @@ public void initializeWithOffset(long offset, long minPhyOffset) {
12361235

12371236
flush(0);
12381237
}
1238+
1239+
@Override
1240+
public boolean shutdown() {
1241+
this.mappedFileQueue.cleanResourcesAll();
1242+
return true;
1243+
}
12391244
}

store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1200,4 +1200,9 @@ public long estimateMessageCount(long from, long to, MessageFilter filter) {
12001200
public void initializeWithOffset(long offset, long minPhyOffset) {
12011201
// not support now
12021202
}
1203+
1204+
@Override
1205+
public boolean shutdown() {
1206+
return true;
1207+
}
12031208
}

store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ public boolean shutdown() {
171171
log.error("Failed to flush all consume queues", e);
172172
return false;
173173
}
174+
174175
return true;
175176
}
176177

@@ -864,4 +865,5 @@ public String getServiceName() {
864865
return messageStore.getBrokerConfig().getIdentifier() + CleanConsumeQueueService.class.getSimpleName();
865866
}
866867
}
868+
867869
}

store/src/main/java/org/apache/rocketmq/store/queue/FileQueueLifeCycle.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,10 @@ public interface FileQueueLifeCycle extends Swappable {
7878

7979
/**
8080
* Does the first file exist?
81+
*
8182
* @return true if it exists
8283
*/
8384
boolean isFirstFileExist();
85+
86+
boolean shutdown();
8487
}

store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -507,4 +507,9 @@ public void initializeWithOffset(long offset, long minPhyOffset) {
507507
ERROR_LOG.error("RocksDBConsumeQueue initializeWithOffset Failed. topic={}, queueId={}, offset={}", topic, queueId, offset, e);
508508
}
509509
}
510+
511+
@Override
512+
public boolean shutdown() {
513+
return true;
514+
}
510515
}

0 commit comments

Comments
 (0)