Skip to content

Commit 775eb64

Browse files
authored
[ISSUE #9288] Support the disablement of producer registration and fast channel shutdown (#9293)
1 parent 881507e commit 775eb64

7 files changed

Lines changed: 257 additions & 10 deletions

File tree

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.broker.client;
18+
19+
import io.netty.channel.Channel;
20+
import io.netty.util.AttributeKey;
21+
22+
import java.util.Arrays;
23+
import java.util.Collections;
24+
import java.util.List;
25+
26+
public class ClientChannelAttributeHelper {
27+
private static final AttributeKey<String> ATTR_CG = AttributeKey.valueOf("CHANNEL_CONSUMER_GROUP");
28+
private static final AttributeKey<String> ATTR_PG = AttributeKey.valueOf("CHANNEL_PRODUCER_GROUP");
29+
private static final String SEPARATOR = "|";
30+
31+
public static void addProducerGroup(Channel channel, String group) {
32+
addGroup(channel, group, ATTR_PG);
33+
}
34+
35+
public static void addConsumerGroup(Channel channel, String group) {
36+
addGroup(channel, group, ATTR_CG);
37+
}
38+
39+
public static List<String> getProducerGroups(Channel channel) {
40+
return getGroups(channel, ATTR_PG);
41+
}
42+
43+
public static List<String> getConsumerGroups(Channel channel) {
44+
return getGroups(channel, ATTR_CG);
45+
}
46+
47+
private static void addGroup(Channel channel, String group, AttributeKey<String> key) {
48+
if (null == channel || !channel.isActive()) { // no side effect if check active status.
49+
return;
50+
}
51+
if (null == group || group.length() == 0 || null == key) {
52+
return;
53+
}
54+
String groups = channel.attr(key).get();
55+
if (null == groups) {
56+
channel.attr(key).set(group + SEPARATOR);
57+
} else {
58+
if (groups.contains(SEPARATOR + group + SEPARATOR)) {
59+
return;
60+
} else {
61+
channel.attr(key).compareAndSet(groups, groups + group + SEPARATOR);
62+
}
63+
}
64+
}
65+
66+
private static List<String> getGroups(Channel channel, AttributeKey<String> key) {
67+
if (null == channel) {
68+
return Collections.emptyList();
69+
}
70+
if (null == key) {
71+
return Collections.emptyList();
72+
}
73+
String groups = channel.attr(key).get();
74+
return null == groups ? Collections.<String>emptyList() : Arrays.asList(groups.split("\\|"));
75+
}
76+
77+
}

broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,14 @@ public class ConsumerManager {
4848
protected final BrokerStatsManager brokerStatsManager;
4949
private final long channelExpiredTimeout;
5050
private final long subscriptionExpiredTimeout;
51+
private final BrokerConfig brokerConfig;
5152

5253
public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener, long expiredTimeout) {
5354
this.consumerIdsChangeListenerList.add(consumerIdsChangeListener);
5455
this.brokerStatsManager = null;
5556
this.channelExpiredTimeout = expiredTimeout;
5657
this.subscriptionExpiredTimeout = expiredTimeout;
58+
this.brokerConfig = null;
5759
}
5860

5961
public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener,
@@ -62,6 +64,7 @@ public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener
6264
this.brokerStatsManager = brokerStatsManager;
6365
this.channelExpiredTimeout = brokerConfig.getChannelExpiredTimeout();
6466
this.subscriptionExpiredTimeout = brokerConfig.getSubscriptionExpiredTimeout();
67+
this.brokerConfig = brokerConfig;
6568
}
6669

