Skip to content

Commit 2e7cc14

Browse files
committed
fix error in offsetManager when in broadcast
1 parent e4b731c commit 2e7cc14

5 files changed

Lines changed: 39 additions & 28 deletions

File tree

broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ protected void removeConsumerOffset(String topicAtGroup) {
5959
}
6060

6161
String[] topicGroup = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
62-
if (topicGroup.length != 2) {
62+
if (!validateOffsetTableKey(topicAtGroup)) {
6363
LOG.error("Invalid topic group: {}", topicAtGroup);
6464
return;
6565
}
@@ -452,7 +452,7 @@ public long queryPullOffset(String group, String topic, int queueId) {
452452
public void assignResetOffset(String topic, String group, int queueId, long offset) {
453453
if (Strings.isNullOrEmpty(topic) || Strings.isNullOrEmpty(group) || queueId < 0 || offset < 0) {
454454
LOG.warn("Illegal arguments when assigning reset offset. Topic={}, group={}, queueId={}, offset={}",
455-
topic, group, queueId, offset);
455+
topic, group, queueId, offset);
456456
return;
457457
}
458458
if (!MixAll.isLmq(topic) || !MixAll.isLmq(group)) {

broker/src/main/java/org/apache/rocketmq/broker/offset/BroadcastOffsetManager.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import org.apache.rocketmq.common.ServiceThread;
2828
import org.apache.rocketmq.store.exception.ConsumeQueueException;
2929

30+
import static org.apache.rocketmq.common.MixAll.BROADCAST_KEY;
31+
3032
/**
3133
* manage the offset of broadcast.
3234
* now, use this to support switch remoting client between proxy and broker
@@ -169,7 +171,7 @@ protected void scanOffsetData() {
169171

170172
queueMinOffset.forEach((queueId, offset) ->
171173
this.brokerController.getConsumerOffsetManager().commitOffset("BroadcastOffset",
172-
broadcastGroupId(broadcastOffsetData.group), broadcastOffsetData.topic, queueId, offset));
174+
broadcastGroupId(broadcastOffsetData.group), broadcastOffsetData.topic, queueId, offset));
173175
}
174176
}
175177

@@ -182,7 +184,7 @@ private String buildKey(String topic, String group) {
182184
* @return the groupId used to commit offset
183185
*/
184186
private static String broadcastGroupId(String group) {
185-
return group + TOPIC_GROUP_SEPARATOR + "broadcast";
187+
return group + TOPIC_GROUP_SEPARATOR + BROADCAST_KEY;
186188
}
187189

188190
@Override

broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.rocketmq.broker.offset;
1818

19+
import com.google.common.base.Strings;
1920
import com.google.common.collect.Maps;
2021
import java.util.HashMap;
2122
import java.util.HashSet;
@@ -26,9 +27,6 @@
2627
import java.util.concurrent.ConcurrentHashMap;
2728
import java.util.concurrent.ConcurrentMap;
2829
import java.util.concurrent.atomic.AtomicLong;
29-
30-
import com.google.common.base.Strings;
31-
3230
import java.util.function.Function;
3331
import org.apache.rocketmq.broker.BrokerController;
3432
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
@@ -41,6 +39,8 @@
4139
import org.apache.rocketmq.remoting.protocol.DataVersion;
4240
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
4341

42+
import static org.apache.rocketmq.common.MixAll.BROADCAST_KEY;
43+
4444
public class ConsumerOffsetManager extends ConfigManager {
4545
protected static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
4646
public static final String TOPIC_GROUP_SEPARATOR = "@";
@@ -78,7 +78,7 @@ public void cleanOffset(String group) {
7878
String topicAtGroup = next.getKey();
7979
if (topicAtGroup.contains(group)) {
8080
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
81-
if (arrays.length == 2 && group.equals(arrays[1])) {
81+
if (validateOffsetTableKey(topicAtGroup) && group.equals(arrays[1])) {
8282
it.remove();
8383
removeConsumerOffset(topicAtGroup);
8484
LOG.warn("Clean group's offset, {}, {}", topicAtGroup, next.getValue());
@@ -94,7 +94,7 @@ public void cleanOffsetByTopic(String topic) {
9494
String topicAtGroup = next.getKey();
9595
if (topicAtGroup.contains(topic)) {
9696
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
97-
if (arrays.length == 2 && topic.equals(arrays[0])) {
97+
if (validateOffsetTableKey(topicAtGroup) && topic.equals(arrays[0])) {
9898
it.remove();
9999
removeConsumerOffset(topicAtGroup);
100100
LOG.warn("Clean topic's offset, {}, {}", topicAtGroup, next.getValue());
@@ -109,7 +109,7 @@ public void scanUnsubscribedTopic() {
109109
Entry<String, ConcurrentMap<Integer, Long>> next = it.next();
110110
String topicAtGroup = next.getKey();
111111
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
112-
if (arrays.length == 2) {
112+
if (validateOffsetTableKey(topicAtGroup)) {
113113
String topic = arrays[0];
114114
String group = arrays[1];
115115

@@ -139,13 +139,12 @@ private boolean offsetBehindMuchThanData(final String topic, ConcurrentMap<Integ
139139

140140
public Set<String> whichTopicByConsumer(final String group) {
141141
Set<String> topics = new HashSet<>();
142-
143142
Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
144143
while (it.hasNext()) {
145144
Entry<String, ConcurrentMap<Integer, Long>> next = it.next();
146145
String topicAtGroup = next.getKey();
147146
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
148-
if (arrays.length == 2) {
147+
if (validateOffsetTableKey(topicAtGroup)) {
149148
if (group.equals(arrays[1])) {
150149
topics.add(arrays[0]);
151150
}
@@ -163,7 +162,7 @@ public Set<String> whichGroupByTopic(final String topic) {
163162
Entry<String, ConcurrentMap<Integer, Long>> next = it.next();
164163
String topicAtGroup = next.getKey();
165164
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
166-
if (arrays.length == 2) {
165+
if (validateOffsetTableKey(topicAtGroup)) {
167166
if (topic.equals(arrays[0])) {
168167
groups.add(arrays[1]);
169168
}
@@ -178,7 +177,7 @@ public Map<String, Set<String>> getGroupTopicMap() {
178177

179178
for (String key : this.offsetTable.keySet()) {
180179
String[] arr = key.split(TOPIC_GROUP_SEPARATOR);
181-
if (arr.length == 2) {
180+
if (validateOffsetTableKey(key)) {
182181
String topic = arr[0];
183182
String group = arr[1];
184183

@@ -232,8 +231,9 @@ public void commitPullOffset(final String clientHost, final String group, final
232231
/**
233232
* If the target queue has temporary reset offset, return the reset-offset.
234233
* Otherwise, return the current consume offset in the offset store.
235-
* @param group Consumer group
236-
* @param topic Topic
234+
*
235+
* @param group Consumer group
236+
* @param topic Topic
237237
* @param queueId Queue ID
238238
* @return current consume offset or reset offset if there were one.
239239
*/
@@ -261,8 +261,9 @@ public long queryOffset(final String group, final String topic, final int queueI
261261

262262
/**
263263
* Query pull offset in pullOffsetTable
264-
* @param group Consumer group
265-
* @param topic Topic
264+
*
265+
* @param group Consumer group
266+
* @param topic Topic
266267
* @param queueId Queue ID
267268
* @return latest pull offset of consumer group
268269
*/
@@ -330,7 +331,7 @@ public Map<Integer, Long> queryMinOffsetInAllGroup(final String topic, final Str
330331
Iterator<String> it = topicGroups.iterator();
331332
while (it.hasNext()) {
332333
String topicAtGroup = it.next();
333-
if (group.equals(topicAtGroup.split(TOPIC_GROUP_SEPARATOR)[1])) {
334+
if (validateOffsetTableKey(topicAtGroup) && group.equals(topicAtGroup.split(TOPIC_GROUP_SEPARATOR)[1])) {
334335
it.remove();
335336
removeConsumerOffset(topicAtGroup);
336337
}
@@ -341,7 +342,7 @@ public Map<Integer, Long> queryMinOffsetInAllGroup(final String topic, final Str
341342
for (Map.Entry<String, ConcurrentMap<Integer, Long>> offSetEntry : this.offsetTable.entrySet()) {
342343
String topicGroup = offSetEntry.getKey();
343344
String[] topicGroupArr = topicGroup.split(TOPIC_GROUP_SEPARATOR);
344-
if (topic.equals(topicGroupArr[0])) {
345+
if (validateOffsetTableKey(topicGroup) && topic.equals(topicGroupArr[0])) {
345346
for (Entry<Integer, Long> entry : offSetEntry.getValue().entrySet()) {
346347
long minOffset = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, entry.getKey());
347348
if (entry.getValue() >= minOffset) {
@@ -407,7 +408,7 @@ public void removeOffset(final String group) {
407408
String topicAtGroup = entry.getKey();
408409
if (topicAtGroup.contains(group)) {
409410
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
410-
if (arrays.length == 2 && group.equals(arrays[1])) {
411+
if (validateOffsetTableKey(topicAtGroup) && group.equals(arrays[1])) {
411412
it.remove();
412413
removeConsumerOffset(topicAtGroup);
413414
removed = true;
@@ -461,4 +462,12 @@ public Long queryThenEraseResetOffset(String topic, String group, Integer queueI
461462
return map.remove(queueId);
462463
}
463464
}
465+
466+
public boolean validateOffsetTableKey(String key) {
467+
String[] arr = key.split(TOPIC_GROUP_SEPARATOR);
468+
if (arr.length == 2 || (arr.length == 3 && BROADCAST_KEY.equals(arr[2]))) {
469+
return true;
470+
}
471+
return false;
472+
}
464473
}

broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,12 @@
1616
*/
1717
package org.apache.rocketmq.broker.offset;
1818

19+
import com.google.common.base.Strings;
1920
import java.util.HashMap;
2021
import java.util.Iterator;
2122
import java.util.Map;
2223
import java.util.concurrent.ConcurrentHashMap;
2324
import java.util.concurrent.ConcurrentMap;
24-
25-
import com.google.common.base.Strings;
2625
import org.apache.rocketmq.broker.BrokerController;
2726
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
2827
import org.apache.rocketmq.common.MixAll;
@@ -126,7 +125,7 @@ public void removeOffset(String group) {
126125
String topicAtGroup = next.getKey();
127126
if (topicAtGroup.contains(group)) {
128127
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
129-
if (arrays.length == 2 && group.equals(arrays[1])) {
128+
if (validateOffsetTableKey(topicAtGroup) && group.equals(arrays[1])) {
130129
it.remove();
131130
removeConsumerOffset(topicAtGroup);
132131
LOG.warn("clean lmq group offset {}", topicAtGroup);
@@ -139,7 +138,7 @@ public void removeOffset(String group) {
139138
public void assignResetOffset(String topic, String group, int queueId, long offset) {
140139
if (Strings.isNullOrEmpty(topic) || Strings.isNullOrEmpty(group) || queueId < 0 || offset < 0) {
141140
LOG.warn("Illegal arguments when assigning reset offset. Topic={}, group={}, queueId={}, offset={}",
142-
topic, group, queueId, offset);
141+
topic, group, queueId, offset);
143142
return;
144143
}
145144
if (!MixAll.isLmq(topic) || !MixAll.isLmq(group)) {

common/src/main/java/org/apache/rocketmq/common/MixAll.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.rocketmq.common;
1818

19+
import com.google.common.collect.ImmutableSet;
1920
import java.io.ByteArrayInputStream;
2021
import java.io.File;
2122
import java.io.FileInputStream;
@@ -43,8 +44,6 @@
4344
import java.util.TreeMap;
4445
import java.util.concurrent.atomic.AtomicLong;
4546
import java.util.function.Predicate;
46-
47-
import com.google.common.collect.ImmutableSet;
4847
import org.apache.commons.lang3.StringUtils;
4948
import org.apache.rocketmq.common.annotation.ImportantField;
5049
import org.apache.rocketmq.common.constant.LoggerName;
@@ -141,6 +140,7 @@ public class MixAll {
141140
CID_ONSAPI_PULL_GROUP,
142141
CID_SYS_RMQ_TRANS
143142
);
143+
public static final String BROADCAST_KEY = "broadcast";
144144

145145
public static boolean isWindows() {
146146
return OS.contains("win");
@@ -182,7 +182,8 @@ public static boolean isSysConsumerGroup(final String consumerGroup) {
182182
return consumerGroup.startsWith(CID_RMQ_SYS_PREFIX);
183183
}
184184

185-
public static boolean isSysConsumerGroupAndEnableCreate(final String consumerGroup, final boolean isEnableCreateSysGroup) {
185+
public static boolean isSysConsumerGroupAndEnableCreate(final String consumerGroup,
186+
final boolean isEnableCreateSysGroup) {
186187
return isEnableCreateSysGroup && isSysConsumerGroup(consumerGroup);
187188
}
188189

0 commit comments

Comments
 (0)