Skip to content

Commit 7967edf

Browse files
authored
[ISSUE #9677] Resolve metrics static variable conflicts in BrokerContainer mode (#9678)
* Cherry-pick partial changes from 67db1757df - BrokerMetricsManager refactoring
  - Excluded DefaultMappedFile, METRICS_REFACTORING_GUIDE.md, TimerMetrics, and RocksDB files per requirements
  - Successfully applied changes to most broker and store modules
  - Compilation errors in TimerMessageStore.java will be fixed in the next commit

* fix: Resolve metrics static variable conflicts in BrokerContainer mode

  Convert static metrics variables to instance-level to fix resource leaks and data conflicts in BrokerContainer scenarios with multiple broker instances.

  ## Problem Statement
  In BrokerContainer mode, multiple broker instances share static metrics variables from BrokerMetricsManager and DefaultStoreMetricsManager, causing:
  - Metrics data conflicts between different broker instances
  - Resource leaks during frequent addBroker/removeBroker operations
  - Incorrect metrics aggregation across multiple brokers

  ## Solution
  - Convert static metrics variables to instance-level variables
  - Add proper getter methods for external access
  - Ensure each broker instance maintains isolated metrics
  - Apply instanceof checks for type safety in TimerMessageStore

  ## Files Modified
  - broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
  - store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
  - METRICS_REFACTORING_GUIDE.md (documentation)

  ## Key Benefits
  ✅ Eliminates metrics conflicts between broker instances
  ✅ Prevents resource leaks in dynamic broker scenarios
  ✅ Maintains proper metrics isolation per broker
  ✅ Supports BrokerContainer mode with multiple brokers
  ✅ Backward compatible with existing functionality

  Resolves metrics static variable issues in multi-broker container environments.

* Delete useless file
* Fix test that cannot pass
* Fix test that cannot pass
1 parent c3c6832 commit 7967edf

27 files changed

Lines changed: 365 additions & 157 deletions

broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -500,6 +500,10 @@ public BrokerMetricsManager getBrokerMetricsManager() {
500500
return brokerMetricsManager;
501501
}
502502

503+
public void setBrokerMetricsManager(BrokerMetricsManager brokerMetricsManager) {
504+
this.brokerMetricsManager = brokerMetricsManager;
505+
}
506+
503507
protected void initializeRemotingServer() throws CloneNotSupportedException {
504508
RemotingServer tcpRemotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
505509
NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();

broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java

Lines changed: 85 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -121,46 +121,45 @@ public class BrokerMetricsManager {
121121
private final MessageStore messageStore;
122122
private final BrokerController brokerController;
123123
private final ConsumerLagCalculator consumerLagCalculator;
124-
private final static Map<String, String> LABEL_MAP = new HashMap<>();
124+
private final Map<String, String> labelMap = new HashMap<>();
125125
private OtlpGrpcMetricExporter metricExporter;
126126
private PeriodicMetricReader periodicMetricReader;
127127
private PrometheusHttpServer prometheusHttpServer;
128128
private MetricExporter loggingMetricExporter;
129129
private Meter brokerMeter;
130130

131-
public static Supplier<AttributesBuilder> attributesBuilderSupplier = Attributes::builder;
131+
private Supplier<AttributesBuilder> attributesBuilderSupplier = Attributes::builder;
132132

133133
// broker stats metrics
134-
public static ObservableLongGauge processorWatermark = new NopObservableLongGauge();
135-
public static ObservableLongGauge brokerPermission = new NopObservableLongGauge();
136-
public static ObservableLongGauge topicNum = new NopObservableLongGauge();
137-
public static ObservableLongGauge consumerGroupNum = new NopObservableLongGauge();
138-
134+
private ObservableLongGauge processorWatermark = new NopObservableLongGauge();
135+
private ObservableLongGauge brokerPermission = new NopObservableLongGauge();
136+
private ObservableLongGauge topicNum = new NopObservableLongGauge();
137+
private ObservableLongGauge consumerGroupNum = new NopObservableLongGauge();
139138

140139
// request metrics
141-
public static LongCounter messagesInTotal = new NopLongCounter();
142-
public static LongCounter messagesOutTotal = new NopLongCounter();
143-
public static LongCounter throughputInTotal = new NopLongCounter();
144-
public static LongCounter throughputOutTotal = new NopLongCounter();
145-
public static LongHistogram messageSize = new NopLongHistogram();
146-
public static LongHistogram topicCreateExecuteTime = new NopLongHistogram();
147-
public static LongHistogram consumerGroupCreateExecuteTime = new NopLongHistogram();
140+
private LongCounter messagesInTotal = new NopLongCounter();
141+
private LongCounter messagesOutTotal = new NopLongCounter();
142+
private LongCounter throughputInTotal = new NopLongCounter();
143+
private LongCounter throughputOutTotal = new NopLongCounter();
144+
private LongHistogram messageSize = new NopLongHistogram();
145+
private LongHistogram topicCreateExecuteTime = new NopLongHistogram();
146+
private LongHistogram consumerGroupCreateExecuteTime = new NopLongHistogram();
148147

149148
// client connection metrics
150-
public static ObservableLongGauge producerConnection = new NopObservableLongGauge();
151-
public static ObservableLongGauge consumerConnection = new NopObservableLongGauge();
149+
private ObservableLongGauge producerConnection = new NopObservableLongGauge();
150+
private ObservableLongGauge consumerConnection = new NopObservableLongGauge();
152151

153152
// Lag metrics
154-
public static ObservableLongGauge consumerLagMessages = new NopObservableLongGauge();
155-
public static ObservableLongGauge consumerLagLatency = new NopObservableLongGauge();
156-
public static ObservableLongGauge consumerInflightMessages = new NopObservableLongGauge();
157-
public static ObservableLongGauge consumerQueueingLatency = new NopObservableLongGauge();
158-
public static ObservableLongGauge consumerReadyMessages = new NopObservableLongGauge();
159-
public static LongCounter sendToDlqMessages = new NopLongCounter();
160-
public static ObservableLongGauge halfMessages = new NopObservableLongGauge();
161-
public static LongCounter commitMessagesTotal = new NopLongCounter();
162-
public static LongCounter rollBackMessagesTotal = new NopLongCounter();
163-
public static LongHistogram transactionFinishLatency = new NopLongHistogram();
153+
private ObservableLongGauge consumerLagMessages = new NopObservableLongGauge();
154+
private ObservableLongGauge consumerLagLatency = new NopObservableLongGauge();
155+
private ObservableLongGauge consumerInflightMessages = new NopObservableLongGauge();
156+
private ObservableLongGauge consumerQueueingLatency = new NopObservableLongGauge();
157+
private ObservableLongGauge consumerReadyMessages = new NopObservableLongGauge();
158+
private LongCounter sendToDlqMessages = new NopLongCounter();
159+
private ObservableLongGauge halfMessages = new NopObservableLongGauge();
160+
private LongCounter commitMessagesTotal = new NopLongCounter();
161+
private LongCounter rollBackMessagesTotal = new NopLongCounter();
162+
private LongHistogram transactionFinishLatency = new NopLongHistogram();
164163

165164
@SuppressWarnings("DoubleBraceInitialization")
166165
public static final List<String> SYSTEM_GROUP_PREFIX_LIST = new ArrayList<String>() {
@@ -177,13 +176,13 @@ public BrokerMetricsManager(BrokerController brokerController) {
177176
init();
178177
}
179178

180-
public static AttributesBuilder newAttributesBuilder() {
179+
public AttributesBuilder newAttributesBuilder() {
181180
AttributesBuilder attributesBuilder;
182181
if (attributesBuilderSupplier == null) {
183182
attributesBuilderSupplier = Attributes::builder;
184183
}
185184
attributesBuilder = attributesBuilderSupplier.get();
186-
LABEL_MAP.forEach(attributesBuilder::put);
185+
labelMap.forEach(attributesBuilder::put);
187186
return attributesBuilder;
188187
}
189188

@@ -242,6 +241,56 @@ public Meter getBrokerMeter() {
242241
return brokerMeter;
243242
}
244243

244+
// Getter methods for metrics variables
245+
public LongCounter getMessagesInTotal() {
246+
return messagesInTotal;
247+
}
248+
249+
public LongCounter getMessagesOutTotal() {
250+
return messagesOutTotal;
251+
}
252+
253+
public LongCounter getThroughputInTotal() {
254+
return throughputInTotal;
255+
}
256+
257+
public LongCounter getThroughputOutTotal() {
258+
return throughputOutTotal;
259+
}
260+
261+
public LongHistogram getMessageSize() {
262+
return messageSize;
263+
}
264+
265+
public LongCounter getSendToDlqMessages() {
266+
return sendToDlqMessages;
267+
}
268+
269+
public LongCounter getCommitMessagesTotal() {
270+
return commitMessagesTotal;
271+
}
272+
273+
public LongCounter getRollBackMessagesTotal() {
274+
return rollBackMessagesTotal;
275+
}
276+
277+
public LongHistogram getTransactionFinishLatency() {
278+
return transactionFinishLatency;
279+
}
280+
281+
public LongHistogram getTopicCreateExecuteTime() {
282+
return topicCreateExecuteTime;
283+
}
284+
285+
public LongHistogram getConsumerGroupCreateExecuteTime() {
286+
return consumerGroupCreateExecuteTime;
287+
}
288+
289+
// Setter method for testing purposes
290+
public void setAttributesBuilderSupplier(Supplier<AttributesBuilder> attributesBuilderSupplier) {
291+
this.attributesBuilderSupplier = attributesBuilderSupplier;
292+
}
293+
245294
private boolean checkConfig() {
246295
if (brokerConfig == null) {
247296
return false;
@@ -282,15 +331,15 @@ private void init() {
282331
LOGGER.warn("metricsLabel is not valid: {}", labels);
283332
continue;
284333
}
285-
LABEL_MAP.put(split[0], split[1]);
334+
labelMap.put(split[0], split[1]);
286335
}
287336
}
288337
if (brokerConfig.isMetricsInDelta()) {
289-
LABEL_MAP.put(LABEL_AGGREGATION, AGGREGATION_DELTA);
338+
labelMap.put(LABEL_AGGREGATION, AGGREGATION_DELTA);
290339
}
291-
LABEL_MAP.put(LABEL_NODE_TYPE, NODE_TYPE_BROKER);
292-
LABEL_MAP.put(LABEL_CLUSTER_NAME, brokerConfig.getBrokerClusterName());
293-
LABEL_MAP.put(LABEL_NODE_ID, brokerConfig.getBrokerName());
340+
labelMap.put(LABEL_NODE_TYPE, NODE_TYPE_BROKER);
341+
labelMap.put(LABEL_CLUSTER_NAME, brokerConfig.getBrokerClusterName());
342+
labelMap.put(LABEL_NODE_ID, brokerConfig.getBrokerName());
294343

295344
SdkMeterProviderBuilder providerBuilder = SdkMeterProvider.builder()
296345
.setResource(Resource.empty());
@@ -699,13 +748,13 @@ private void initTransactionMetrics() {
699748
}
700749
private void initOtherMetrics() {
701750
if (brokerConfig.isEnableRemotingMetrics()) {
702-
RemotingMetricsManager.initMetrics(brokerMeter, BrokerMetricsManager::newAttributesBuilder);
751+
RemotingMetricsManager.initMetrics(brokerMeter, this::newAttributesBuilder);
703752
}
704753
if (brokerConfig.isEnableMessageStoreMetrics()) {
705-
messageStore.initMetrics(brokerMeter, BrokerMetricsManager::newAttributesBuilder);
754+
messageStore.initMetrics(brokerMeter, this::newAttributesBuilder);
706755
}
707756
if (brokerConfig.isEnablePopMetrics()) {
708-
PopMetricsManager.initMetrics(brokerMeter, brokerController, BrokerMetricsManager::newAttributesBuilder);
757+
PopMetricsManager.initMetrics(brokerMeter, brokerController, this::newAttributesBuilder);
709758
}
710759
}
711760

broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,12 +182,12 @@ protected RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, f
182182
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
183183
|| delayLevel < 0) {
184184

185-
Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
185+
Attributes attributes = this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
186186
.put(LABEL_CONSUMER_GROUP, requestHeader.getGroup())
187187
.put(LABEL_TOPIC, requestHeader.getOriginTopic())
188188
.put(LABEL_IS_SYSTEM, BrokerMetricsManager.isSystem(requestHeader.getOriginTopic(), requestHeader.getGroup()))
189189
.build();
190-
BrokerMetricsManager.sendToDlqMessages.add(1, attributes);
190+
this.brokerController.getBrokerMetricsManager().getSendToDlqMessages().add(1, attributes);
191191

192192
isDLQ = true;
193193
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());

broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@
6666
import org.apache.rocketmq.broker.controller.ReplicasManager;
6767
import org.apache.rocketmq.broker.filter.ConsumerFilterData;
6868
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
69-
import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
69+
7070
import org.apache.rocketmq.broker.metrics.InvocationStatus;
7171
import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
7272
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
@@ -614,11 +614,11 @@ private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext
614614
executionTime = System.currentTimeMillis() - startTime;
615615
InvocationStatus status = response.getCode() == ResponseCode.SUCCESS ?
616616
InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
617-
Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
617+
Attributes attributes = this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
618618
.put(LABEL_INVOCATION_STATUS, status.getName())
619619
.put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(topic))
620620
.build();
621-
BrokerMetricsManager.topicCreateExecuteTime.record(executionTime, attributes);
621+
this.brokerController.getBrokerMetricsManager().getTopicCreateExecuteTime().record(executionTime, attributes);
622622
}
623623
LOGGER.info("executionTime of create topic:{} is {} ms", topic, executionTime);
624624
return response;
@@ -695,11 +695,11 @@ private synchronized RemotingCommand updateAndCreateTopicList(ChannelHandlerCont
695695
executionTime = System.currentTimeMillis() - startTime;
696696
InvocationStatus status = response.getCode() == ResponseCode.SUCCESS ?
697697
InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
698-
Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
698+
Attributes attributes = this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
699699
.put(LABEL_INVOCATION_STATUS, status.getName())
700700
.put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(topicNames))
701701
.build();
702-
BrokerMetricsManager.topicCreateExecuteTime.record(executionTime, attributes);
702+
this.brokerController.getBrokerMetricsManager().getTopicCreateExecuteTime().record(executionTime, attributes);
703703
}
704704
LOGGER.info("executionTime of all topics:{} is {} ms", topicNames, executionTime);
705705
return response;
@@ -1551,10 +1551,10 @@ private RemotingCommand updateAndCreateSubscriptionGroup(ChannelHandlerContext c
15511551
}
15521552
InvocationStatus status = response.getCode() == ResponseCode.SUCCESS ?
15531553
InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
1554-
Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
1554+
Attributes attributes = this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
15551555
.put(LABEL_INVOCATION_STATUS, status.getName())
15561556
.build();
1557-
BrokerMetricsManager.consumerGroupCreateExecuteTime.record(executionTime, attributes);
1557+
this.brokerController.getBrokerMetricsManager().getConsumerGroupCreateExecuteTime().record(executionTime, attributes);
15581558
return response;
15591559
}
15601560

