Skip to content

Commit df8cf98

Browse files
lizhiminsltamber3424672656
authored andcommitted
[ISSUE #9172] Clean pull offset and reset offset when delete subscription group (#9173)
* [ISSUE #9172] Clean pull offset and reset offset when delete subscription group * [ISSUE #9174] Add a collection of predefined Groups and common checking methods in the MixAll (#9175) Signed-off-by: ltamber <ltamber12@gmail.com> * [ISSUE #9177] Fix unstable tests in AdaptiveLockTest.testAdaptiveLock (#9178) --------- Signed-off-by: ltamber <ltamber12@gmail.com> Co-authored-by: ltamber <ltamber12@gmail.com> Co-authored-by: hqbfz <125714719+3424672656@users.noreply.github.com>
1 parent f5c84de commit df8cf98

2 files changed

Lines changed: 36 additions & 11 deletions

File tree

broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import com.google.common.base.Strings;
3131

32+
import java.util.function.Function;
3233
import org.apache.rocketmq.broker.BrokerController;
3334
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
3435
import org.apache.rocketmq.common.ConfigManager;
@@ -395,19 +396,29 @@ public boolean loadDataVersion() {
395396
}
396397

397398
public void removeOffset(final String group) {
398-
Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
399-
while (it.hasNext()) {
400-
Entry<String, ConcurrentMap<Integer, Long>> next = it.next();
401-
String topicAtGroup = next.getKey();
402-
if (topicAtGroup.contains(group)) {
403-
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
404-
if (arrays.length == 2 && group.equals(arrays[1])) {
405-
it.remove();
406-
removeConsumerOffset(topicAtGroup);
407-
LOG.warn("clean group offset {}", topicAtGroup);
399+
Function<Iterator<Entry<String, ConcurrentMap<Integer, Long>>>, Boolean> deleteFunction = it -> {
400+
boolean removed = false;
401+
while (it.hasNext()) {
402+
Entry<String, ConcurrentMap<Integer, Long>> entry = it.next();
403+
String topicAtGroup = entry.getKey();
404+
if (topicAtGroup.contains(group)) {
405+
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
406+
if (arrays.length == 2 && group.equals(arrays[1])) {
407+
it.remove();
408+
removeConsumerOffset(topicAtGroup);
409+
removed = true;
410+
}
408411
}
409412
}
410-
}
413+
return removed;
414+
};
415+
416+
boolean clearOffset = deleteFunction.apply(this.offsetTable.entrySet().iterator());
417+
boolean clearReset = deleteFunction.apply(this.resetOffsetTable.entrySet().iterator());
418+
boolean clearPull = deleteFunction.apply(this.pullOffsetTable.entrySet().iterator());
419+
420+
LOG.info("Consumer offset manager clean group offset, groupName={}, " +
421+
"offsetTable={}, resetOffsetTable={}, pullOffsetTable={}", group, clearOffset, clearReset, clearPull);
411422
}
412423

413424
public void assignResetOffset(String topic, String group, int queueId, long offset) {

broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.rocketmq.broker.offset;
1919

2020
import org.apache.rocketmq.broker.BrokerController;
21+
import org.apache.rocketmq.common.BrokerConfig;
2122
import org.apache.rocketmq.store.config.MessageStoreConfig;
2223
import org.junit.Assert;
2324
import org.junit.Before;
@@ -27,6 +28,7 @@
2728
import java.util.concurrent.ConcurrentMap;
2829
import org.mockito.Mockito;
2930

31+
import static org.apache.rocketmq.broker.offset.ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR;
3032
import static org.assertj.core.api.Assertions.assertThat;
3133

3234
public class ConsumerOffsetManagerTest {
@@ -65,6 +67,18 @@ public void cleanOffsetByTopic_Exist() {
6567
assertThat(!consumerOffsetManager.getOffsetTable().containsKey(KEY)).isTrue();
6668
}
6769

70+
@Test
71+
public void removeOffsetByGroupTest() {
72+
String topic = "TopicName";
73+
String group = "GroupName";
74+
Mockito.when(brokerController.getBrokerConfig()).thenReturn(new BrokerConfig());
75+
consumerOffsetManager.commitOffset("Commit", group, topic, 0, 100);
76+
consumerOffsetManager.assignResetOffset(topic, group, 0, 100);
77+
consumerOffsetManager.commitPullOffset("Pull", group, topic, 0, 100);
78+
consumerOffsetManager.removeOffset(group);
79+
Assert.assertFalse(consumerOffsetManager.getOffsetTable().containsKey(topic + TOPIC_GROUP_SEPARATOR + group));
80+
}
81+
6882
@Test
6983
public void testOffsetPersistInMemory() {
7084
ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsetTable = consumerOffsetManager.getOffsetTable();

0 commit comments

Comments
 (0)