Skip to content

Commit 9337904

Browse files
3424672656hqbfzwang
andauthored
[ISSUE #9309]opti:Avoid the generation of dirty data in #assignResetOffset (#9310)
* feat: support clients to reset lmq consumption offset * fix * fix * fix * fix: clean pull offset in #removeOffset * fix: clean pull offset in #removeOffset * rerun test --------- Co-authored-by: hqbfzwang <hqbfzwang@tencent.com>
1 parent ecd78d9 commit 9337904

3 files changed

Lines changed: 55 additions & 1 deletion

File tree

broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.rocketmq.broker.config.v2;
1818

19+
import com.google.common.base.Strings;
1920
import io.netty.buffer.ByteBuf;
2021
import io.netty.util.internal.PlatformDependent;
2122
import java.nio.ByteBuffer;
@@ -446,4 +447,29 @@ public long queryPullOffset(String group, String topic, int queueId) {
446447
}
447448
return -1;
448449
}
450+
451+
@Override
452+
public void assignResetOffset(String topic, String group, int queueId, long offset) {
453+
if (Strings.isNullOrEmpty(topic) || Strings.isNullOrEmpty(group) || queueId < 0 || offset < 0) {
454+
LOG.warn("Illegal arguments when assigning reset offset. Topic={}, group={}, queueId={}, offset={}",
455+
topic, group, queueId, offset);
456+
return;
457+
}
458+
if (!MixAll.isLmq(topic) || !MixAll.isLmq(group)) {
459+
super.assignResetOffset(topic, group, queueId, offset);
460+
} else {
461+
String key = topic + TOPIC_GROUP_SEPARATOR + group;
462+
ConcurrentMap<Integer, Long> map = resetOffsetTable.get(key);
463+
if (null == map) {
464+
map = new ConcurrentHashMap<>();
465+
ConcurrentMap<Integer, Long> previous = resetOffsetTable.putIfAbsent(key, map);
466+
if (null != previous) {
467+
map = previous;
468+
}
469+
}
470+
map.put(queueId, offset);
471+
}
472+
473+
this.commitOffset(null, topic, group, queueId, offset);
474+
}
449475
}

broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public class ConsumerOffsetManager extends ConfigManager {
5050
protected ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
5151
new ConcurrentHashMap<>(512);
5252

53-
private final ConcurrentMap<String, ConcurrentMap<Integer, Long>> resetOffsetTable =
53+
protected final ConcurrentMap<String, ConcurrentMap<Integer, Long>> resetOffsetTable =
5454
new ConcurrentHashMap<>(512);
5555

5656
private final ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> pullOffsetTable =

broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
import java.util.Iterator;
2121
import java.util.Map;
2222
import java.util.concurrent.ConcurrentHashMap;
23+
import java.util.concurrent.ConcurrentMap;
2324

25+
import com.google.common.base.Strings;
2426
import org.apache.rocketmq.broker.BrokerController;
2527
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
2628
import org.apache.rocketmq.common.MixAll;
@@ -132,4 +134,30 @@ public void removeOffset(String group) {
132134
}
133135
}
134136
}
137+
138+
@Override
139+
public void assignResetOffset(String topic, String group, int queueId, long offset) {
140+
if (Strings.isNullOrEmpty(topic) || Strings.isNullOrEmpty(group) || queueId < 0 || offset < 0) {
141+
LOG.warn("Illegal arguments when assigning reset offset. Topic={}, group={}, queueId={}, offset={}",
142+
topic, group, queueId, offset);
143+
return;
144+
}
145+
if (!MixAll.isLmq(topic) || !MixAll.isLmq(group)) {
146+
super.assignResetOffset(topic, group, queueId, offset);
147+
return;
148+
}
149+
150+
String key = topic + TOPIC_GROUP_SEPARATOR + group;
151+
ConcurrentMap<Integer, Long> map = resetOffsetTable.get(key);
152+
if (null == map) {
153+
map = new ConcurrentHashMap<>();
154+
ConcurrentMap<Integer, Long> previous = resetOffsetTable.putIfAbsent(key, map);
155+
if (null != previous) {
156+
map = previous;
157+
}
158+
}
159+
map.put(queueId, offset);
160+
161+
lmqOffsetTable.computeIfPresent(key, (k, oldValue) -> offset);
162+
}
135163
}

0 commit comments

Comments
 (0)