@@ -1589,10 +1589,10 @@ private RemotingCommand updateAndCreateSubscriptionGroupList(ChannelHandlerConte
15891589
LOGGER.info("executionTime of create updateAndCreateSubscriptionGroupList: {} is {} ms", groupNames, executionTime);
15901590
InvocationStatus status = response.getCode() == ResponseCode.SUCCESS ?
15911591
InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
1592-
Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
1592+
Attributes attributes = this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
15931593
.put(LABEL_INVOCATION_STATUS, status.getName())
15941594
.build();
1595-
BrokerMetricsManager.consumerGroupCreateExecuteTime.record(executionTime, attributes);
1595+
this.brokerController.getBrokerMetricsManager().getConsumerGroupCreateExecuteTime().record(executionTime, attributes);
15961596
}
15971597

15981598
return response;

broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,13 +122,13 @@ public RemotingCommand handle(final GetMessageResult getMessageResult,
122122
this.brokerController.getBrokerStatsManager().incBrokerGetNums(requestHeader.getTopic(), getMessageResult.getMessageCount());
123123

124124
if (!BrokerMetricsManager.isRetryOrDlqTopic(requestHeader.getTopic())) {
125-
Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
125+
Attributes attributes = this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
126126
.put(LABEL_TOPIC, requestHeader.getTopic())
127127
.put(LABEL_CONSUMER_GROUP, requestHeader.getConsumerGroup())
128128
.put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(requestHeader.getTopic()) || MixAll.isSysConsumerGroup(requestHeader.getConsumerGroup()))
129129
.build();
130-
BrokerMetricsManager.messagesOutTotal.add(getMessageResult.getMessageCount(), attributes);
131-
BrokerMetricsManager.throughputOutTotal.add(getMessageResult.getBufferTotalSize(), attributes);
130+
this.brokerController.getBrokerMetricsManager().getMessagesOutTotal().add(getMessageResult.getMessageCount(), attributes);
131+
this.brokerController.getBrokerMetricsManager().getThroughputOutTotal().add(getMessageResult.getBufferTotalSize(), attributes);
132132
}
133133

