Skip to content

Commit a867938

Browse files
lizhiminsRongtongJin
authored andcommitted
[ISSUE #9369] Fix reset offset commit pull offset when use pop consumer service (#9370)
1 parent 34742b9 commit a867938

File tree

4 files changed

+9
-6
lines changed

4 files changed

+9
-6
lines changed

broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1555,10 +1555,6 @@ protected void shutdownBasicService() {
15551555
this.consumerFilterManager.persist();
15561556
}
15571557

1558-
if (this.consumerOrderInfoManager != null) {
1559-
this.consumerOrderInfoManager.persist();
1560-
}
1561-
15621558
if (this.scheduleMessageService != null) {
15631559
this.scheduleMessageService.persist();
15641560
this.scheduleMessageService.shutdown();

broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,10 @@ public long queryPullOffset(final String group, final String topic, final int qu
283283
return offset;
284284
}
285285

286+
public void clearPullOffset(final String group, final String topic) {
287+
this.pullOffsetTable.remove(topic + TOPIC_GROUP_SEPARATOR + group);
288+
}
289+
286290
@Override
287291
public String encode() {
288292
return this.encode(false);

broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2263,8 +2263,7 @@ private RemotingCommand resetOffsetInner(String topic, String group, int queueId
22632263
}
22642264
if (brokerController.getBrokerConfig().isPopConsumerKVServiceEnable()) {
22652265
brokerController.getPopConsumerService().clearCache(group, topic, entry.getKey());
2266-
brokerController.getConsumerOffsetManager().commitPullOffset(
2267-
"ResetOffsetInner", group, topic, entry.getKey(), entry.getValue());
2266+
brokerController.getConsumerOffsetManager().clearPullOffset(group, topic);
22682267
}
22692268
body.getOffsetTable().put(new MessageQueue(topic, brokerName, entry.getKey()), entry.getValue());
22702269
}

broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,10 @@ public void removeOffsetByGroupTest() {
7777
consumerOffsetManager.commitPullOffset("Pull", group, topic, 0, 100);
7878
consumerOffsetManager.removeOffset(group);
7979
Assert.assertFalse(consumerOffsetManager.getOffsetTable().containsKey(topic + TOPIC_GROUP_SEPARATOR + group));
80+
81+
consumerOffsetManager.commitPullOffset("Pull", group, topic, 0, 100);
82+
consumerOffsetManager.clearPullOffset(group, topic);
83+
Assert.assertEquals(-1L, consumerOffsetManager.queryPullOffset(group, topic, 0));
8084
}
8185

8286
@Test

0 commit comments

Comments
 (0)