diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java index e14ac0bb628..ed71bba700b 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java @@ -59,7 +59,7 @@ protected void removeConsumerOffset(String topicAtGroup) { } String[] topicGroup = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); - if (topicGroup.length != 2) { + if (!validateOffsetTableKey(topicAtGroup)) { LOG.error("Invalid topic group: {}", topicAtGroup); return; } @@ -452,7 +452,7 @@ public long queryPullOffset(String group, String topic, int queueId) { public void assignResetOffset(String topic, String group, int queueId, long offset) { if (Strings.isNullOrEmpty(topic) || Strings.isNullOrEmpty(group) || queueId < 0 || offset < 0) { LOG.warn("Illegal arguments when assigning reset offset. Topic={}, group={}, queueId={}, offset={}", - topic, group, queueId, offset); + topic, group, queueId, offset); return; } if (!MixAll.isLmq(topic) || !MixAll.isLmq(group)) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/BroadcastOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/BroadcastOffsetManager.java index 79bb0c771d6..b88fdf8010e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/BroadcastOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/BroadcastOffsetManager.java @@ -27,6 +27,8 @@ import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.store.exception.ConsumeQueueException; +import static org.apache.rocketmq.common.MixAll.BROADCAST_KEY; + /** * manage the offset of broadcast. * now, use this to support switch remoting client between proxy and broker @@ -169,7 +171,7 @@ protected void scanOffsetData() { queueMinOffset.forEach((queueId, offset) -> this.brokerController.getConsumerOffsetManager().commitOffset("BroadcastOffset", - broadcastGroupId(broadcastOffsetData.group), broadcastOffsetData.topic, queueId, offset)); + broadcastGroupId(broadcastOffsetData.group), broadcastOffsetData.topic, queueId, offset)); } } @@ -182,7 +184,7 @@ private String buildKey(String topic, String group) { * @return the groupId used to commit offset */ private static String broadcastGroupId(String group) { - return group + TOPIC_GROUP_SEPARATOR + "broadcast"; + return group + TOPIC_GROUP_SEPARATOR + BROADCAST_KEY; } @Override diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java index a6cd9ad987d..d7e5c20ed41 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.broker.offset; +import com.google.common.base.Strings; import com.google.common.collect.Maps; import java.util.HashMap; import java.util.HashSet; @@ -26,9 +27,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; - -import com.google.common.base.Strings; - import java.util.function.Function; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerPathConfigHelper; @@ -41,20 +39,19 @@ import org.apache.rocketmq.remoting.protocol.DataVersion; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; +import static org.apache.rocketmq.common.MixAll.BROADCAST_KEY; + public class ConsumerOffsetManager extends ConfigManager { protected static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); public static final String TOPIC_GROUP_SEPARATOR = "@"; protected DataVersion dataVersion = new DataVersion(); - protected ConcurrentMap> offsetTable = - new ConcurrentHashMap<>(512); + protected ConcurrentMap> offsetTable = new ConcurrentHashMap<>(512); - protected final ConcurrentMap> resetOffsetTable = - new ConcurrentHashMap<>(512); + protected final ConcurrentMap> resetOffsetTable = new ConcurrentHashMap<>(512); - private final ConcurrentMap> pullOffsetTable = - new ConcurrentHashMap<>(512); + private final ConcurrentMap> pullOffsetTable = new ConcurrentHashMap<>(512); protected transient BrokerController brokerController; @@ -78,7 +75,7 @@ public void cleanOffset(String group) { String topicAtGroup = next.getKey(); if (topicAtGroup.contains(group)) { String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); - if (arrays.length == 2 && group.equals(arrays[1])) { + if (validateOffsetTableKey(topicAtGroup) && group.equals(arrays[1])) { it.remove(); removeConsumerOffset(topicAtGroup); LOG.warn("Clean group's offset, {}, {}", topicAtGroup, next.getValue()); @@ -94,7 +91,7 @@ public void cleanOffsetByTopic(String topic) { String topicAtGroup = next.getKey(); if (topicAtGroup.contains(topic)) { String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); - if (arrays.length == 2 && topic.equals(arrays[0])) { + if (validateOffsetTableKey(topicAtGroup) && topic.equals(arrays[0])) { it.remove(); removeConsumerOffset(topicAtGroup); LOG.warn("Clean topic's offset, {}, {}", topicAtGroup, next.getValue()); @@ -109,12 +106,11 @@ public void scanUnsubscribedTopic() { Entry> next = it.next(); String topicAtGroup = next.getKey(); String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); - if (arrays.length == 2) { + if (validateOffsetTableKey(topicAtGroup)) { String topic = arrays[0]; String group = arrays[1]; - if (null == brokerController.getConsumerManager().findSubscriptionData(group, topic) - && this.offsetBehindMuchThanData(topic, next.getValue())) { + if (null == brokerController.getConsumerManager().findSubscriptionData(group, topic) && this.offsetBehindMuchThanData(topic, next.getValue())) { it.remove(); removeConsumerOffset(topicAtGroup); LOG.warn("remove topic offset, {}", topicAtGroup); @@ -139,13 +135,12 @@ private boolean offsetBehindMuchThanData(final String topic, ConcurrentMap whichTopicByConsumer(final String group) { Set topics = new HashSet<>(); - Iterator>> it = this.offsetTable.entrySet().iterator(); while (it.hasNext()) { Entry> next = it.next(); String topicAtGroup = next.getKey(); String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); - if (arrays.length == 2) { + if (validateOffsetTableKey(topicAtGroup)) { if (group.equals(arrays[1])) { topics.add(arrays[0]); } @@ -163,7 +158,7 @@ public Set whichGroupByTopic(final String topic) { Entry> next = it.next(); String topicAtGroup = next.getKey(); String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); - if (arrays.length == 2) { + if (validateOffsetTableKey(topicAtGroup)) { if (topic.equals(arrays[0])) { groups.add(arrays[1]); } @@ -178,7 +173,7 @@ public Map> getGroupTopicMap() { for (String key : this.offsetTable.keySet()) { String[] arr = key.split(TOPIC_GROUP_SEPARATOR); - if (arr.length == 2) { + if (validateOffsetTableKey(key)) { String topic = arr[0]; String group = arr[1]; @@ -224,16 +219,16 @@ public void commitPullOffset(final String clientHost, final String group, final final long offset) { // topic@group String key = topic + TOPIC_GROUP_SEPARATOR + group; - ConcurrentMap map = this.pullOffsetTable.computeIfAbsent( - key, k -> new ConcurrentHashMap<>(32)); + ConcurrentMap map = this.pullOffsetTable.computeIfAbsent(key, k -> new ConcurrentHashMap<>(32)); map.put(queueId, offset); } /** * If the target queue has temporary reset offset, return the reset-offset. * Otherwise, return the current consume offset in the offset store. - * @param group Consumer group - * @param topic Topic + * + * @param group Consumer group + * @param topic Topic * @param queueId Queue ID * @return current consume offset or reset offset if there were one. */ @@ -261,8 +256,9 @@ public long queryOffset(final String group, final String topic, final int queueI /** * Query pull offset in pullOffsetTable - * @param group Consumer group - * @param topic Topic + * + * @param group Consumer group + * @param topic Topic * @param queueId Queue ID * @return latest pull offset of consumer group */ @@ -330,7 +326,7 @@ public Map queryMinOffsetInAllGroup(final String topic, final Str Iterator it = topicGroups.iterator(); while (it.hasNext()) { String topicAtGroup = it.next(); - if (group.equals(topicAtGroup.split(TOPIC_GROUP_SEPARATOR)[1])) { + if (validateOffsetTableKey(topicAtGroup) && group.equals(topicAtGroup.split(TOPIC_GROUP_SEPARATOR)[1])) { it.remove(); removeConsumerOffset(topicAtGroup); } @@ -341,7 +337,7 @@ public Map queryMinOffsetInAllGroup(final String topic, final Str for (Map.Entry> offSetEntry : this.offsetTable.entrySet()) { String topicGroup = offSetEntry.getKey(); String[] topicGroupArr = topicGroup.split(TOPIC_GROUP_SEPARATOR); - if (topic.equals(topicGroupArr[0])) { + if (validateOffsetTableKey(topicGroup) && topic.equals(topicGroupArr[0])) { for (Entry entry : offSetEntry.getValue().entrySet()) { long minOffset = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, entry.getKey()); if (entry.getValue() >= minOffset) { @@ -407,7 +403,7 @@ public void removeOffset(final String group) { String topicAtGroup = entry.getKey(); if (topicAtGroup.contains(group)) { String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); - if (arrays.length == 2 && group.equals(arrays[1])) { + if (validateOffsetTableKey(topicAtGroup) && group.equals(arrays[1])) { it.remove(); removeConsumerOffset(topicAtGroup); removed = true; @@ -421,14 +417,12 @@ public void removeOffset(final String group) { boolean clearReset = deleteFunction.apply(this.resetOffsetTable.entrySet().iterator()); boolean clearPull = deleteFunction.apply(this.pullOffsetTable.entrySet().iterator()); - LOG.info("Consumer offset manager clean group offset, groupName={}, " + - "offsetTable={}, resetOffsetTable={}, pullOffsetTable={}", group, clearOffset, clearReset, clearPull); + LOG.info("Consumer offset manager clean group offset, groupName={}, " + "offsetTable={}, resetOffsetTable={}, pullOffsetTable={}", group, clearOffset, clearReset, clearPull); } public void assignResetOffset(String topic, String group, int queueId, long offset) { if (Strings.isNullOrEmpty(topic) || Strings.isNullOrEmpty(group) || queueId < 0 || offset < 0) { - LOG.warn("Illegal arguments when assigning reset offset. Topic={}, group={}, queueId={}, offset={}", - topic, group, queueId, offset); + LOG.warn("Illegal arguments when assigning reset offset. Topic={}, group={}, queueId={}, offset={}", topic, group, queueId, offset); return; } @@ -461,4 +455,9 @@ public Long queryThenEraseResetOffset(String topic, String group, Integer queueI return map.remove(queueId); } } -} + + public boolean validateOffsetTableKey(String key) { + String[] arr = key.split(TOPIC_GROUP_SEPARATOR); + return arr.length == 2 || arr.length == 3 && BROADCAST_KEY.equals(arr[2]); + } +} \ No newline at end of file diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java index a565ad07c3a..d9c7b424bd9 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java @@ -16,13 +16,12 @@ */ package org.apache.rocketmq.broker.offset; +import com.google.common.base.Strings; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; - -import com.google.common.base.Strings; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerPathConfigHelper; import org.apache.rocketmq.common.MixAll; @@ -126,7 +125,7 @@ public void removeOffset(String group) { String topicAtGroup = next.getKey(); if (topicAtGroup.contains(group)) { String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); - if (arrays.length == 2 && group.equals(arrays[1])) { + if (validateOffsetTableKey(topicAtGroup) && group.equals(arrays[1])) { it.remove(); removeConsumerOffset(topicAtGroup); LOG.warn("clean lmq group offset {}", topicAtGroup); @@ -139,7 +138,7 @@ public void removeOffset(String group) { public void assignResetOffset(String topic, String group, int queueId, long offset) { if (Strings.isNullOrEmpty(topic) || Strings.isNullOrEmpty(group) || queueId < 0 || offset < 0) { LOG.warn("Illegal arguments when assigning reset offset. Topic={}, group={}, queueId={}, offset={}", - topic, group, queueId, offset); + topic, group, queueId, offset); return; } if (!MixAll.isLmq(topic) || !MixAll.isLmq(group)) { diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java index 2c298b240c4..c1909f90da2 100644 --- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.common; +import com.google.common.collect.ImmutableSet; import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileInputStream; @@ -43,8 +44,6 @@ import java.util.TreeMap; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Predicate; - -import com.google.common.collect.ImmutableSet; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.annotation.ImportantField; import org.apache.rocketmq.common.constant.LoggerName; @@ -141,6 +140,7 @@ public class MixAll { CID_ONSAPI_PULL_GROUP, CID_SYS_RMQ_TRANS ); + public static final String BROADCAST_KEY = "broadcast"; public static boolean isWindows() { return OS.contains("win"); @@ -182,7 +182,8 @@ public static boolean isSysConsumerGroup(final String consumerGroup) { return consumerGroup.startsWith(CID_RMQ_SYS_PREFIX); } - public static boolean isSysConsumerGroupAndEnableCreate(final String consumerGroup, final boolean isEnableCreateSysGroup) { + public static boolean isSysConsumerGroupAndEnableCreate(final String consumerGroup, + final boolean isEnableCreateSysGroup) { return isEnableCreateSysGroup && isSysConsumerGroup(consumerGroup); }