From ff689d6d10033f1d25f3f2e97f54593163e58793 Mon Sep 17 00:00:00 2001 From: "maowei.ymw" Date: Fri, 28 Mar 2025 15:45:27 +0800 Subject: [PATCH 1/2] support reject producer when not transaction producer --- .../broker/client/ProducerManager.java | 33 +++++++++++++++++++ .../broker/client/ProducerManagerTest.java | 16 +++++++++ .../apache/rocketmq/common/BrokerConfig.java | 10 ++++++ 3 files changed, 59 insertions(+) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java index 2c3acb6ba9b..65c42aa084c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java @@ -29,6 +29,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import org.apache.rocketmq.broker.util.PositiveAtomicCounter; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.common.RemotingHelper; @@ -44,15 +45,23 @@ public class ProducerManager { new ConcurrentHashMap<>(); private final ConcurrentMap clientChannelTable = new ConcurrentHashMap<>(); protected final BrokerStatsManager brokerStatsManager; + private final BrokerConfig brokerConfig; private final PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter(); private final List producerChangeListenerList = new CopyOnWriteArrayList<>(); public ProducerManager() { this.brokerStatsManager = null; + this.brokerConfig = null; } public ProducerManager(final BrokerStatsManager brokerStatsManager) { this.brokerStatsManager = brokerStatsManager; + this.brokerConfig = null; + } + + public ProducerManager(final BrokerStatsManager brokerStatsManager, final BrokerConfig brokerConfig) { + this.brokerStatsManager = brokerStatsManager; + this.brokerConfig = brokerConfig; } public int groupSize() { @@ -162,9 +171,33 @@ public boolean doChannelCloseEvent(final String remoteAddr, final Channel channe } public void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) { + + long start = System.currentTimeMillis(); ClientChannelInfo clientChannelInfoFound; ConcurrentMap channelTable = this.groupChannelTable.get(group); + // note that we must take care of the exist groups and channels, + // only can return when groups or channels not exist. + if (this.brokerConfig != null + && !this.brokerConfig.isEnableRegisterProducer() + && this.brokerConfig.isRejectTransactionMessage()) { + boolean needRegister = true; + if (null == channelTable) { + needRegister = false; + } else { + clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel()); + if (null == clientChannelInfoFound) { + needRegister = false; + } + } + if (!needRegister) { + if (null != this.brokerStatsManager) { + this.brokerStatsManager.incProducerRegisterTime((int) (System.currentTimeMillis() - start)); + } + return; + } + } + if (null == channelTable) { channelTable = new ConcurrentHashMap<>(); // Make sure channelTable will NOT be cleaned by #scanNotActiveChannel diff --git a/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java index 3d6091e02fb..0274babf341 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java @@ -22,6 +22,8 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicReference; + +import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.remoting.protocol.LanguageCode; import org.junit.Before; import org.junit.Test; @@ -150,6 +152,20 @@ public void testRegisterProducer() throws Exception { assertThat(channel1).isEqualTo(channel); } + @Test + public void testRegisterProducerWhenRegisterProducerIsNotEnabled() throws Exception { + BrokerConfig brokerConfig = new BrokerConfig(); + brokerConfig.setEnableRegisterProducer(false); + brokerConfig.setRejectTransactionMessage(true); + ProducerManager producerManager = new ProducerManager(null, brokerConfig); + + producerManager.registerProducer(group, clientInfo); + Map channelMap = producerManager.getGroupChannelTable().get(group); + Channel channel1 = producerManager.findChannel("clientId"); + assertThat(channelMap).isNull(); + assertThat(channel1).isNull(); + } + @Test public void unregisterProducer() throws Exception { producerManager.registerProducer(group, clientInfo); diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index dd345449351..d60987b7081 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -455,6 +455,8 @@ public class BrokerConfig extends BrokerIdentity { private boolean recallMessageEnable = false; + private boolean enableRegisterProducer = true; + public String getConfigBlackList() { return configBlackList; } @@ -2006,4 +2008,12 @@ public boolean isRecallMessageEnable() { public void setRecallMessageEnable(boolean recallMessageEnable) { this.recallMessageEnable = recallMessageEnable; } + + public boolean isEnableRegisterProducer() { + return enableRegisterProducer; + } + + public void setEnableRegisterProducer(boolean enableRegisterProducer) { + this.enableRegisterProducer = enableRegisterProducer; + } } From 301cdb2ff3049ac42d796e3710cd37924c7bee33 Mon Sep 17 00:00:00 2001 From: "maowei.ymw" Date: Fri, 28 Mar 2025 16:44:39 +0800 Subject: [PATCH 2/2] support reject producer when not transaction producer --- .../org/apache/rocketmq/broker/client/ProducerManager.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java index 65c42aa084c..1e8b92d75b1 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java @@ -223,6 +223,10 @@ public void registerProducer(final String group, final ClientChannelInfo clientC if (clientChannelInfoFound != null) { clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis()); } + + if (null != this.brokerStatsManager) { + this.brokerStatsManager.incProducerRegisterTime((int) (System.currentTimeMillis() - start)); + } } public void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) {