6770
public ClientChannelInfo findChannel(final String group, final String clientId) {
@@ -130,12 +133,43 @@ public int findSubscriptionDataCount(final String group) {
130133

131134
public boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) {
132135
boolean removed = false;
136+
if (this.brokerConfig != null && this.brokerConfig.isEnableFastChannelEventProcess()) {
137+
List<String> groups = ClientChannelAttributeHelper.getConsumerGroups(channel);
138+
if (this.brokerConfig.isPrintChannelGroups() && groups.size() >= 5 && groups.size() >= this.brokerConfig.getPrintChannelGroupsMinNum()) {
139+
LOGGER.warn("channel close event, too many consumer groups one channel, {}, {}, {}", groups.size(), remoteAddr, groups);
140+
}
141+
for (String group : groups) {
142+
if (null == group || group.length() == 0) {
143+
continue;
144+
}
145+
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
146+
if (null == consumerGroupInfo) {
147+
continue;
148+
}
149+
ClientChannelInfo clientChannelInfo = consumerGroupInfo.doChannelCloseEvent(remoteAddr, channel);
150+
if (clientChannelInfo != null) {
151+
removed = true;
152+
callConsumerIdsChangeListener(ConsumerGroupEvent.CLIENT_UNREGISTER, group, clientChannelInfo, consumerGroupInfo.getSubscribeTopics());
153+
if (consumerGroupInfo.getChannelInfoTable().isEmpty()) {
154+
ConsumerGroupInfo remove = this.consumerTable.remove(group);
155+
if (remove != null) {
156+
LOGGER.info("unregister consumer ok, no any connection, and remove consumer group, {}",
157+
group);
158+
callConsumerIdsChangeListener(ConsumerGroupEvent.UNREGISTER, group);
159+
}
160+
}
161+
callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
162+
}
163+
}
164+
return removed;
165+
}
133166
Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
134167
while (it.hasNext()) {
135168
Entry<String, ConsumerGroupInfo> next = it.next();
136169
ConsumerGroupInfo info = next.getValue();
137170
ClientChannelInfo clientChannelInfo = info.doChannelCloseEvent(remoteAddr, channel);
138171
if (clientChannelInfo != null) {
172+
removed = true;
139173
callConsumerIdsChangeListener(ConsumerGroupEvent.CLIENT_UNREGISTER, next.getKey(), clientChannelInfo, info.getSubscribeTopics());
140174
if (info.getChannelInfoTable().isEmpty()) {
141175
ConsumerGroupInfo remove = this.consumerTable.remove(next.getKey());
@@ -201,6 +235,11 @@ public boolean registerConsumer(final String group, final ClientChannelInfo clie
201235
callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
202236
}
203237
}
238+
239+
if (this.brokerConfig != null && this.brokerConfig.isEnableFastChannelEventProcess() && r1) {
240+
ClientChannelAttributeHelper.addConsumerGroup(clientChannelInfo.getChannel(), group);
241+
}
242+
204243
if (null != this.brokerStatsManager) {
205244
this.brokerStatsManager.incConsumerRegisterTime((int) (System.currentTimeMillis() - start));
206245
}

broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java

Lines changed: 74 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.concurrent.ConcurrentMap;
2929
import java.util.concurrent.CopyOnWriteArrayList;
3030
import org.apache.rocketmq.broker.util.PositiveAtomicCounter;
31+
import org.apache.rocketmq.common.BrokerConfig;
3132
import org.apache.rocketmq.common.constant.LoggerName;
3233
import org.apache.rocketmq.logging.org.slf4j.Logger;
3334
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -44,15 +45,23 @@ public class ProducerManager {
4445
new ConcurrentHashMap<>();
4546
private final ConcurrentMap<String, Channel> clientChannelTable = new ConcurrentHashMap<>();
4647
protected final BrokerStatsManager brokerStatsManager;
48+
private final BrokerConfig brokerConfig;
4749
private final PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter();
4850
private final List<ProducerChangeListener> producerChangeListenerList = new CopyOnWriteArrayList<>();
4951

5052
public ProducerManager() {
5153
this.brokerStatsManager = null;
54+
this.brokerConfig = null;
5255
}
5356

5457
public ProducerManager(final BrokerStatsManager brokerStatsManager) {
5558
this.brokerStatsManager = brokerStatsManager;
59+
this.brokerConfig = null;
60+
}
61+
62+
public ProducerManager(final BrokerStatsManager brokerStatsManager, final BrokerConfig brokerConfig) {
63+
this.brokerStatsManager = brokerStatsManager;
64+
this.brokerConfig = brokerConfig;
5665
}
5766

5867
public int groupSize() {
@@ -136,6 +145,39 @@ public void scanNotActiveChannel() {
136145
public boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) {
137146
boolean removed = false;
138147
if (channel != null) {
148+
if (this.brokerConfig != null && this.brokerConfig.isEnableFastChannelEventProcess()) {
149+
List<String> groups = ClientChannelAttributeHelper.getProducerGroups(channel);
150+
if (this.brokerConfig.isPrintChannelGroups() && groups.size() >= 5 && groups.size() >= this.brokerConfig.getPrintChannelGroupsMinNum()) {
151+
log.warn("channel close event, too many producer groups one channel, {}, {}, {}", groups.size(), remoteAddr, groups);
152+
}
153+
for (String group : groups) {
154+
if (null == group || group.length() == 0) {
155+
continue;
156+
}
157+
ConcurrentMap<Channel, ClientChannelInfo> clientChannelInfoTable = this.groupChannelTable.get(group);
158+
if (null == clientChannelInfoTable) {
159+
continue;
160+
}
161+
final ClientChannelInfo clientChannelInfo =
162+
clientChannelInfoTable.remove(channel);
163+
if (clientChannelInfo != null) {
164+
clientChannelTable.remove(clientChannelInfo.getClientId());
165+
removed = true;
166+
log.info(
167+
"NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}",
168+
clientChannelInfo.toString(), remoteAddr, group);
169+
callProducerChangeListener(ProducerGroupEvent.CLIENT_UNREGISTER, group, clientChannelInfo);
170+
if (clientChannelInfoTable.isEmpty()) {
171+
ConcurrentMap<Channel, ClientChannelInfo> oldGroupTable = this.groupChannelTable.remove(group);
172+
if (oldGroupTable != null) {
173+
log.info("unregister a producer group[{}] from groupChannelTable", group);
174+
callProducerChangeListener(ProducerGroupEvent.GROUP_UNREGISTER, group, null);
175+
}
176+
}
177+
}
178+
}
179+
return removed; // must return here, degrade to scanNotActiveChannel at worst.
180+
}
139181
for (final Map.Entry<String, ConcurrentMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable.entrySet()) {
140182
final String group = entry.getKey();
141183
final ConcurrentMap<Channel, ClientChannelInfo> clientChannelInfoTable = entry.getValue();
@@ -162,20 +204,37 @@ public boolean doChannelCloseEvent(final String remoteAddr, final Channel channe
162204
}
163205

164206
public void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) {
207+
208+
long start = System.currentTimeMillis();
165209
ClientChannelInfo clientChannelInfoFound;
166210

167211
ConcurrentMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
212+
// note that we must take care of the exist groups and channels,
213+
// only can return when groups or channels not exist.
214+
if (this.brokerConfig != null
215+
&& !this.brokerConfig.isEnableRegisterProducer()
216+
&& this.brokerConfig.isRejectTransactionMessage()) {
217+
boolean needRegister = true;
218+
if (null == channelTable) {
219+
needRegister = false;
220+
} else {
221+
clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());
222+
if (null == clientChannelInfoFound) {
223+
needRegister = false;
224+
}
225+
}
226+
if (!needRegister) {
227+
if (null != this.brokerStatsManager) {
228+
this.brokerStatsManager.incProducerRegisterTime((int) (System.currentTimeMillis() - start));
229+
}
230+
return;
231+
}
232+
}
233+
168234
if (null == channelTable) {
169235
channelTable = new ConcurrentHashMap<>();
170-
// Make sure channelTable will NOT be cleaned by #scanNotActiveChannel
171-
channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
172236
ConcurrentMap<Channel, ClientChannelInfo> prev = this.groupChannelTable.putIfAbsent(group, channelTable);
173-
if (null == prev) {
174-
// Add client-id to channel mapping for new producer group
175-
clientChannelTable.put(clientChannelInfo.getClientId(), clientChannelInfo.getChannel());
176-
} else {
177-
channelTable = prev;
178-
}
237+
channelTable = prev != null ? prev : channelTable;
179238
}
180239

181240
clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());
@@ -184,12 +243,19 @@ public void registerProducer(final String group, final ClientChannelInfo clientC
184243
channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
185244
clientChannelTable.put(clientChannelInfo.getClientId(), clientChannelInfo.getChannel());
186245
log.info("new producer connected, group: {} channel: {}", group, clientChannelInfo.toString());
246+
if (this.brokerConfig != null && this.brokerConfig.isEnableFastChannelEventProcess()) {
247+
ClientChannelAttributeHelper.addProducerGroup(clientChannelInfo.getChannel(), group);
248+
}
187249
}
188250

189251
// Refresh existing client-channel-info update-timestamp
190252
if (clientChannelInfoFound != null) {
191253
clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis());
192254
}
255+
256+
if (null != this.brokerStatsManager) {
257+
this.brokerStatsManager.incProducerRegisterTime((int) (System.currentTimeMillis() - start));
258+
}
193259
}
194260

195261
public void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) {

broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import java.util.Map;
2323

2424
import java.util.concurrent.atomic.AtomicReference;
25+
26+
import org.apache.rocketmq.common.BrokerConfig;
2527
import org.apache.rocketmq.remoting.protocol.LanguageCode;
2628
import org.junit.Before;
2729
import org.junit.Test;
@@ -36,6 +38,8 @@
3638

3739
@RunWith(MockitoJUnitRunner.class)
3840
public class ProducerManagerTest {
41+
42+
private BrokerConfig brokerConfig;
3943
private ProducerManager producerManager;
4044
private String group = "FooBar";
4145
private ClientChannelInfo clientInfo;
@@ -45,7 +49,8 @@ public class ProducerManagerTest {
4549

4650
@Before
4751
public void init() {
48-
producerManager = new ProducerManager();
52+
brokerConfig = new BrokerConfig();
53+
producerManager = new ProducerManager(null, brokerConfig);
4954
clientInfo = new ClientChannelInfo(channel, "clientId", LanguageCode.JAVA, 0);
5055
}
5156

@@ -140,10 +145,20 @@ public void doChannelCloseEvent() throws Exception {
140145
}
141146

142147
@Test
143-
public void testRegisterProducer() throws Exception {
148+
public void testRegisterProducer() {
149+
brokerConfig.setEnableRegisterProducer(false);
150+
brokerConfig.setRejectTransactionMessage(true);
144151
producerManager.registerProducer(group, clientInfo);
145152
Map<Channel, ClientChannelInfo> channelMap = producerManager.getGroupChannelTable().get(group);
146153
Channel channel1 = producerManager.findChannel("clientId");
154+
assertThat(channelMap).isNull();
155+
assertThat(channel1).isNull();
156+
157+
brokerConfig.setEnableRegisterProducer(true);
158+
brokerConfig.setRejectTransactionMessage(false);
159+
producerManager.registerProducer(group, clientInfo);
160+
channelMap = producerManager.getGroupChannelTable().get(group);
161+
channel1 = producerManager.findChannel("clientId");
147162
assertThat(channelMap).isNotNull();
148163
assertThat(channel1).isNotNull();
149164
assertThat(channelMap.get(channel)).isEqualTo(clientInfo);

client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,4 +85,8 @@ public ProduceAccumulator getOrCreateProduceAccumulator(final ClientConfig clien
8585
public void removeClientFactory(final String clientId) {
8686
this.factoryTable.remove(clientId);
8787
}
88+
89+
public ConcurrentMap<String, MQClientInstance> getFactoryTable() {
90+
return factoryTable;
91+
}
8892
}

client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1395,6 +1395,14 @@ public ClientConfig getClientConfig() {
13951395
return clientConfig;
13961396
}
13971397

1398+
public ConcurrentMap<String, MQProducerInner> getProducerTable() {
1399+
return producerTable;
1400+
}
1401+
1402+
public ConcurrentMap<String, MQConsumerInner> getConsumerTable() {
1403+
return consumerTable;
1404+
}
1405+
13981406
public TopicRouteData queryTopicRouteData(String topic) {
13991407
TopicRouteData data = this.getAnExistTopicRouteData(topic);
14001408
if (data == null) {

0 commit comments

Comments
 (0)