Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ protected void removeConsumerOffset(String topicAtGroup) {
}

String[] topicGroup = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
if (topicGroup.length != 2) {
if (!validateOffsetTableKey(topicAtGroup)) {
LOG.error("Invalid topic group: {}", topicAtGroup);
return;
}
Expand Down Expand Up @@ -452,7 +452,7 @@ public long queryPullOffset(String group, String topic, int queueId) {
public void assignResetOffset(String topic, String group, int queueId, long offset) {
if (Strings.isNullOrEmpty(topic) || Strings.isNullOrEmpty(group) || queueId < 0 || offset < 0) {
LOG.warn("Illegal arguments when assigning reset offset. Topic={}, group={}, queueId={}, offset={}",
topic, group, queueId, offset);
topic, group, queueId, offset);
return;
}
if (!MixAll.isLmq(topic) || !MixAll.isLmq(group)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.store.exception.ConsumeQueueException;

import static org.apache.rocketmq.common.MixAll.BROADCAST_KEY;

/**
* manage the offset of broadcast.
* now, use this to support switch remoting client between proxy and broker
Expand Down Expand Up @@ -169,7 +171,7 @@ protected void scanOffsetData() {

queueMinOffset.forEach((queueId, offset) ->
this.brokerController.getConsumerOffsetManager().commitOffset("BroadcastOffset",
broadcastGroupId(broadcastOffsetData.group), broadcastOffsetData.topic, queueId, offset));
broadcastGroupId(broadcastOffsetData.group), broadcastOffsetData.topic, queueId, offset));
}
}

Expand All @@ -182,7 +184,7 @@ private String buildKey(String topic, String group) {
* @return the groupId used to commit offset
*/
private static String broadcastGroupId(String group) {
return group + TOPIC_GROUP_SEPARATOR + "broadcast";
return group + TOPIC_GROUP_SEPARATOR + BROADCAST_KEY;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.broker.offset;

import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -26,9 +27,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.base.Strings;

import java.util.function.Function;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
Expand All @@ -41,20 +39,19 @@
import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;

import static org.apache.rocketmq.common.MixAll.BROADCAST_KEY;

public class ConsumerOffsetManager extends ConfigManager {
protected static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
public static final String TOPIC_GROUP_SEPARATOR = "@";

protected DataVersion dataVersion = new DataVersion();

protected ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
new ConcurrentHashMap<>(512);
protected ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable = new ConcurrentHashMap<>(512);

protected final ConcurrentMap<String, ConcurrentMap<Integer, Long>> resetOffsetTable =
new ConcurrentHashMap<>(512);
protected final ConcurrentMap<String, ConcurrentMap<Integer, Long>> resetOffsetTable = new ConcurrentHashMap<>(512);

private final ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> pullOffsetTable =
new ConcurrentHashMap<>(512);
private final ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> pullOffsetTable = new ConcurrentHashMap<>(512);

protected transient BrokerController brokerController;

Expand All @@ -78,7 +75,7 @@ public void cleanOffset(String group) {
String topicAtGroup = next.getKey();
if (topicAtGroup.contains(group)) {
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
if (arrays.length == 2 && group.equals(arrays[1])) {
if (validateOffsetTableKey(topicAtGroup) && group.equals(arrays[1])) {
it.remove();
removeConsumerOffset(topicAtGroup);
LOG.warn("Clean group's offset, {}, {}", topicAtGroup, next.getValue());
Expand All @@ -94,7 +91,7 @@ public void cleanOffsetByTopic(String topic) {
String topicAtGroup = next.getKey();
if (topicAtGroup.contains(topic)) {
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
if (arrays.length == 2 && topic.equals(arrays[0])) {
if (validateOffsetTableKey(topicAtGroup) && topic.equals(arrays[0])) {
it.remove();
removeConsumerOffset(topicAtGroup);
LOG.warn("Clean topic's offset, {}, {}", topicAtGroup, next.getValue());
Expand All @@ -109,12 +106,11 @@ public void scanUnsubscribedTopic() {
Entry<String, ConcurrentMap<Integer, Long>> next = it.next();
String topicAtGroup = next.getKey();
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
if (arrays.length == 2) {
if (validateOffsetTableKey(topicAtGroup)) {
String topic = arrays[0];
String group = arrays[1];

if (null == brokerController.getConsumerManager().findSubscriptionData(group, topic)
&& this.offsetBehindMuchThanData(topic, next.getValue())) {
if (null == brokerController.getConsumerManager().findSubscriptionData(group, topic) && this.offsetBehindMuchThanData(topic, next.getValue())) {
it.remove();
removeConsumerOffset(topicAtGroup);
LOG.warn("remove topic offset, {}", topicAtGroup);
Expand All @@ -139,13 +135,12 @@ private boolean offsetBehindMuchThanData(final String topic, ConcurrentMap<Integ

public Set<String> whichTopicByConsumer(final String group) {
Set<String> topics = new HashSet<>();

Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, ConcurrentMap<Integer, Long>> next = it.next();
String topicAtGroup = next.getKey();
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
if (arrays.length == 2) {
if (validateOffsetTableKey(topicAtGroup)) {
if (group.equals(arrays[1])) {
topics.add(arrays[0]);
}
Expand All @@ -163,7 +158,7 @@ public Set<String> whichGroupByTopic(final String topic) {
Entry<String, ConcurrentMap<Integer, Long>> next = it.next();
String topicAtGroup = next.getKey();
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
if (arrays.length == 2) {
if (validateOffsetTableKey(topicAtGroup)) {
if (topic.equals(arrays[0])) {
groups.add(arrays[1]);
}
Expand All @@ -178,7 +173,7 @@ public Map<String, Set<String>> getGroupTopicMap() {

for (String key : this.offsetTable.keySet()) {
String[] arr = key.split(TOPIC_GROUP_SEPARATOR);
if (arr.length == 2) {
if (validateOffsetTableKey(key)) {
String topic = arr[0];
String group = arr[1];

Expand Down Expand Up @@ -224,16 +219,16 @@ public void commitPullOffset(final String clientHost, final String group, final
final long offset) {
// topic@group
String key = topic + TOPIC_GROUP_SEPARATOR + group;
ConcurrentMap<Integer, Long> map = this.pullOffsetTable.computeIfAbsent(
key, k -> new ConcurrentHashMap<>(32));
ConcurrentMap<Integer, Long> map = this.pullOffsetTable.computeIfAbsent(key, k -> new ConcurrentHashMap<>(32));
map.put(queueId, offset);
}

/**
* If the target queue has temporary reset offset, return the reset-offset.
* Otherwise, return the current consume offset in the offset store.
* @param group Consumer group
* @param topic Topic
*
* @param group Consumer group
* @param topic Topic
* @param queueId Queue ID
* @return current consume offset or reset offset if there were one.
*/
Expand Down Expand Up @@ -261,8 +256,9 @@ public long queryOffset(final String group, final String topic, final int queueI

/**
* Query pull offset in pullOffsetTable
* @param group Consumer group
* @param topic Topic
*
* @param group Consumer group
* @param topic Topic
* @param queueId Queue ID
* @return latest pull offset of consumer group
*/
Expand Down Expand Up @@ -330,7 +326,7 @@ public Map<Integer, Long> queryMinOffsetInAllGroup(final String topic, final Str
Iterator<String> it = topicGroups.iterator();
while (it.hasNext()) {
String topicAtGroup = it.next();
if (group.equals(topicAtGroup.split(TOPIC_GROUP_SEPARATOR)[1])) {
if (validateOffsetTableKey(topicAtGroup) && group.equals(topicAtGroup.split(TOPIC_GROUP_SEPARATOR)[1])) {
it.remove();
removeConsumerOffset(topicAtGroup);
}
Expand All @@ -341,7 +337,7 @@ public Map<Integer, Long> queryMinOffsetInAllGroup(final String topic, final Str
for (Map.Entry<String, ConcurrentMap<Integer, Long>> offSetEntry : this.offsetTable.entrySet()) {
String topicGroup = offSetEntry.getKey();
String[] topicGroupArr = topicGroup.split(TOPIC_GROUP_SEPARATOR);
if (topic.equals(topicGroupArr[0])) {
if (validateOffsetTableKey(topicGroup) && topic.equals(topicGroupArr[0])) {
for (Entry<Integer, Long> entry : offSetEntry.getValue().entrySet()) {
long minOffset = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, entry.getKey());
if (entry.getValue() >= minOffset) {
Expand Down Expand Up @@ -407,7 +403,7 @@ public void removeOffset(final String group) {
String topicAtGroup = entry.getKey();
if (topicAtGroup.contains(group)) {
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
if (arrays.length == 2 && group.equals(arrays[1])) {
if (validateOffsetTableKey(topicAtGroup) && group.equals(arrays[1])) {
it.remove();
removeConsumerOffset(topicAtGroup);
removed = true;
Expand All @@ -421,14 +417,12 @@ public void removeOffset(final String group) {
boolean clearReset = deleteFunction.apply(this.resetOffsetTable.entrySet().iterator());
boolean clearPull = deleteFunction.apply(this.pullOffsetTable.entrySet().iterator());

LOG.info("Consumer offset manager clean group offset, groupName={}, " +
"offsetTable={}, resetOffsetTable={}, pullOffsetTable={}", group, clearOffset, clearReset, clearPull);
LOG.info("Consumer offset manager clean group offset, groupName={}, " + "offsetTable={}, resetOffsetTable={}, pullOffsetTable={}", group, clearOffset, clearReset, clearPull);
}

public void assignResetOffset(String topic, String group, int queueId, long offset) {
if (Strings.isNullOrEmpty(topic) || Strings.isNullOrEmpty(group) || queueId < 0 || offset < 0) {
LOG.warn("Illegal arguments when assigning reset offset. Topic={}, group={}, queueId={}, offset={}",
topic, group, queueId, offset);
LOG.warn("Illegal arguments when assigning reset offset. Topic={}, group={}, queueId={}, offset={}", topic, group, queueId, offset);
return;
}

Expand Down Expand Up @@ -461,4 +455,9 @@ public Long queryThenEraseResetOffset(String topic, String group, Integer queueI
return map.remove(queueId);
}
}
}

public boolean validateOffsetTableKey(String key) {
String[] arr = key.split(TOPIC_GROUP_SEPARATOR);
return arr.length == 2 || arr.length == 3 && BROADCAST_KEY.equals(arr[2]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@
*/
package org.apache.rocketmq.broker.offset;

import com.google.common.base.Strings;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import com.google.common.base.Strings;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.MixAll;
Expand Down Expand Up @@ -126,7 +125,7 @@ public void removeOffset(String group) {
String topicAtGroup = next.getKey();
if (topicAtGroup.contains(group)) {
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
if (arrays.length == 2 && group.equals(arrays[1])) {
if (validateOffsetTableKey(topicAtGroup) && group.equals(arrays[1])) {
it.remove();
removeConsumerOffset(topicAtGroup);
LOG.warn("clean lmq group offset {}", topicAtGroup);
Expand All @@ -139,7 +138,7 @@ public void removeOffset(String group) {
public void assignResetOffset(String topic, String group, int queueId, long offset) {
if (Strings.isNullOrEmpty(topic) || Strings.isNullOrEmpty(group) || queueId < 0 || offset < 0) {
LOG.warn("Illegal arguments when assigning reset offset. Topic={}, group={}, queueId={}, offset={}",
topic, group, queueId, offset);
topic, group, queueId, offset);
return;
}
if (!MixAll.isLmq(topic) || !MixAll.isLmq(group)) {
Expand Down
7 changes: 4 additions & 3 deletions common/src/main/java/org/apache/rocketmq/common/MixAll.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.common;

import com.google.common.collect.ImmutableSet;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
Expand Down Expand Up @@ -43,8 +44,6 @@
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;

import com.google.common.collect.ImmutableSet;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.annotation.ImportantField;
import org.apache.rocketmq.common.constant.LoggerName;
Expand Down Expand Up @@ -141,6 +140,7 @@ public class MixAll {
CID_ONSAPI_PULL_GROUP,
CID_SYS_RMQ_TRANS
);
public static final String BROADCAST_KEY = "broadcast";

public static boolean isWindows() {
return OS.contains("win");
Expand Down Expand Up @@ -182,7 +182,8 @@ public static boolean isSysConsumerGroup(final String consumerGroup) {
return consumerGroup.startsWith(CID_RMQ_SYS_PREFIX);
}

public static boolean isSysConsumerGroupAndEnableCreate(final String consumerGroup, final boolean isEnableCreateSysGroup) {
public static boolean isSysConsumerGroupAndEnableCreate(final String consumerGroup,
final boolean isEnableCreateSysGroup) {
return isEnableCreateSysGroup && isSysConsumerGroup(consumerGroup);
}

Expand Down
Loading