Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.rocketmq.broker.util.PositiveAtomicCounter;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
Expand All @@ -44,15 +45,23 @@ public class ProducerManager {
new ConcurrentHashMap<>();
private final ConcurrentMap<String, Channel> clientChannelTable = new ConcurrentHashMap<>();
protected final BrokerStatsManager brokerStatsManager;
private final BrokerConfig brokerConfig;
private final PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter();
private final List<ProducerChangeListener> producerChangeListenerList = new CopyOnWriteArrayList<>();

public ProducerManager() {
this.brokerStatsManager = null;
this.brokerConfig = null;
}

public ProducerManager(final BrokerStatsManager brokerStatsManager) {
this.brokerStatsManager = brokerStatsManager;
this.brokerConfig = null;
}

public ProducerManager(final BrokerStatsManager brokerStatsManager, final BrokerConfig brokerConfig) {
this.brokerStatsManager = brokerStatsManager;
this.brokerConfig = brokerConfig;
}

public int groupSize() {
Expand Down Expand Up @@ -162,9 +171,33 @@ public boolean doChannelCloseEvent(final String remoteAddr, final Channel channe
}

public void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) {

long start = System.currentTimeMillis();
ClientChannelInfo clientChannelInfoFound;

ConcurrentMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
// note that we must take care of the exist groups and channels,
// only can return when groups or channels not exist.
if (this.brokerConfig != null
&& !this.brokerConfig.isEnableRegisterProducer()
&& this.brokerConfig.isRejectTransactionMessage()) {
boolean needRegister = true;
if (null == channelTable) {
needRegister = false;
} else {
clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());
if (null == clientChannelInfoFound) {
needRegister = false;
}
}
if (!needRegister) {
if (null != this.brokerStatsManager) {
this.brokerStatsManager.incProducerRegisterTime((int) (System.currentTimeMillis() - start));
}
return;
}
}

if (null == channelTable) {
channelTable = new ConcurrentHashMap<>();
// Make sure channelTable will NOT be cleaned by #scanNotActiveChannel
Expand All @@ -190,6 +223,10 @@ public void registerProducer(final String group, final ClientChannelInfo clientC
if (clientChannelInfoFound != null) {
clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis());
}

if (null != this.brokerStatsManager) {
this.brokerStatsManager.incProducerRegisterTime((int) (System.currentTimeMillis() - start));
}
}

public void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.Map;

import java.util.concurrent.atomic.AtomicReference;

import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -150,6 +152,20 @@ public void testRegisterProducer() throws Exception {
assertThat(channel1).isEqualTo(channel);
}

@Test
public void testRegisterProducerWhenRegisterProducerIsNotEnabled() throws Exception {
BrokerConfig brokerConfig = new BrokerConfig();
brokerConfig.setEnableRegisterProducer(false);
brokerConfig.setRejectTransactionMessage(true);
ProducerManager producerManager = new ProducerManager(null, brokerConfig);

producerManager.registerProducer(group, clientInfo);
Map<Channel, ClientChannelInfo> channelMap = producerManager.getGroupChannelTable().get(group);
Channel channel1 = producerManager.findChannel("clientId");
assertThat(channelMap).isNull();
assertThat(channel1).isNull();
}

@Test
public void unregisterProducer() throws Exception {
producerManager.registerProducer(group, clientInfo);
Expand Down
10 changes: 10 additions & 0 deletions common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,8 @@ public class BrokerConfig extends BrokerIdentity {

private boolean recallMessageEnable = false;

private boolean enableRegisterProducer = true;

public String getConfigBlackList() {
return configBlackList;
}
Expand Down Expand Up @@ -2006,4 +2008,12 @@ public boolean isRecallMessageEnable() {
public void setRecallMessageEnable(boolean recallMessageEnable) {
this.recallMessageEnable = recallMessageEnable;
}

public boolean isEnableRegisterProducer() {
return enableRegisterProducer;
}

public void setEnableRegisterProducer(boolean enableRegisterProducer) {
this.enableRegisterProducer = enableRegisterProducer;
}
}
Loading