Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,11 @@
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
import org.apache.rocketmq.store.queue.ConsumeQueueStore;
import org.apache.rocketmq.store.queue.CqUnit;
import org.apache.rocketmq.store.queue.FileQueueLifeCycle;
import org.apache.rocketmq.store.queue.MultiDispatchUtils;
import org.apache.rocketmq.store.queue.QueueOffsetOperator;
import org.apache.rocketmq.store.queue.ReferredIterator;

public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
public class ConsumeQueue implements ConsumeQueueInterface {
private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

/**
Expand Down Expand Up @@ -1236,4 +1235,10 @@ public void initializeWithOffset(long offset, long minPhyOffset) {

flush(0);
}

@Override
public boolean shutdown() {
this.mappedFileQueue.cleanResourcesAll();
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1200,4 +1200,9 @@ public long estimateMessageCount(long from, long to, MessageFilter filter) {
public void initializeWithOffset(long offset, long minPhyOffset) {
// not support now
}

@Override
public boolean shutdown() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ public boolean shutdown() {
log.error("Failed to flush all consume queues", e);
return false;
}

return true;
}

Expand Down Expand Up @@ -864,4 +865,5 @@ public String getServiceName() {
return messageStore.getBrokerConfig().getIdentifier() + CleanConsumeQueueService.class.getSimpleName();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,10 @@ public interface FileQueueLifeCycle extends Swappable {

/**
* Does the first file exist?
*
* @return true if it exists
*/
boolean isFirstFileExist();

boolean shutdown();
}
Original file line number Diff line number Diff line change
Expand Up @@ -507,4 +507,9 @@ public void initializeWithOffset(long offset, long minPhyOffset) {
ERROR_LOG.error("RocksDBConsumeQueue initializeWithOffset Failed. topic={}, queueId={}, offset={}", topic, queueId, offset, e);
}
}

@Override
public boolean shutdown() {
return true;
}
}
Loading