From cdeb9a492e6527d95c20c8fd17cd140842654192 Mon Sep 17 00:00:00 2001 From: lizhimins <707364882@qq.com> Date: Fri, 10 Apr 2026 15:26:11 +0800 Subject: [PATCH] [ISSUE #10181] Remove lock when reset offset for PopConsumerService (#10182) Signed-off-by: terrance.lzm --- .../rocketmq/broker/pop/PopConsumerService.java | 11 ++--------- .../broker/processor/AdminBrokerProcessor.java | 1 - 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java index 55a347b89a5..7d80014ece2 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java @@ -555,16 +555,9 @@ public CompletableFuture revive(PopConsumerRecord record) { }); } - @SuppressWarnings("StatementWithEmptyBody") public void clearCache(String groupId, String topicId, int queueId) { - while (!consumerLockService.tryLock(groupId, topicId)) { - } - try { - if (popConsumerCache != null) { - popConsumerCache.removeRecords(groupId, topicId, queueId); - } - } finally { - consumerLockService.unlock(groupId, topicId); + if (popConsumerCache != null) { + popConsumerCache.removeRecords(groupId, topicId, queueId); } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 73aaa69e74a..c88d4e5ad2f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -2183,7 +2183,6 @@ private RemotingCommand resetOffsetInner(String topic, String group, int queueId brokerController.getPopInflightMessageCounter().clearInFlightMessageNum(topic, group, entry.getKey()); } if (brokerController.getBrokerConfig().isPopConsumerKVServiceEnable()) { - brokerController.getPopConsumerService().clearCache(group, topic, entry.getKey()); brokerController.getConsumerOffsetManager().clearPullOffset(group, topic); } body.getOffsetTable().put(new MessageQueue(topic, brokerName, entry.getKey()), entry.getValue());