Skip to content

Commit 16a6971

Browse files
committed
fix
1 parent 2791833 commit 16a6971

13 files changed

Lines changed: 162 additions & 56 deletions

broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java

Lines changed: 52 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -131,36 +131,36 @@ public class BrokerMetricsManager {
131131
public static Supplier<AttributesBuilder> attributesBuilderSupplier = Attributes::builder;
132132

133133
// broker stats metrics
134-
public static ObservableLongGauge processorWatermark = new NopObservableLongGauge();
135-
public static ObservableLongGauge brokerPermission = new NopObservableLongGauge();
136-
public static ObservableLongGauge topicNum = new NopObservableLongGauge();
137-
public static ObservableLongGauge consumerGroupNum = new NopObservableLongGauge();
134+
public ObservableLongGauge processorWatermark = new NopObservableLongGauge();
135+
public ObservableLongGauge brokerPermission = new NopObservableLongGauge();
136+
public ObservableLongGauge topicNum = new NopObservableLongGauge();
137+
public ObservableLongGauge consumerGroupNum = new NopObservableLongGauge();
138138

139139

140140
// request metrics
141-
public static LongCounter messagesInTotal = new NopLongCounter();
142-
public static LongCounter messagesOutTotal = new NopLongCounter();
143-
public static LongCounter throughputInTotal = new NopLongCounter();
144-
public static LongCounter throughputOutTotal = new NopLongCounter();
145-
public static LongHistogram messageSize = new NopLongHistogram();
146-
public static LongHistogram topicCreateExecuteTime = new NopLongHistogram();
147-
public static LongHistogram consumerGroupCreateExecuteTime = new NopLongHistogram();
141+
public LongCounter messagesInTotal = new NopLongCounter();
142+
public LongCounter messagesOutTotal = new NopLongCounter();
143+
public LongCounter throughputInTotal = new NopLongCounter();
144+
public LongCounter throughputOutTotal = new NopLongCounter();
145+
public LongHistogram messageSize = new NopLongHistogram();
146+
public LongHistogram topicCreateExecuteTime = new NopLongHistogram();
147+
public LongHistogram consumerGroupCreateExecuteTime = new NopLongHistogram();
148148

149149
// client connection metrics
150-
public static ObservableLongGauge producerConnection = new NopObservableLongGauge();
151-
public static ObservableLongGauge consumerConnection = new NopObservableLongGauge();
150+
public ObservableLongGauge producerConnection = new NopObservableLongGauge();
151+
public ObservableLongGauge consumerConnection = new NopObservableLongGauge();
152152

153153
// Lag metrics
154-
public static ObservableLongGauge consumerLagMessages = new NopObservableLongGauge();
155-
public static ObservableLongGauge consumerLagLatency = new NopObservableLongGauge();
156-
public static ObservableLongGauge consumerInflightMessages = new NopObservableLongGauge();
157-
public static ObservableLongGauge consumerQueueingLatency = new NopObservableLongGauge();
158-
public static ObservableLongGauge consumerReadyMessages = new NopObservableLongGauge();
159-
public static LongCounter sendToDlqMessages = new NopLongCounter();
160-
public static ObservableLongGauge halfMessages = new NopObservableLongGauge();
161-
public static LongCounter commitMessagesTotal = new NopLongCounter();
162-
public static LongCounter rollBackMessagesTotal = new NopLongCounter();
163-
public static LongHistogram transactionFinishLatency = new NopLongHistogram();
154+
public ObservableLongGauge consumerLagMessages = new NopObservableLongGauge();
155+
public ObservableLongGauge consumerLagLatency = new NopObservableLongGauge();
156+
public ObservableLongGauge consumerInflightMessages = new NopObservableLongGauge();
157+
public ObservableLongGauge consumerQueueingLatency = new NopObservableLongGauge();
158+
public ObservableLongGauge consumerReadyMessages = new NopObservableLongGauge();
159+
public LongCounter sendToDlqMessages = new NopLongCounter();
160+
public ObservableLongGauge halfMessages = new NopObservableLongGauge();
161+
public LongCounter commitMessagesTotal = new NopLongCounter();
162+
public LongCounter rollBackMessagesTotal = new NopLongCounter();
163+
public LongHistogram transactionFinishLatency = new NopLongHistogram();
164164

165165
public static final List<String> SYSTEM_GROUP_PREFIX_LIST = new ArrayList<String>() {
166166
{
@@ -473,6 +473,10 @@ private void registerMetricsView(SdkMeterProviderBuilder providerBuilder) {
473473
}
474474

475475
private void initStatsMetrics() {
476+
if (!brokerConfig.isEnableStatsMetrics()) {
477+
return;
478+
}
479+
476480
processorWatermark = brokerMeter.gaugeBuilder(GAUGE_PROCESSOR_WATERMARK)
477481
.setDescription("Request processor watermark")
478482
.ofLongs()
@@ -508,6 +512,10 @@ private void initStatsMetrics() {
508512
}
509513

510514
private void initRequestMetrics() {
515+
if (!brokerConfig.isEnableRequestMetrics()) {
516+
return;
517+
}
518+
511519
messagesInTotal = brokerMeter.counterBuilder(COUNTER_MESSAGES_IN_TOTAL)
512520
.setDescription("Total number of incoming messages")
513521
.build();
@@ -543,6 +551,10 @@ private void initRequestMetrics() {
543551
}
544552

545553
private void initConnectionMetrics() {
554+
if (!brokerConfig.isEnableConnectionMetrics()) {
555+
return;
556+
}
557+
546558
producerConnection = brokerMeter.gaugeBuilder(GAUGE_PRODUCER_CONNECTIONS)
547559
.setDescription("Producer connections")
548560
.ofLongs()
@@ -599,6 +611,10 @@ private void initConnectionMetrics() {
599611
}
600612

601613
private void initLagAndDlqMetrics() {
614+
if (!brokerConfig.isEnableLagAndDlqMetrics()) {
615+
return;
616+
}
617+
602618
consumerLagMessages = brokerMeter.gaugeBuilder(GAUGE_CONSUMER_LAG_MESSAGES)
603619
.setDescription("Consumer lag messages")
604620
.ofLongs()
@@ -649,6 +665,10 @@ private void initLagAndDlqMetrics() {
649665
}
650666

651667
private void initTransactionMetrics() {
668+
if (!brokerController.getBrokerConfig().isEnableTransactionMetrics()) {
669+
return;
670+
}
671+
652672
commitMessagesTotal = brokerMeter.counterBuilder(COUNTER_COMMIT_MESSAGES_TOTAL)
653673
.setDescription("Total number of commit messages")
654674
.build();
@@ -677,9 +697,15 @@ private void initTransactionMetrics() {
677697
});
678698
}
679699
private void initOtherMetrics() {
680-
RemotingMetricsManager.initMetrics(brokerMeter, BrokerMetricsManager::newAttributesBuilder);
681-
messageStore.initMetrics(brokerMeter, BrokerMetricsManager::newAttributesBuilder);
682-
PopMetricsManager.initMetrics(brokerMeter, brokerController, BrokerMetricsManager::newAttributesBuilder);
700+
if (brokerConfig.isEnableRemotingMetrics()) {
701+
RemotingMetricsManager.initMetrics(brokerMeter, BrokerMetricsManager::newAttributesBuilder);
702+
}
703+
if (brokerConfig.isEnableMessageStoreMetrics()) {
704+
messageStore.initMetrics(brokerMeter, BrokerMetricsManager::newAttributesBuilder);
705+
}
706+
if (brokerConfig.isEnablePopMetrics()) {
707+
PopMetricsManager.initMetrics(brokerMeter, brokerController, BrokerMetricsManager::newAttributesBuilder);
708+
}
683709
}
684710

685711
public void shutdown() {

broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ protected RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, f
187187
.put(LABEL_TOPIC, requestHeader.getOriginTopic())
188188
.put(LABEL_IS_SYSTEM, BrokerMetricsManager.isSystem(requestHeader.getOriginTopic(), requestHeader.getGroup()))
189189
.build();
190-
BrokerMetricsManager.sendToDlqMessages.add(1, attributes);
190+
brokerController.getBrokerMetricsManager().sendToDlqMessages.add(1, attributes);
191191

192192
isDLQ = true;
193193
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());

broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -618,7 +618,7 @@ private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext
618618
.put(LABEL_INVOCATION_STATUS, status.getName())
619619
.put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(topic))
620620
.build();
621-
BrokerMetricsManager.topicCreateExecuteTime.record(executionTime, attributes);
621+
brokerController.getBrokerMetricsManager().topicCreateExecuteTime.record(executionTime, attributes);
622622
}
623623
LOGGER.info("executionTime of create topic:{} is {} ms", topic, executionTime);
624624
return response;
@@ -699,7 +699,7 @@ private synchronized RemotingCommand updateAndCreateTopicList(ChannelHandlerCont
699699
.put(LABEL_INVOCATION_STATUS, status.getName())
700700
.put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(topicNames))
701701
.build();
702-
BrokerMetricsManager.topicCreateExecuteTime.record(executionTime, attributes);
702+
brokerController.getBrokerMetricsManager().topicCreateExecuteTime.record(executionTime, attributes);
703703
}
704704
LOGGER.info("executionTime of all topics:{} is {} ms", topicNames, executionTime);
705705
return response;
@@ -1554,7 +1554,7 @@ private RemotingCommand updateAndCreateSubscriptionGroup(ChannelHandlerContext c
15541554
Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
15551555
.put(LABEL_INVOCATION_STATUS, status.getName())
15561556
.build();
1557-
BrokerMetricsManager.consumerGroupCreateExecuteTime.record(executionTime, attributes);
1557+
brokerController.getBrokerMetricsManager().consumerGroupCreateExecuteTime.record(executionTime, attributes);
15581558
return response;
15591559
}
15601560

@@ -1592,7 +1592,7 @@ private RemotingCommand updateAndCreateSubscriptionGroupList(ChannelHandlerConte
15921592
Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
15931593
.put(LABEL_INVOCATION_STATUS, status.getName())
15941594
.build();
1595-
BrokerMetricsManager.consumerGroupCreateExecuteTime.record(executionTime, attributes);
1595+
brokerController.getBrokerMetricsManager().consumerGroupCreateExecuteTime.record(executionTime, attributes);
15961596
}
15971597

15981598
return response;

broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,8 @@ public RemotingCommand handle(final GetMessageResult getMessageResult,
127127
.put(LABEL_CONSUMER_GROUP, requestHeader.getConsumerGroup())
128128
.put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(requestHeader.getTopic()) || MixAll.isSysConsumerGroup(requestHeader.getConsumerGroup()))
129129
.build();
130-
BrokerMetricsManager.messagesOutTotal.add(getMessageResult.getMessageCount(), attributes);
131-
BrokerMetricsManager.throughputOutTotal.add(getMessageResult.getBufferTotalSize(), attributes);
130+
brokerController.getBrokerMetricsManager().messagesOutTotal.add(getMessageResult.getMessageCount(), attributes);
131+
brokerController.getBrokerMetricsManager().throughputOutTotal.add(getMessageResult.getBufferTotalSize(), attributes);
132132
}
133133

134134
if (!channelIsWritable(channel, requestHeader)) {

broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,12 +149,14 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
149149
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
150150
// successful committed, then total num of half-messages minus 1
151151
this.brokerController.getTransactionalMessageService().getTransactionMetrics().addAndGet(msgInner.getTopic(), -1);
152-
BrokerMetricsManager.commitMessagesTotal.add(1, BrokerMetricsManager.newAttributesBuilder()
152+
brokerController.getBrokerMetricsManager().commitMessagesTotal.add(1,
153+
BrokerMetricsManager.newAttributesBuilder()
153154
.put(LABEL_TOPIC, msgInner.getTopic())
154155
.build());
155156
// record the commit latency.
156157
Long commitLatency = (System.currentTimeMillis() - result.getPrepareMessage().getBornTimestamp()) / 1000;
157-
BrokerMetricsManager.transactionFinishLatency.record(commitLatency, BrokerMetricsManager.newAttributesBuilder()
158+
brokerController.getBrokerMetricsManager().transactionFinishLatency.record(commitLatency,
159+
BrokerMetricsManager.newAttributesBuilder()
158160
.put(LABEL_TOPIC, msgInner.getTopic())
159161
.build());
160162
}
@@ -176,7 +178,8 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
176178
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
177179
// roll back, then total num of half-messages minus 1
178180
this.brokerController.getTransactionalMessageService().getTransactionMetrics().addAndGet(result.getPrepareMessage().getProperty(MessageConst.PROPERTY_REAL_TOPIC), -1);
179-
BrokerMetricsManager.rollBackMessagesTotal.add(1, BrokerMetricsManager.newAttributesBuilder()
181+
brokerController.getBrokerMetricsManager().rollBackMessagesTotal.add(1,
182+
BrokerMetricsManager.newAttributesBuilder()
180183
.put(LABEL_TOPIC, result.getPrepareMessage().getProperty(MessageConst.PROPERTY_REAL_TOPIC))
181184
.build());
182185
}

broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -260,8 +260,9 @@ private long peekMsgFromQueue(boolean isRetry, GetMessageResult getMessageResult
260260
.put(LABEL_CONSUMER_GROUP, requestHeader.getConsumerGroup())
261261
.put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(requestHeader.getTopic()) || MixAll.isSysConsumerGroup(requestHeader.getConsumerGroup()))
262262
.build();
263-
BrokerMetricsManager.messagesOutTotal.add(getMessageResult.getMessageCount(), attributes);
264-
BrokerMetricsManager.throughputOutTotal.add(getMessageResult.getBufferTotalSize(), attributes);
263+
brokerController.getBrokerMetricsManager().messagesOutTotal.add(getMessageResult.getMessageCount(),
264+
attributes);
265+
brokerController.getBrokerMetricsManager().throughputOutTotal.add(getMessageResult.getBufferTotalSize(), attributes);
265266
}
266267

267268
for (SelectMappedBufferResult mappedBuffer : getMessageTmpResult.getMessageMapedList()) {

broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -785,8 +785,10 @@ private CompletableFuture<Long> popMsgFromQueue(String topic, String attemptId,
785785
.put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(requestHeader.getTopic()) || MixAll.isSysConsumerGroup(requestHeader.getConsumerGroup()))
786786
.put(LABEL_IS_RETRY, isRetry)
787787
.build();
788-
BrokerMetricsManager.messagesOutTotal.add(result.getMessageCount(), attributes);
789-
BrokerMetricsManager.throughputOutTotal.add(result.getBufferTotalSize(), attributes);
788+
brokerController.getBrokerMetricsManager().messagesOutTotal.add(result.getMessageCount(),
789+
attributes);
790+
brokerController.getBrokerMetricsManager().throughputOutTotal.add(result.getBufferTotalSize(),
791+
attributes);
790792

791793
if (isOrder) {
792794
this.brokerController.getConsumerOrderInfoManager().update(requestHeader.getAttemptId(), isRetry, topic,

broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -228,8 +228,8 @@ public PullResult getMessage(String group, String topic, int queueId, long offse
228228
.put(LABEL_CONSUMER_GROUP, group)
229229
.put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(topic) || MixAll.isSysConsumerGroup(group))
230230
.build();
231-
BrokerMetricsManager.messagesOutTotal.add(getMessageResult.getMessageCount(), attributes);
232-
BrokerMetricsManager.throughputOutTotal.add(getMessageResult.getBufferTotalSize(), attributes);
231+
brokerController.getBrokerMetricsManager().messagesOutTotal.add(getMessageResult.getMessageCount(), attributes);
232+
brokerController.getBrokerMetricsManager().throughputOutTotal.add(getMessageResult.getBufferTotalSize(), attributes);
233233

234234
break;
235235
case NO_MATCHED_MESSAGE:

broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -303,9 +303,9 @@ private void handlePutMessageResult(PutMessageResult putMessageResult,
303303
.put(LABEL_MESSAGE_TYPE, messageType.getMetricsValue())
304304
.put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(msg.getTopic()))
305305
.build();
306-
BrokerMetricsManager.messagesInTotal.add(putMessageResult.getAppendMessageResult().getMsgNum(), attributes);
307-
BrokerMetricsManager.throughputInTotal.add(putMessageResult.getAppendMessageResult().getWroteBytes(), attributes);
308-
BrokerMetricsManager.messageSize.record(putMessageResult.getAppendMessageResult().getWroteBytes() / putMessageResult.getAppendMessageResult().getMsgNum(), attributes);
306+
brokerController.getBrokerMetricsManager().messagesInTotal.add(putMessageResult.getAppendMessageResult().getMsgNum(), attributes);
307+
brokerController.getBrokerMetricsManager().throughputInTotal.add(putMessageResult.getAppendMessageResult().getWroteBytes(), attributes);
308+
brokerController.getBrokerMetricsManager().messageSize.record(putMessageResult.getAppendMessageResult().getWroteBytes() / putMessageResult.getAppendMessageResult().getMsgNum(), attributes);
309309
}
310310

311311
responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());

broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, Remoti
212212
.put(LABEL_TOPIC, requestHeader.getTopic())
213213
.put(LABEL_IS_SYSTEM, BrokerMetricsManager.isSystem(requestHeader.getTopic(), requestHeader.getProducerGroup()))
214214
.build();
215-
BrokerMetricsManager.sendToDlqMessages.add(1, attributes);
215+
brokerController.getBrokerMetricsManager().sendToDlqMessages.add(1, attributes);
216216

217217
properties.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "-1");
218218
newTopic = MixAll.getDLQTopic(groupName);
@@ -473,9 +473,9 @@ private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult
473473
.put(LABEL_MESSAGE_TYPE, messageType.getMetricsValue())
474474
.put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(msg.getTopic()))
475475
.build();
476-
BrokerMetricsManager.messagesInTotal.add(putMessageResult.getAppendMessageResult().getMsgNum(), attributes);
477-
BrokerMetricsManager.throughputInTotal.add(putMessageResult.getAppendMessageResult().getWroteBytes(), attributes);
478-
BrokerMetricsManager.messageSize.record(putMessageResult.getAppendMessageResult().getWroteBytes() / putMessageResult.getAppendMessageResult().getMsgNum(), attributes);
476+
brokerController.getBrokerMetricsManager().messagesInTotal.add(putMessageResult.getAppendMessageResult().getMsgNum(), attributes);
477+
brokerController.getBrokerMetricsManager().throughputInTotal.add(putMessageResult.getAppendMessageResult().getWroteBytes(), attributes);
478+
brokerController.getBrokerMetricsManager().messageSize.record(putMessageResult.getAppendMessageResult().getWroteBytes() / putMessageResult.getAppendMessageResult().getMsgNum(), attributes);
479479
}
480480

481481
response.setRemark(null);

0 commit comments

Comments
 (0)