134134
if (!channelIsWritable(channel, requestHeader)) {

broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import io.netty.channel.ChannelHandlerContext;
2020
import org.apache.commons.lang3.StringUtils;
2121
import org.apache.rocketmq.broker.BrokerController;
22-
import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
22+
2323
import org.apache.rocketmq.broker.transaction.OperationResult;
2424
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
2525
import org.apache.rocketmq.common.TopicFilterType;
@@ -149,12 +149,12 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
149149
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
150150
// successful committed, then total num of half-messages minus 1
151151
this.brokerController.getTransactionalMessageService().getTransactionMetrics().addAndGet(msgInner.getTopic(), -1);
152-
BrokerMetricsManager.commitMessagesTotal.add(1, BrokerMetricsManager.newAttributesBuilder()
152+
this.brokerController.getBrokerMetricsManager().getCommitMessagesTotal().add(1, this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
153153
.put(LABEL_TOPIC, msgInner.getTopic())
154154
.build());
155155
// record the commit latency.
156156
Long commitLatency = (System.currentTimeMillis() - result.getPrepareMessage().getBornTimestamp()) / 1000;
157-
BrokerMetricsManager.transactionFinishLatency.record(commitLatency, BrokerMetricsManager.newAttributesBuilder()
157+
this.brokerController.getBrokerMetricsManager().getTransactionFinishLatency().record(commitLatency, this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
158158
.put(LABEL_TOPIC, msgInner.getTopic())
159159
.build());
160160
}
@@ -176,7 +176,7 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
176176
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
177177
// roll back, then total num of half-messages minus 1
178178
this.brokerController.getTransactionalMessageService().getTransactionMetrics().addAndGet(result.getPrepareMessage().getProperty(MessageConst.PROPERTY_REAL_TOPIC), -1);
179-
BrokerMetricsManager.rollBackMessagesTotal.add(1, BrokerMetricsManager.newAttributesBuilder()
179+
this.brokerController.getBrokerMetricsManager().getRollBackMessagesTotal().add(1, this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
180180
.put(LABEL_TOPIC, result.getPrepareMessage().getProperty(MessageConst.PROPERTY_REAL_TOPIC))
181181
.build());
182182
}

0 commit comments

Comments
 (0)