diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 2616e039e82..2d35acfcb98 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -1543,10 +1543,6 @@ protected void shutdownBasicService() { this.consumerFilterManager.persist(); } - if (this.consumerOrderInfoManager != null) { - this.consumerOrderInfoManager.persist(); - } - if (this.scheduleMessageService != null) { this.scheduleMessageService.persist(); this.scheduleMessageService.shutdown(); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java index eafb47a89da..140604f5217 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java @@ -283,6 +283,10 @@ public long queryPullOffset(final String group, final String topic, final int qu return offset; } + public void clearPullOffset(final String group, final String topic) { + this.pullOffsetTable.remove(topic + TOPIC_GROUP_SEPARATOR + group); + } + @Override public String encode() { return this.encode(false); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index c747fa15af0..172988ea416 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -2262,8 +2262,7 @@ private RemotingCommand resetOffsetInner(String topic, String group, int queueId } if (brokerController.getBrokerConfig().isPopConsumerKVServiceEnable()) { brokerController.getPopConsumerService().clearCache(group, topic, entry.getKey()); - brokerController.getConsumerOffsetManager().commitPullOffset( - "ResetOffsetInner", group, topic, entry.getKey(), entry.getValue()); + brokerController.getConsumerOffsetManager().clearPullOffset(group, topic); } body.getOffsetTable().put(new MessageQueue(topic, brokerName, entry.getKey()), entry.getValue()); } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java index 9fc553409d2..d980090a23f 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java @@ -77,6 +77,10 @@ public void removeOffsetByGroupTest() { consumerOffsetManager.commitPullOffset("Pull", group, topic, 0, 100); consumerOffsetManager.removeOffset(group); Assert.assertFalse(consumerOffsetManager.getOffsetTable().containsKey(topic + TOPIC_GROUP_SEPARATOR + group)); + + consumerOffsetManager.commitPullOffset("Pull", group, topic, 0, 100); + consumerOffsetManager.clearPullOffset(group, topic); + Assert.assertEquals(-1L, consumerOffsetManager.queryPullOffset(group, topic, 0)); } @Test