Skip to content

Commit 77c1ba4

Browse files
golden-yangRongtongJin
authored andcommitted
[ISSUE #9351] Add topic-group mapping in queryTopicConsumeByWho command (#9352)
1 parent fb848db commit 77c1ba4

1 file changed

Lines changed: 40 additions & 8 deletions

File tree

broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ public class ConsumerManager {
4242
private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
4343
private final ConcurrentMap<String, ConsumerGroupInfo> consumerTable =
4444
new ConcurrentHashMap<>(1024);
45+
private final ConcurrentMap<String, Set<String>> topicGroupTable =
46+
new ConcurrentHashMap<>(1024);
4547
private final ConcurrentMap<String, ConsumerGroupInfo> consumerCompensationTable =
4648
new ConcurrentHashMap<>(1024);
4749
private final List<ConsumerIdsChangeListener> consumerIdsChangeListenerList = new CopyOnWriteArrayList<>();
@@ -156,6 +158,7 @@ public boolean doChannelCloseEvent(final String remoteAddr, final Channel channe
156158
LOGGER.info("unregister consumer ok, no any connection, and remove consumer group, {}",
157159
group);
158160
callConsumerIdsChangeListener(ConsumerGroupEvent.UNREGISTER, group);
161+
clearTopicGroupTable(remove);
159162
}
160163
}
161164
callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
@@ -177,6 +180,7 @@ public boolean doChannelCloseEvent(final String remoteAddr, final Channel channe
177180
LOGGER.info("unregister consumer ok, no any connection, and remove consumer group, {}",
178181
next.getKey());
179182
callConsumerIdsChangeListener(ConsumerGroupEvent.UNREGISTER, next.getKey());
183+
clearTopicGroupTable(remove);
180184
}
181185
}
182186
if (!isBroadcastMode(info.getMessageModel())) {
@@ -187,6 +191,18 @@ public boolean doChannelCloseEvent(final String remoteAddr, final Channel channe
187191
return removed;
188192
}
189193

194+
private void clearTopicGroupTable(final ConsumerGroupInfo groupInfo) {
195+
for (String subscribeTopic : groupInfo.getSubscribeTopics()) {
196+
Set<String> groups = this.topicGroupTable.get(subscribeTopic);
197+
if (groups != null) {
198+
groups.remove(groupInfo.getGroupName());
199+
}
200+
if (groups != null && groups.isEmpty()) {
201+
this.topicGroupTable.remove(subscribeTopic);
202+
}
203+
}
204+
}
205+
190206
// compensate consumer info for consumer without heartbeat
191207
public void compensateBasicConsumerInfo(String group, ConsumeType consumeType, MessageModel messageModel) {
192208
ConsumerGroupInfo consumerGroupInfo = consumerCompensationTable.computeIfAbsent(group, ConsumerGroupInfo::new);
@@ -218,6 +234,16 @@ public boolean registerConsumer(final String group, final ClientChannelInfo clie
218234
consumerGroupInfo = prev != null ? prev : tmp;
219235
}
220236

237+
for (SubscriptionData subscriptionData : subList) {
238+
Set<String> groups = this.topicGroupTable.get(subscriptionData.getTopic());
239+
if (groups == null) {
240+
Set<String> tmp = new HashSet<>();
241+
Set<String> prev = this.topicGroupTable.putIfAbsent(subscriptionData.getTopic(), tmp);
242+
groups = prev != null ? prev : tmp;
243+
}
244+
groups.add(subscriptionData.getTopic());
245+
}
246+
221247
boolean r1 =
222248
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
223249
consumeFromWhere);
@@ -258,6 +284,17 @@ public boolean registerConsumerWithoutSub(final String group, final ClientChanne
258284
ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
259285
consumerGroupInfo = prev != null ? prev : tmp;
260286
}
287+
288+
for (SubscriptionData subscriptionData : consumerGroupInfo.getSubscriptionTable().values()) {
289+
Set<String> groups = this.topicGroupTable.get(subscriptionData.getTopic());
290+
if (groups == null) {
291+
Set<String> tmp = new HashSet<>();
292+
Set<String> prev = this.topicGroupTable.putIfAbsent(subscriptionData.getTopic(), tmp);
293+
groups = prev != null ? prev : tmp;
294+
}
295+
groups.add(subscriptionData.getTopic());
296+
}
297+
261298
boolean updateChannelRst = consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere);
262299
if (updateChannelRst && isNotifyConsumerIdsChangedEnable && !isBroadcastMode(consumerGroupInfo.getMessageModel())) {
263300
callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
@@ -282,6 +319,7 @@ public void unregisterConsumer(final String group, final ClientChannelInfo clien
282319
LOGGER.info("unregister consumer ok, no any connection, and remove consumer group, {}", group);
283320

284321
callConsumerIdsChangeListener(ConsumerGroupEvent.UNREGISTER, group);
322+
clearTopicGroupTable(remove);
285323
}
286324
}
287325
if (isNotifyConsumerIdsChangedEnable && !isBroadcastMode(consumerGroupInfo.getMessageModel())) {
@@ -349,14 +387,8 @@ public void scanNotActiveChannel() {
349387

350388
public HashSet<String> queryTopicConsumeByWho(final String topic) {
351389
HashSet<String> groups = new HashSet<>();
352-
Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
353-
while (it.hasNext()) {
354-
Entry<String, ConsumerGroupInfo> entry = it.next();
355-
ConcurrentMap<String, SubscriptionData> subscriptionTable =
356-
entry.getValue().getSubscriptionTable();
357-
if (subscriptionTable.containsKey(topic)) {
358-
groups.add(entry.getKey());
359-
}
390+
if (this.topicGroupTable.get(topic) != null) {
391+
groups.addAll(this.topicGroupTable.get(topic));
360392
}
361393
return groups;
362394
}

0 commit comments

Comments
 (0)