diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/orderly/QueueLevelConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/orderly/QueueLevelConsumerManager.java index 6c57dd7ab4d..6f496fa13b3 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/pop/orderly/QueueLevelConsumerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/orderly/QueueLevelConsumerManager.java @@ -291,8 +291,7 @@ public void updateNextVisibleTime(String topic, String group, int queueId, long updateLockFreeTimestamp(topic, group, queueId, orderInfo); } - @VisibleForTesting - protected void autoClean() { + public void autoClean() { if (brokerController == null) { return; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessor.java index cb32b9757c9..9314dab734e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessor.java @@ -438,6 +438,9 @@ public GetMessageResult getMessage(String clientHost, String group, String lmqNa } public class PopLiteLockManager extends ServiceThread { + private static final long AUTO_CLEAN_INTERVAL = 5 * 60 * 1000; + private long lastCleanTime = System.currentTimeMillis(); + @Override public String getServiceName() { if (brokerController.getBrokerConfig().isInBrokerContainer()) { @@ -452,6 +455,10 @@ public void run() { try { waitForRunning(60000); lockService.removeTimeout(); + if (System.currentTimeMillis() - lastCleanTime >= AUTO_CLEAN_INTERVAL) { + ((MemoryConsumerOrderInfoManager) consumerOrderInfoManager).autoClean(); + lastCleanTime = System.currentTimeMillis(); + } } catch (Exception ignored) { } } diff --git a/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java b/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java index 78b6ed6ddcb..a7021e849c9 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java @@ -32,7 +32,7 @@ public static String getStorePathBatchConsumeQueue(final String rootDir) { } public static String getStorePathRocksDBConsumeQueue(final String rootDir) { - return rootDir + File.separator + "consumequeue_r"; + return rootDir + File.separator + "consumequeue_rocksdb"; } public static String getStorePathIndex(final String rootDir) {