2323import java .util .concurrent .ConcurrentHashMap ;
2424import java .util .concurrent .ScheduledExecutorService ;
2525import java .util .concurrent .TimeUnit ;
26+ import java .util .concurrent .atomic .AtomicBoolean ;
27+
2628import org .apache .rocketmq .broker .BrokerController ;
2729import org .apache .rocketmq .common .AbstractBrokerRunnable ;
2830import org .apache .rocketmq .common .constant .LoggerName ;
@@ -41,6 +43,8 @@ public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListen
4143
4244 private ConcurrentHashMap <String ,List <Channel >> consumerChannelMap = new ConcurrentHashMap <>(cacheSize );
4345
46+ private final ConcurrentHashMap <String , NotifyTaskControl > activeGroupNotifyMap = new ConcurrentHashMap <>();
47+
4448 public DefaultConsumerIdsChangeListener (BrokerController brokerController ) {
4549 this .brokerController = brokerController ;
4650
@@ -70,9 +74,25 @@ public void handle(ConsumerGroupEvent event, String group, Object... args) {
7074 List <Channel > channels = (List <Channel >) args [0 ];
7175 if (channels != null && brokerController .getBrokerConfig ().isNotifyConsumerIdsChangedEnable ()) {
7276 if (this .brokerController .getBrokerConfig ().isRealTimeNotifyConsumerChange ()) {
73- for (Channel chl : channels ) {
77+ NotifyTaskControl currentNotifyTaskControl = new NotifyTaskControl (channels );
78+ activeGroupNotifyMap .compute (group , (k , oldVal ) -> {
79+ if (null != oldVal ) {
80+ oldVal .interrupt ();
81+ }
82+ return currentNotifyTaskControl ;
83+ });
84+
85+ boolean isNormalCompletion = true ;
86+ for (Channel chl : currentNotifyTaskControl .getChannels ()) {
87+ if (currentNotifyTaskControl .isInterrupted ()) {
88+ isNormalCompletion = false ;
89+ break ;
90+ }
7491 this .brokerController .getBroker2Client ().notifyConsumerIdsChanged (chl , group );
7592 }
93+ if (isNormalCompletion ) {
94+ activeGroupNotifyMap .computeIfPresent (group , (k , val ) -> val == currentNotifyTaskControl ? null : val );
95+ }
7696 } else {
7797 consumerChannelMap .put (group , channels );
7898 }
@@ -125,4 +145,27 @@ private void notifyConsumerChange() {
125145 public void shutdown () {
126146 this .scheduledExecutorService .shutdown ();
127147 }
148+
149+ private static class NotifyTaskControl {
150+
151+ private final AtomicBoolean interrupted = new AtomicBoolean (false );
152+
153+ private final List <Channel > channels ;
154+
155+ public NotifyTaskControl (List <Channel > channels ) {
156+ this .channels = channels ;
157+ }
158+
159+ public boolean isInterrupted () {
160+ return interrupted .get ();
161+ }
162+
163+ public void interrupt () {
164+ interrupted .set (true );
165+ }
166+
167+ public List <Channel > getChannels () {
168+ return channels ;
169+ }
170+ }
128171}
0 commit comments