@@ -42,6 +42,8 @@ public class ConsumerManager {
4242 private static final Logger LOGGER = LoggerFactory .getLogger (LoggerName .BROKER_LOGGER_NAME );
4343 private final ConcurrentMap <String , ConsumerGroupInfo > consumerTable =
4444 new ConcurrentHashMap <>(1024 );
45+ private final ConcurrentMap <String , Set <String >> topicGroupTable =
46+ new ConcurrentHashMap <>(1024 );
4547 private final ConcurrentMap <String , ConsumerGroupInfo > consumerCompensationTable =
4648 new ConcurrentHashMap <>(1024 );
4749 private final List <ConsumerIdsChangeListener > consumerIdsChangeListenerList = new CopyOnWriteArrayList <>();
@@ -156,6 +158,7 @@ public boolean doChannelCloseEvent(final String remoteAddr, final Channel channe
156158 LOGGER .info ("unregister consumer ok, no any connection, and remove consumer group, {}" ,
157159 group );
158160 callConsumerIdsChangeListener (ConsumerGroupEvent .UNREGISTER , group );
161+ clearTopicGroupTable (remove );
159162 }
160163 }
161164 callConsumerIdsChangeListener (ConsumerGroupEvent .CHANGE , group , consumerGroupInfo .getAllChannel ());
@@ -177,6 +180,7 @@ public boolean doChannelCloseEvent(final String remoteAddr, final Channel channe
177180 LOGGER .info ("unregister consumer ok, no any connection, and remove consumer group, {}" ,
178181 next .getKey ());
179182 callConsumerIdsChangeListener (ConsumerGroupEvent .UNREGISTER , next .getKey ());
183+ clearTopicGroupTable (remove );
180184 }
181185 }
182186 if (!isBroadcastMode (info .getMessageModel ())) {
@@ -187,6 +191,18 @@ public boolean doChannelCloseEvent(final String remoteAddr, final Channel channe
187191 return removed ;
188192 }
189193
194+ private void clearTopicGroupTable (final ConsumerGroupInfo groupInfo ) {
195+ for (String subscribeTopic : groupInfo .getSubscribeTopics ()) {
196+ Set <String > groups = this .topicGroupTable .get (subscribeTopic );
197+ if (groups != null ) {
198+ groups .remove (groupInfo .getGroupName ());
199+ }
200+ if (groups != null && groups .isEmpty ()) {
201+ this .topicGroupTable .remove (subscribeTopic );
202+ }
203+ }
204+ }
205+
190206 // compensate consumer info for consumer without heartbeat
191207 public void compensateBasicConsumerInfo (String group , ConsumeType consumeType , MessageModel messageModel ) {
192208 ConsumerGroupInfo consumerGroupInfo = consumerCompensationTable .computeIfAbsent (group , ConsumerGroupInfo ::new );
@@ -218,6 +234,16 @@ public boolean registerConsumer(final String group, final ClientChannelInfo clie
218234 consumerGroupInfo = prev != null ? prev : tmp ;
219235 }
220236
237+ for (SubscriptionData subscriptionData : subList ) {
238+ Set <String > groups = this .topicGroupTable .get (subscriptionData .getTopic ());
239+ if (groups == null ) {
240+ Set <String > tmp = new HashSet <>();
241+ Set <String > prev = this .topicGroupTable .putIfAbsent (subscriptionData .getTopic (), tmp );
242+ groups = prev != null ? prev : tmp ;
243+ }
244+ groups .add (subscriptionData .getTopic ());
245+ }
246+
221247 boolean r1 =
222248 consumerGroupInfo .updateChannel (clientChannelInfo , consumeType , messageModel ,
223249 consumeFromWhere );
@@ -258,6 +284,17 @@ public boolean registerConsumerWithoutSub(final String group, final ClientChanne
258284 ConsumerGroupInfo prev = this .consumerTable .putIfAbsent (group , tmp );
259285 consumerGroupInfo = prev != null ? prev : tmp ;
260286 }
287+
288+ for (SubscriptionData subscriptionData : consumerGroupInfo .getSubscriptionTable ().values ()) {
289+ Set <String > groups = this .topicGroupTable .get (subscriptionData .getTopic ());
290+ if (groups == null ) {
291+ Set <String > tmp = new HashSet <>();
292+ Set <String > prev = this .topicGroupTable .putIfAbsent (subscriptionData .getTopic (), tmp );
293+ groups = prev != null ? prev : tmp ;
294+ }
295+ groups .add (subscriptionData .getTopic ());
296+ }
297+
261298 boolean updateChannelRst = consumerGroupInfo .updateChannel (clientChannelInfo , consumeType , messageModel , consumeFromWhere );
262299 if (updateChannelRst && isNotifyConsumerIdsChangedEnable && !isBroadcastMode (consumerGroupInfo .getMessageModel ())) {
263300 callConsumerIdsChangeListener (ConsumerGroupEvent .CHANGE , group , consumerGroupInfo .getAllChannel ());
@@ -282,6 +319,7 @@ public void unregisterConsumer(final String group, final ClientChannelInfo clien
282319 LOGGER .info ("unregister consumer ok, no any connection, and remove consumer group, {}" , group );
283320
284321 callConsumerIdsChangeListener (ConsumerGroupEvent .UNREGISTER , group );
322+ clearTopicGroupTable (remove );
285323 }
286324 }
287325 if (isNotifyConsumerIdsChangedEnable && !isBroadcastMode (consumerGroupInfo .getMessageModel ())) {
@@ -349,14 +387,8 @@ public void scanNotActiveChannel() {
349387
350388 public HashSet <String > queryTopicConsumeByWho (final String topic ) {
351389 HashSet <String > groups = new HashSet <>();
352- Iterator <Entry <String , ConsumerGroupInfo >> it = this .consumerTable .entrySet ().iterator ();
353- while (it .hasNext ()) {
354- Entry <String , ConsumerGroupInfo > entry = it .next ();
355- ConcurrentMap <String , SubscriptionData > subscriptionTable =
356- entry .getValue ().getSubscriptionTable ();
357- if (subscriptionTable .containsKey (topic )) {
358- groups .add (entry .getKey ());
359- }
390+ if (this .topicGroupTable .get (topic ) != null ) {
391+ groups .addAll (this .topicGroupTable .get (topic ));
360392 }
361393 return groups ;
362394 }
0 commit comments