diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 595737adf71..6835c9ebef0 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -490,6 +490,10 @@ public BrokerMetricsManager getBrokerMetricsManager() { return brokerMetricsManager; } + public void setBrokerMetricsManager(BrokerMetricsManager brokerMetricsManager) { + this.brokerMetricsManager = brokerMetricsManager; + } + protected void initializeRemotingServer() throws CloneNotSupportedException { RemotingServer tcpRemotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService); NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone(); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java index 6300d763d65..5a32cf3c673 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java @@ -121,46 +121,45 @@ public class BrokerMetricsManager { private final MessageStore messageStore; private final BrokerController brokerController; private final ConsumerLagCalculator consumerLagCalculator; - private final static Map LABEL_MAP = new HashMap<>(); + private final Map labelMap = new HashMap<>(); private OtlpGrpcMetricExporter metricExporter; private PeriodicMetricReader periodicMetricReader; private PrometheusHttpServer prometheusHttpServer; private MetricExporter loggingMetricExporter; private Meter brokerMeter; - public static Supplier attributesBuilderSupplier = Attributes::builder; + private Supplier attributesBuilderSupplier = Attributes::builder; // broker stats metrics - public static ObservableLongGauge processorWatermark = new NopObservableLongGauge(); - public static ObservableLongGauge brokerPermission = new NopObservableLongGauge(); - public static ObservableLongGauge topicNum = new NopObservableLongGauge(); - public static ObservableLongGauge consumerGroupNum = new NopObservableLongGauge(); - + private ObservableLongGauge processorWatermark = new NopObservableLongGauge(); + private ObservableLongGauge brokerPermission = new NopObservableLongGauge(); + private ObservableLongGauge topicNum = new NopObservableLongGauge(); + private ObservableLongGauge consumerGroupNum = new NopObservableLongGauge(); // request metrics - public static LongCounter messagesInTotal = new NopLongCounter(); - public static LongCounter messagesOutTotal = new NopLongCounter(); - public static LongCounter throughputInTotal = new NopLongCounter(); - public static LongCounter throughputOutTotal = new NopLongCounter(); - public static LongHistogram messageSize = new NopLongHistogram(); - public static LongHistogram topicCreateExecuteTime = new NopLongHistogram(); - public static LongHistogram consumerGroupCreateExecuteTime = new NopLongHistogram(); + private LongCounter messagesInTotal = new NopLongCounter(); + private LongCounter messagesOutTotal = new NopLongCounter(); + private LongCounter throughputInTotal = new NopLongCounter(); + private LongCounter throughputOutTotal = new NopLongCounter(); + private LongHistogram messageSize = new NopLongHistogram(); + private LongHistogram topicCreateExecuteTime = new NopLongHistogram(); + private LongHistogram consumerGroupCreateExecuteTime = new NopLongHistogram(); // client connection metrics - public static ObservableLongGauge producerConnection = new NopObservableLongGauge(); - public static ObservableLongGauge consumerConnection = new NopObservableLongGauge(); + private ObservableLongGauge producerConnection = new NopObservableLongGauge(); + private ObservableLongGauge consumerConnection = new NopObservableLongGauge(); // Lag metrics - public static ObservableLongGauge consumerLagMessages = new NopObservableLongGauge(); - public static ObservableLongGauge consumerLagLatency = new NopObservableLongGauge(); - public static ObservableLongGauge consumerInflightMessages = new NopObservableLongGauge(); - public static ObservableLongGauge consumerQueueingLatency = new NopObservableLongGauge(); - public static ObservableLongGauge consumerReadyMessages = new NopObservableLongGauge(); - public static LongCounter sendToDlqMessages = new NopLongCounter(); - public static ObservableLongGauge halfMessages = new NopObservableLongGauge(); - public static LongCounter commitMessagesTotal = new NopLongCounter(); - public static LongCounter rollBackMessagesTotal = new NopLongCounter(); - public static LongHistogram transactionFinishLatency = new NopLongHistogram(); + private ObservableLongGauge consumerLagMessages = new NopObservableLongGauge(); + private ObservableLongGauge consumerLagLatency = new NopObservableLongGauge(); + private ObservableLongGauge consumerInflightMessages = new NopObservableLongGauge(); + private ObservableLongGauge consumerQueueingLatency = new NopObservableLongGauge(); + private ObservableLongGauge consumerReadyMessages = new NopObservableLongGauge(); + private LongCounter sendToDlqMessages = new NopLongCounter(); + private ObservableLongGauge halfMessages = new NopObservableLongGauge(); + private LongCounter commitMessagesTotal = new NopLongCounter(); + private LongCounter rollBackMessagesTotal = new NopLongCounter(); + private LongHistogram transactionFinishLatency = new NopLongHistogram(); @SuppressWarnings("DoubleBraceInitialization") public static final List SYSTEM_GROUP_PREFIX_LIST = new ArrayList() { @@ -177,13 +176,13 @@ public BrokerMetricsManager(BrokerController brokerController) { init(); } - public static AttributesBuilder newAttributesBuilder() { + public AttributesBuilder newAttributesBuilder() { AttributesBuilder attributesBuilder; if (attributesBuilderSupplier == null) { attributesBuilderSupplier = Attributes::builder; } attributesBuilder = attributesBuilderSupplier.get(); - LABEL_MAP.forEach(attributesBuilder::put); + labelMap.forEach(attributesBuilder::put); return attributesBuilder; } @@ -242,6 +241,56 @@ public Meter getBrokerMeter() { return brokerMeter; } + // Getter methods for metrics variables + public LongCounter getMessagesInTotal() { + return messagesInTotal; + } + + public LongCounter getMessagesOutTotal() { + return messagesOutTotal; + } + + public LongCounter getThroughputInTotal() { + return throughputInTotal; + } + + public LongCounter getThroughputOutTotal() { + return throughputOutTotal; + } + + public LongHistogram getMessageSize() { + return messageSize; + } + + public LongCounter getSendToDlqMessages() { + return sendToDlqMessages; + } + + public LongCounter getCommitMessagesTotal() { + return commitMessagesTotal; + } + + public LongCounter getRollBackMessagesTotal() { + return rollBackMessagesTotal; + } + + public LongHistogram getTransactionFinishLatency() { + return transactionFinishLatency; + } + + public LongHistogram getTopicCreateExecuteTime() { + return topicCreateExecuteTime; + } + + public LongHistogram getConsumerGroupCreateExecuteTime() { + return consumerGroupCreateExecuteTime; + } + + // Setter method for testing purposes + public void setAttributesBuilderSupplier(Supplier attributesBuilderSupplier) { + this.attributesBuilderSupplier = attributesBuilderSupplier; + } + private boolean checkConfig() { if (brokerConfig == null) { return false; @@ -282,15 +331,15 @@ private void init() { LOGGER.warn("metricsLabel is not valid: {}", labels); continue; } - LABEL_MAP.put(split[0], split[1]); + labelMap.put(split[0], split[1]); } } if (brokerConfig.isMetricsInDelta()) { - LABEL_MAP.put(LABEL_AGGREGATION, AGGREGATION_DELTA); + labelMap.put(LABEL_AGGREGATION, AGGREGATION_DELTA); } - LABEL_MAP.put(LABEL_NODE_TYPE, NODE_TYPE_BROKER); - LABEL_MAP.put(LABEL_CLUSTER_NAME, brokerConfig.getBrokerClusterName()); - LABEL_MAP.put(LABEL_NODE_ID, brokerConfig.getBrokerName()); + labelMap.put(LABEL_NODE_TYPE, NODE_TYPE_BROKER); + labelMap.put(LABEL_CLUSTER_NAME, brokerConfig.getBrokerClusterName()); + labelMap.put(LABEL_NODE_ID, brokerConfig.getBrokerName()); SdkMeterProviderBuilder providerBuilder = SdkMeterProvider.builder() .setResource(Resource.empty()); @@ -699,13 +748,13 @@ private void initTransactionMetrics() { } private void initOtherMetrics() { if (brokerConfig.isEnableRemotingMetrics()) { - RemotingMetricsManager.initMetrics(brokerMeter, BrokerMetricsManager::newAttributesBuilder); + RemotingMetricsManager.initMetrics(brokerMeter, this::newAttributesBuilder); } if (brokerConfig.isEnableMessageStoreMetrics()) { - messageStore.initMetrics(brokerMeter, BrokerMetricsManager::newAttributesBuilder); + messageStore.initMetrics(brokerMeter, this::newAttributesBuilder); } if (brokerConfig.isEnablePopMetrics()) { - PopMetricsManager.initMetrics(brokerMeter, brokerController, BrokerMetricsManager::newAttributesBuilder); + PopMetricsManager.initMetrics(brokerMeter, brokerController, this::newAttributesBuilder); } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java index 928bd397e1d..22778812093 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java @@ -182,12 +182,12 @@ protected RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, f if (msgExt.getReconsumeTimes() >= maxReconsumeTimes || delayLevel < 0) { - Attributes attributes = BrokerMetricsManager.newAttributesBuilder() + Attributes attributes = this.brokerController.getBrokerMetricsManager().newAttributesBuilder() .put(LABEL_CONSUMER_GROUP, requestHeader.getGroup()) .put(LABEL_TOPIC, requestHeader.getOriginTopic()) .put(LABEL_IS_SYSTEM, BrokerMetricsManager.isSystem(requestHeader.getOriginTopic(), requestHeader.getGroup())) .build(); - BrokerMetricsManager.sendToDlqMessages.add(1, attributes); + this.brokerController.getBrokerMetricsManager().getSendToDlqMessages().add(1, attributes); isDLQ = true; newTopic = MixAll.getDLQTopic(requestHeader.getGroup()); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 4eb78fc1c2b..298e2390864 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -66,7 +66,7 @@ import org.apache.rocketmq.broker.controller.ReplicasManager; import org.apache.rocketmq.broker.filter.ConsumerFilterData; import org.apache.rocketmq.broker.filter.ExpressionMessageFilter; -import org.apache.rocketmq.broker.metrics.BrokerMetricsManager; + import org.apache.rocketmq.broker.metrics.InvocationStatus; import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin; import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; @@ -614,11 +614,11 @@ private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext executionTime = System.currentTimeMillis() - startTime; InvocationStatus status = response.getCode() == ResponseCode.SUCCESS ? InvocationStatus.SUCCESS : InvocationStatus.FAILURE; - Attributes attributes = BrokerMetricsManager.newAttributesBuilder() + Attributes attributes = this.brokerController.getBrokerMetricsManager().newAttributesBuilder() .put(LABEL_INVOCATION_STATUS, status.getName()) .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(topic)) .build(); - BrokerMetricsManager.topicCreateExecuteTime.record(executionTime, attributes); + this.brokerController.getBrokerMetricsManager().getTopicCreateExecuteTime().record(executionTime, attributes); } LOGGER.info("executionTime of create topic:{} is {} ms", topic, executionTime); return response; @@ -695,11 +695,11 @@ private synchronized RemotingCommand updateAndCreateTopicList(ChannelHandlerCont executionTime = System.currentTimeMillis() - startTime; InvocationStatus status = response.getCode() == ResponseCode.SUCCESS ? InvocationStatus.SUCCESS : InvocationStatus.FAILURE; - Attributes attributes = BrokerMetricsManager.newAttributesBuilder() + Attributes attributes = this.brokerController.getBrokerMetricsManager().newAttributesBuilder() .put(LABEL_INVOCATION_STATUS, status.getName()) .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(topicNames)) .build(); - BrokerMetricsManager.topicCreateExecuteTime.record(executionTime, attributes); + this.brokerController.getBrokerMetricsManager().getTopicCreateExecuteTime().record(executionTime, attributes); } LOGGER.info("executionTime of all topics:{} is {} ms", topicNames, executionTime); return response; @@ -1551,10 +1551,10 @@ private RemotingCommand updateAndCreateSubscriptionGroup(ChannelHandlerContext c } InvocationStatus status = response.getCode() == ResponseCode.SUCCESS ? InvocationStatus.SUCCESS : InvocationStatus.FAILURE; - Attributes attributes = BrokerMetricsManager.newAttributesBuilder() + Attributes attributes = this.brokerController.getBrokerMetricsManager().newAttributesBuilder() .put(LABEL_INVOCATION_STATUS, status.getName()) .build(); - BrokerMetricsManager.consumerGroupCreateExecuteTime.record(executionTime, attributes); + this.brokerController.getBrokerMetricsManager().getConsumerGroupCreateExecuteTime().record(executionTime, attributes); return response; } @@ -1589,10 +1589,10 @@ private RemotingCommand updateAndCreateSubscriptionGroupList(ChannelHandlerConte LOGGER.info("executionTime of create updateAndCreateSubscriptionGroupList: {} is {} ms", groupNames, executionTime); InvocationStatus status = response.getCode() == ResponseCode.SUCCESS ? InvocationStatus.SUCCESS : InvocationStatus.FAILURE; - Attributes attributes = BrokerMetricsManager.newAttributesBuilder() + Attributes attributes = this.brokerController.getBrokerMetricsManager().newAttributesBuilder() .put(LABEL_INVOCATION_STATUS, status.getName()) .build(); - BrokerMetricsManager.consumerGroupCreateExecuteTime.record(executionTime, attributes); + this.brokerController.getBrokerMetricsManager().getConsumerGroupCreateExecuteTime().record(executionTime, attributes); } return response; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java index 43b66b4c516..37257325379 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java @@ -122,13 +122,13 @@ public RemotingCommand handle(final GetMessageResult getMessageResult, this.brokerController.getBrokerStatsManager().incBrokerGetNums(requestHeader.getTopic(), getMessageResult.getMessageCount()); if (!BrokerMetricsManager.isRetryOrDlqTopic(requestHeader.getTopic())) { - Attributes attributes = BrokerMetricsManager.newAttributesBuilder() + Attributes attributes = this.brokerController.getBrokerMetricsManager().newAttributesBuilder() .put(LABEL_TOPIC, requestHeader.getTopic()) .put(LABEL_CONSUMER_GROUP, requestHeader.getConsumerGroup()) .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(requestHeader.getTopic()) || MixAll.isSysConsumerGroup(requestHeader.getConsumerGroup())) .build(); - BrokerMetricsManager.messagesOutTotal.add(getMessageResult.getMessageCount(), attributes); - BrokerMetricsManager.throughputOutTotal.add(getMessageResult.getBufferTotalSize(), attributes); + this.brokerController.getBrokerMetricsManager().getMessagesOutTotal().add(getMessageResult.getMessageCount(), attributes); + this.brokerController.getBrokerMetricsManager().getThroughputOutTotal().add(getMessageResult.getBufferTotalSize(), attributes); } if (!channelIsWritable(channel, requestHeader)) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java index 468a8791d40..153ac24c1f6 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java @@ -19,7 +19,7 @@ import io.netty.channel.ChannelHandlerContext; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.broker.BrokerController; -import org.apache.rocketmq.broker.metrics.BrokerMetricsManager; + import org.apache.rocketmq.broker.transaction.OperationResult; import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil; import org.apache.rocketmq.common.TopicFilterType; @@ -149,12 +149,12 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage()); // successful committed, then total num of half-messages minus 1 this.brokerController.getTransactionalMessageService().getTransactionMetrics().addAndGet(msgInner.getTopic(), -1); - BrokerMetricsManager.commitMessagesTotal.add(1, BrokerMetricsManager.newAttributesBuilder() + this.brokerController.getBrokerMetricsManager().getCommitMessagesTotal().add(1, this.brokerController.getBrokerMetricsManager().newAttributesBuilder() .put(LABEL_TOPIC, msgInner.getTopic()) .build()); // record the commit latency. Long commitLatency = (System.currentTimeMillis() - result.getPrepareMessage().getBornTimestamp()) / 1000; - BrokerMetricsManager.transactionFinishLatency.record(commitLatency, BrokerMetricsManager.newAttributesBuilder() + this.brokerController.getBrokerMetricsManager().getTransactionFinishLatency().record(commitLatency, this.brokerController.getBrokerMetricsManager().newAttributesBuilder() .put(LABEL_TOPIC, msgInner.getTopic()) .build()); } @@ -176,7 +176,7 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage()); // roll back, then total num of half-messages minus 1 this.brokerController.getTransactionalMessageService().getTransactionMetrics().addAndGet(result.getPrepareMessage().getProperty(MessageConst.PROPERTY_REAL_TOPIC), -1); - BrokerMetricsManager.rollBackMessagesTotal.add(1, BrokerMetricsManager.newAttributesBuilder() + this.brokerController.getBrokerMetricsManager().getRollBackMessagesTotal().add(1, this.brokerController.getBrokerMetricsManager().newAttributesBuilder() .put(LABEL_TOPIC, result.getPrepareMessage().getProperty(MessageConst.PROPERTY_REAL_TOPIC)) .build()); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java index 40117b74a54..584d248ab2f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java @@ -26,7 +26,7 @@ import java.util.Random; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.broker.BrokerController; -import org.apache.rocketmq.broker.metrics.BrokerMetricsManager; + import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.KeyBuilder; @@ -255,13 +255,13 @@ private long peekMsgFromQueue(boolean isRetry, GetMessageResult getMessageResult } if (getMessageTmpResult != null) { if (!getMessageTmpResult.getMessageMapedList().isEmpty() && !isRetry) { - Attributes attributes = BrokerMetricsManager.newAttributesBuilder() + Attributes attributes = this.brokerController.getBrokerMetricsManager().newAttributesBuilder() .put(LABEL_TOPIC, requestHeader.getTopic()) .put(LABEL_CONSUMER_GROUP, requestHeader.getConsumerGroup()) .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(requestHeader.getTopic()) || MixAll.isSysConsumerGroup(requestHeader.getConsumerGroup())) .build(); - BrokerMetricsManager.messagesOutTotal.add(getMessageResult.getMessageCount(), attributes); - BrokerMetricsManager.throughputOutTotal.add(getMessageResult.getBufferTotalSize(), attributes); + this.brokerController.getBrokerMetricsManager().getMessagesOutTotal().add(getMessageResult.getMessageCount(), attributes); + this.brokerController.getBrokerMetricsManager().getThroughputOutTotal().add(getMessageResult.getBufferTotalSize(), attributes); } for (SelectMappedBufferResult mappedBuffer : getMessageTmpResult.getMessageMapedList()) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java index 7d98705576b..83ca35091ea 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java @@ -45,7 +45,7 @@ import org.apache.rocketmq.broker.longpolling.PollingResult; import org.apache.rocketmq.broker.longpolling.PopLongPollingService; import org.apache.rocketmq.broker.longpolling.PopRequest; -import org.apache.rocketmq.broker.metrics.BrokerMetricsManager; + import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer; import org.apache.rocketmq.broker.pop.PopConsumerContext; import org.apache.rocketmq.common.BrokerConfig; @@ -788,14 +788,14 @@ private CompletableFuture popMsgFromQueue(String topic, String attemptId, this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), topic, result.getBufferTotalSize()); - Attributes attributes = BrokerMetricsManager.newAttributesBuilder() + Attributes attributes = this.brokerController.getBrokerMetricsManager().newAttributesBuilder() .put(LABEL_TOPIC, requestHeader.getTopic()) .put(LABEL_CONSUMER_GROUP, requestHeader.getConsumerGroup()) .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(requestHeader.getTopic()) || MixAll.isSysConsumerGroup(requestHeader.getConsumerGroup())) .put(LABEL_IS_RETRY, isRetry) .build(); - BrokerMetricsManager.messagesOutTotal.add(result.getMessageCount(), attributes); - BrokerMetricsManager.throughputOutTotal.add(result.getBufferTotalSize(), attributes); + this.brokerController.getBrokerMetricsManager().getMessagesOutTotal().add(result.getMessageCount(), attributes); + this.brokerController.getBrokerMetricsManager().getThroughputOutTotal().add(result.getBufferTotalSize(), attributes); if (isOrder) { this.brokerController.getConsumerOrderInfoManager().update(requestHeader.getAttemptId(), isRetry, topic, diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java index 2be41a69d63..ea7f177f397 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java @@ -31,7 +31,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Triple; import org.apache.rocketmq.broker.BrokerController; -import org.apache.rocketmq.broker.metrics.BrokerMetricsManager; + import org.apache.rocketmq.broker.metrics.PopMetricsManager; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.PullStatus; @@ -223,13 +223,13 @@ public PullResult getMessage(String group, String topic, int queueId, long offse brokerController.getBrokerStatsManager().recordDiskFallBehindTime(group, topic, queueId, brokerController.getMessageStore().now() - foundList.get(foundList.size() - 1).getStoreTimestamp()); - Attributes attributes = BrokerMetricsManager.newAttributesBuilder() + Attributes attributes = this.brokerController.getBrokerMetricsManager().newAttributesBuilder() .put(LABEL_TOPIC, topic) .put(LABEL_CONSUMER_GROUP, group) .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(topic) || MixAll.isSysConsumerGroup(group)) .build(); - BrokerMetricsManager.messagesOutTotal.add(getMessageResult.getMessageCount(), attributes); - BrokerMetricsManager.throughputOutTotal.add(getMessageResult.getBufferTotalSize(), attributes); + this.brokerController.getBrokerMetricsManager().getMessagesOutTotal().add(getMessageResult.getMessageCount(), attributes); + this.brokerController.getBrokerMetricsManager().getThroughputOutTotal().add(getMessageResult.getBufferTotalSize(), attributes); break; case NO_MATCHED_MESSAGE: diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java index a70b48debe1..9b2bbc34e86 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java @@ -298,14 +298,14 @@ private void handlePutMessageResult(PutMessageResult putMessageResult, this.brokerController.getBrokerStatsManager().incBrokerPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum()); if (!BrokerMetricsManager.isRetryOrDlqTopic(msg.getTopic())) { - Attributes attributes = BrokerMetricsManager.newAttributesBuilder() + Attributes attributes = this.brokerController.getBrokerMetricsManager().newAttributesBuilder() .put(LABEL_TOPIC, msg.getTopic()) .put(LABEL_MESSAGE_TYPE, messageType.getMetricsValue()) .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(msg.getTopic())) .build(); - BrokerMetricsManager.messagesInTotal.add(putMessageResult.getAppendMessageResult().getMsgNum(), attributes); - BrokerMetricsManager.throughputInTotal.add(putMessageResult.getAppendMessageResult().getWroteBytes(), attributes); - BrokerMetricsManager.messageSize.record(putMessageResult.getAppendMessageResult().getWroteBytes() / putMessageResult.getAppendMessageResult().getMsgNum(), attributes); + this.brokerController.getBrokerMetricsManager().getMessagesInTotal().add(putMessageResult.getAppendMessageResult().getMsgNum(), attributes); + this.brokerController.getBrokerMetricsManager().getThroughputInTotal().add(putMessageResult.getAppendMessageResult().getWroteBytes(), attributes); + this.brokerController.getBrokerMetricsManager().getMessageSize().record(putMessageResult.getAppendMessageResult().getWroteBytes() / putMessageResult.getAppendMessageResult().getMsgNum(), attributes); } responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId()); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index 669cd5e6771..6d60290a58f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -207,12 +207,12 @@ private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, Remoti } if (reconsumeTimes > maxReconsumeTimes || sendRetryMessageToDeadLetterQueueDirectly) { - Attributes attributes = BrokerMetricsManager.newAttributesBuilder() + Attributes attributes = this.brokerController.getBrokerMetricsManager().newAttributesBuilder() .put(LABEL_CONSUMER_GROUP, requestHeader.getProducerGroup()) .put(LABEL_TOPIC, requestHeader.getTopic()) .put(LABEL_IS_SYSTEM, BrokerMetricsManager.isSystem(requestHeader.getTopic(), requestHeader.getProducerGroup())) .build(); - BrokerMetricsManager.sendToDlqMessages.add(1, attributes); + this.brokerController.getBrokerMetricsManager().getSendToDlqMessages().add(1, attributes); properties.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "-1"); newTopic = MixAll.getDLQTopic(groupName); @@ -468,14 +468,14 @@ private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult (int) (this.brokerController.getMessageStore().now() - beginTimeMillis)); if (!BrokerMetricsManager.isRetryOrDlqTopic(msg.getTopic())) { - Attributes attributes = BrokerMetricsManager.newAttributesBuilder() + Attributes attributes = this.brokerController.getBrokerMetricsManager().newAttributesBuilder() .put(LABEL_TOPIC, msg.getTopic()) .put(LABEL_MESSAGE_TYPE, messageType.getMetricsValue()) .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(msg.getTopic())) .build(); - BrokerMetricsManager.messagesInTotal.add(putMessageResult.getAppendMessageResult().getMsgNum(), attributes); - BrokerMetricsManager.throughputInTotal.add(putMessageResult.getAppendMessageResult().getWroteBytes(), attributes); - BrokerMetricsManager.messageSize.record(putMessageResult.getAppendMessageResult().getWroteBytes() / putMessageResult.getAppendMessageResult().getMsgNum(), attributes); + this.brokerController.getBrokerMetricsManager().getMessagesInTotal().add(putMessageResult.getAppendMessageResult().getMsgNum(), attributes); + this.brokerController.getBrokerMetricsManager().getThroughputInTotal().add(putMessageResult.getAppendMessageResult().getWroteBytes(), attributes); + this.brokerController.getBrokerMetricsManager().getMessageSize().record(putMessageResult.getAppendMessageResult().getWroteBytes() / putMessageResult.getAppendMessageResult().getMsgNum(), attributes); } response.setRemark(null); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java index 25c24aff987..bec75fe2fb6 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java @@ -32,7 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.broker.BrokerController; -import org.apache.rocketmq.broker.metrics.BrokerMetricsManager; + import org.apache.rocketmq.common.ConfigManager; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ThreadFactoryImpl; @@ -722,26 +722,26 @@ public void onSuccess(PutMessageResult result) { ScheduleMessageService.this.brokerController.getBrokerStatsManager().incGroupGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, result.getAppendMessageResult().getMsgNum()); ScheduleMessageService.this.brokerController.getBrokerStatsManager().incGroupGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, result.getAppendMessageResult().getWroteBytes()); - Attributes attributes = BrokerMetricsManager.newAttributesBuilder() + Attributes attributes = ScheduleMessageService.this.brokerController.getBrokerMetricsManager().newAttributesBuilder() .put(LABEL_TOPIC, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC) .put(LABEL_CONSUMER_GROUP, MixAll.SCHEDULE_CONSUMER_GROUP) .put(LABEL_IS_SYSTEM, true) .build(); - BrokerMetricsManager.messagesOutTotal.add(result.getAppendMessageResult().getMsgNum(), attributes); - BrokerMetricsManager.throughputOutTotal.add(result.getAppendMessageResult().getWroteBytes(), attributes); + ScheduleMessageService.this.brokerController.getBrokerMetricsManager().getMessagesOutTotal().add(result.getAppendMessageResult().getMsgNum(), attributes); + ScheduleMessageService.this.brokerController.getBrokerMetricsManager().getThroughputOutTotal().add(result.getAppendMessageResult().getWroteBytes(), attributes); ScheduleMessageService.this.brokerController.getBrokerStatsManager().incTopicPutNums(this.topic, result.getAppendMessageResult().getMsgNum(), 1); ScheduleMessageService.this.brokerController.getBrokerStatsManager().incTopicPutSize(this.topic, result.getAppendMessageResult().getWroteBytes()); ScheduleMessageService.this.brokerController.getBrokerStatsManager().incBrokerPutNums(this.topic, result.getAppendMessageResult().getMsgNum()); - attributes = BrokerMetricsManager.newAttributesBuilder() + attributes = ScheduleMessageService.this.brokerController.getBrokerMetricsManager().newAttributesBuilder() .put(LABEL_TOPIC, topic) .put(LABEL_MESSAGE_TYPE, TopicMessageType.DELAY.getMetricsValue()) .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(topic)) .build(); - BrokerMetricsManager.messagesInTotal.add(result.getAppendMessageResult().getMsgNum(), attributes); - BrokerMetricsManager.throughputInTotal.add(result.getAppendMessageResult().getWroteBytes(), attributes); - BrokerMetricsManager.messageSize.record(result.getAppendMessageResult().getWroteBytes() / result.getAppendMessageResult().getMsgNum(), attributes); + ScheduleMessageService.this.brokerController.getBrokerMetricsManager().getMessagesInTotal().add(result.getAppendMessageResult().getMsgNum(), attributes); + ScheduleMessageService.this.brokerController.getBrokerMetricsManager().getThroughputInTotal().add(result.getAppendMessageResult().getWroteBytes(), attributes); + ScheduleMessageService.this.brokerController.getBrokerMetricsManager().getMessageSize().record(result.getAppendMessageResult().getWroteBytes() / result.getAppendMessageResult().getMsgNum(), attributes); } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java index 2383f4f917c..5fcc1f56b78 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java @@ -27,7 +27,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.broker.BrokerController; -import org.apache.rocketmq.broker.metrics.BrokerMetricsManager; + import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.PullStatus; import org.apache.rocketmq.common.MixAll; @@ -146,13 +146,13 @@ private PullResult getMessage(String group, String topic, int queueId, long offs this.brokerController.getMessageStore().now() - foundList.get(foundList.size() - 1) .getStoreTimestamp()); - Attributes attributes = BrokerMetricsManager.newAttributesBuilder() + Attributes attributes = this.brokerController.getBrokerMetricsManager().newAttributesBuilder() .put(LABEL_TOPIC, topic) .put(LABEL_CONSUMER_GROUP, group) .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(topic) || MixAll.isSysConsumerGroup(group)) .build(); - BrokerMetricsManager.messagesOutTotal.add(getMessageResult.getMessageCount(), attributes); - BrokerMetricsManager.throughputOutTotal.add(getMessageResult.getBufferTotalSize(), attributes); + this.brokerController.getBrokerMetricsManager().getMessagesOutTotal().add(getMessageResult.getMessageCount(), attributes); + this.brokerController.getBrokerMetricsManager().getThroughputOutTotal().add(getMessageResult.getBufferTotalSize(), attributes); break; case NO_MATCHED_MESSAGE: diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java index 3ce1fe3dbdf..db16b3d97e7 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java @@ -74,6 +74,15 @@ public void testBrokerRestart() throws Exception { brokerController.shutdown(); } + @Test + public void testBrokerMetricsManagerInitialization() throws Exception { + BrokerController brokerController = new BrokerController(brokerConfig, nettyServerConfig, new NettyClientConfig(), messageStoreConfig); + assertThat(brokerController.initialize()).isTrue(); + // Verify that brokerMetricsManager is properly initialized and not null + assertThat(brokerController.getBrokerMetricsManager()).isNotNull(); + brokerController.shutdown(); + } + @After public void destroy() { UtilAll.deleteFile(new File(messageStoreConfig.getStorePathRootDir())); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManagerTest.java index 9264eb4b56b..9e4cfa70c18 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManagerTest.java @@ -43,16 +43,36 @@ public class BrokerMetricsManagerTest { + private BrokerMetricsManager createTestBrokerMetricsManager() { + MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + String storePathRootDir = System.getProperty("java.io.tmpdir") + File.separator + "store-" + + UUID.randomUUID(); + messageStoreConfig.setStorePathRootDir(storePathRootDir); + BrokerConfig brokerConfig = new BrokerConfig(); + + NettyServerConfig nettyServerConfig = new NettyServerConfig(); + nettyServerConfig.setListenPort(0); + + BrokerController brokerController = new BrokerController(brokerConfig, nettyServerConfig, + new NettyClientConfig(), messageStoreConfig); + + return new BrokerMetricsManager(brokerController); + } + @Test public void testNewAttributesBuilder() { - Attributes attributes = BrokerMetricsManager.newAttributesBuilder().put("a", "b") + BrokerMetricsManager metricsManager = createTestBrokerMetricsManager(); + Attributes attributes = metricsManager.newAttributesBuilder().put("a", "b") .build(); assertThat(attributes.get(AttributeKey.stringKey("a"))).isEqualTo("b"); } @Test public void testCustomizedAttributesBuilder() { - BrokerMetricsManager.attributesBuilderSupplier = () -> new AttributesBuilder() { + BrokerMetricsManager metricsManager = createTestBrokerMetricsManager(); + + // Create a custom attributes builder supplier for testing + metricsManager.setAttributesBuilderSupplier(() -> new AttributesBuilder() { private AttributesBuilder attributesBuilder = Attributes.builder(); @Override @@ -77,8 +97,9 @@ public AttributesBuilder putAll(Attributes attributes) { attributesBuilder.putAll(attributes); return this; } - }; - Attributes attributes = BrokerMetricsManager.newAttributesBuilder().put("a", "b") + }); + + Attributes attributes = metricsManager.newAttributesBuilder().put("a", "b") .build(); assertThat(attributes.get(AttributeKey.stringKey("a"))).isEqualTo("b"); assertThat(attributes.get(AttributeKey.stringKey("customized"))).isEqualTo("value"); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java index f3d0eb07820..1bf99eadfba 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java @@ -37,6 +37,7 @@ import org.apache.rocketmq.broker.client.ConsumerGroupInfo; import org.apache.rocketmq.broker.client.ConsumerManager; import org.apache.rocketmq.broker.client.net.Broker2Client; +import org.apache.rocketmq.broker.metrics.BrokerMetricsManager; import org.apache.rocketmq.broker.offset.ConsumerOffsetManager; import org.apache.rocketmq.broker.schedule.ScheduleMessageService; import org.apache.rocketmq.broker.config.v1.RocksDBSubscriptionGroupManager; @@ -238,6 +239,8 @@ public void init() throws Exception { brokerController.setMessageStore(messageStore); brokerController.setAuthenticationMetadataManager(authenticationMetadataManager); brokerController.setAuthorizationMetadataManager(authorizationMetadataManager); + // Initialize BrokerMetricsManager to prevent NPE in tests + brokerController.setBrokerMetricsManager(new BrokerMetricsManager(brokerController)); Field field = BrokerController.class.getDeclaredField("broker2Client"); field.setAccessible(true); field.set(brokerController, broker2Client); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java index e4360f147b0..1751ad96fdb 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java @@ -18,6 +18,7 @@ import io.netty.channel.ChannelHandlerContext; import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.metrics.BrokerMetricsManager; import org.apache.rocketmq.broker.transaction.OperationResult; import org.apache.rocketmq.broker.transaction.TransactionMetrics; import org.apache.rocketmq.broker.transaction.TransactionalMessageService; @@ -83,6 +84,8 @@ public void init() { when(transactionMsgService.getTransactionMetrics()).thenReturn(transactionMetrics); brokerController.setMessageStore(messageStore); brokerController.setTransactionalMessageService(transactionMsgService); + // Initialize BrokerMetricsManager to prevent NPE in tests + brokerController.setBrokerMetricsManager(new BrokerMetricsManager(brokerController)); endTransactionProcessor = new EndTransactionProcessor(brokerController); } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PeekMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PeekMessageProcessorTest.java index 9baf2a6ebb3..5722b031b0a 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PeekMessageProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PeekMessageProcessorTest.java @@ -19,6 +19,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.metrics.BrokerMetricsManager; import org.apache.rocketmq.broker.offset.ConsumerOffsetManager; import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; import org.apache.rocketmq.broker.topic.TopicConfigManager; @@ -86,6 +87,8 @@ public class PeekMessageProcessorTest { @Before public void init() { + // Initialize BrokerMetricsManager to prevent NPE in tests + brokerController.setBrokerMetricsManager(new BrokerMetricsManager(brokerController)); peekMessageProcessor = new PeekMessageProcessor(brokerController); when(brokerController.getMessageStore()).thenReturn(messageStore); topicConfigManager = new TopicConfigManager(brokerController); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java index fdb0690e5dc..59559d3cfd0 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java @@ -22,6 +22,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.client.ClientChannelInfo; +import org.apache.rocketmq.broker.metrics.BrokerMetricsManager; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.KeyBuilder; import org.apache.rocketmq.common.TopicConfig; @@ -78,6 +79,8 @@ public class PopMessageProcessorTest { public void init() { brokerController.setMessageStore(messageStore); brokerController.getBrokerConfig().setEnablePopBufferMerge(true); + // Initialize BrokerMetricsManager to prevent NPE in tests + brokerController.setBrokerMetricsManager(new BrokerMetricsManager(brokerController)); popMessageProcessor = new PopMessageProcessor(brokerController); when(handlerContext.channel()).thenReturn(embeddedChannel); brokerController.getTopicConfigManager().getTopicConfigTable().put(topic, new TopicConfig(topic)); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java index 83c30111854..cecd1ff86a9 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java @@ -28,6 +28,7 @@ import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; import org.apache.rocketmq.broker.filter.ExpressionMessageFilter; +import org.apache.rocketmq.broker.metrics.BrokerMetricsManager; import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext; import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook; import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; @@ -82,6 +83,8 @@ public class PullMessageProcessorTest { @Before public void init() { brokerController.setMessageStore(messageStore); + // Initialize BrokerMetricsManager to prevent NPE in tests + brokerController.setBrokerMetricsManager(new BrokerMetricsManager(brokerController)); SubscriptionGroupManager subscriptionGroupManager = new SubscriptionGroupManager(brokerController); pullMessageProcessor = new PullMessageProcessor(brokerController); when(brokerController.getPullMessageProcessor()).thenReturn(pullMessageProcessor); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java index 266c8491cbf..03af9b948ba 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java @@ -25,6 +25,7 @@ import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.net.Broker2Client; +import org.apache.rocketmq.broker.metrics.BrokerMetricsManager; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; @@ -81,6 +82,8 @@ public class ReplyMessageProcessorTest { public void init() throws IllegalAccessException, NoSuchFieldException { clientInfo = new ClientChannelInfo(channel, "127.0.0.1", LanguageCode.JAVA, 0); brokerController.setMessageStore(messageStore); + // Initialize BrokerMetricsManager to prevent NPE in tests + brokerController.setBrokerMetricsManager(new BrokerMetricsManager(brokerController)); Field field = BrokerController.class.getDeclaredField("broker2Client"); field.setAccessible(true); field.set(brokerController, broker2Client); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java index 9da6a96ec99..ce8b3405f63 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java @@ -27,6 +27,7 @@ import java.util.concurrent.Executors; import org.apache.commons.codec.DecoderException; import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.metrics.BrokerMetricsManager; import org.apache.rocketmq.common.AbortProcessException; import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext; import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook; @@ -102,6 +103,8 @@ public class SendMessageProcessorTest { @Before public void init() { brokerController.setMessageStore(messageStore); + // Initialize BrokerMetricsManager to prevent NPE in tests + brokerController.setBrokerMetricsManager(new BrokerMetricsManager(brokerController)); TopicConfigManager topicConfigManager = new TopicConfigManager(brokerController); topicConfigManager.getTopicConfigTable().put(topic, new TopicConfig(topic)); SubscriptionGroupManager subscriptionGroupManager = new SubscriptionGroupManager(brokerController); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/schedule/ScheduleMessageServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/schedule/ScheduleMessageServiceTest.java index b90fb2931d5..675c9a57c86 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/schedule/ScheduleMessageServiceTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/schedule/ScheduleMessageServiceTest.java @@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.failover.EscapeBridge; +import org.apache.rocketmq.broker.metrics.BrokerMetricsManager; import org.apache.rocketmq.broker.util.HookUtils; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.UtilAll; @@ -48,6 +49,9 @@ import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.stats.BrokerStatsManager; +import org.apache.rocketmq.common.metrics.NopLongCounter; +import org.apache.rocketmq.common.metrics.NopLongHistogram; +import io.opentelemetry.api.common.Attributes; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -132,10 +136,22 @@ public void setUp() throws Exception { Mockito.when(brokerController.getBrokerStatsManager()).thenReturn(manager); EscapeBridge escapeBridge = new EscapeBridge(brokerController); Mockito.when(brokerController.getEscapeBridge()).thenReturn(escapeBridge); - scheduleMessageService = new ScheduleMessageService(brokerController); + // Initialize BrokerMetricsManager to prevent NPE in tests + BrokerMetricsManager brokerMetricsManager = Mockito.mock(BrokerMetricsManager.class); + // Mock newAttributesBuilder to return a valid AttributesBuilder instead of null + Mockito.when(brokerMetricsManager.newAttributesBuilder()).thenReturn(Attributes.builder()); + // Mock metrics getter methods to return Nop implementations to prevent NPE + Mockito.when(brokerMetricsManager.getMessagesInTotal()).thenReturn(new NopLongCounter()); + Mockito.when(brokerMetricsManager.getMessagesOutTotal()).thenReturn(new NopLongCounter()); + Mockito.when(brokerMetricsManager.getThroughputInTotal()).thenReturn(new NopLongCounter()); + Mockito.when(brokerMetricsManager.getThroughputOutTotal()).thenReturn(new NopLongCounter()); + Mockito.when(brokerMetricsManager.getMessageSize()).thenReturn(new NopLongHistogram()); + Mockito.when(brokerController.getBrokerMetricsManager()).thenReturn(brokerMetricsManager); + scheduleMessageService = Mockito.spy(new ScheduleMessageService(brokerController)); + // Mock ScheduleMessageService before it's used in HookUtils + Mockito.when(brokerController.getScheduleMessageService()).thenReturn(scheduleMessageService); scheduleMessageService.load(); scheduleMessageService.start(); - Mockito.when(brokerController.getScheduleMessageService()).thenReturn(scheduleMessageService); } @Test diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/metrics/ProxyMetricsManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/metrics/ProxyMetricsManager.java index 2b8dac5d8be..81db576e3d2 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/metrics/ProxyMetricsManager.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/metrics/ProxyMetricsManager.java @@ -81,7 +81,7 @@ public static void initLocalMode(BrokerMetricsManager brokerMetricsManager, Prox LABEL_MAP.put(LABEL_CLUSTER_NAME, proxyConfig.getProxyClusterName()); LABEL_MAP.put(LABEL_NODE_ID, proxyConfig.getProxyName()); LABEL_MAP.put(LABEL_PROXY_MODE, proxyConfig.getProxyMode().toLowerCase()); - initMetrics(brokerMetricsManager.getBrokerMeter(), BrokerMetricsManager::newAttributesBuilder); + initMetrics(brokerMetricsManager.getBrokerMeter(), brokerMetricsManager::newAttributesBuilder); } public static ProxyMetricsManager initClusterMode(ProxyConfig proxyConfig) { diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 2bdd058f3fa..4d13acf225d 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -160,6 +160,7 @@ public class DefaultMessageStore implements MessageStore { protected StoreCheckpoint storeCheckpoint; private TimerMessageStore timerMessageStore; + private final DefaultStoreMetricsManager defaultStoreMetricsManager; private final LinkedList dispatcherList = new LinkedList<>(); @@ -237,6 +238,8 @@ public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final Br this.transientStorePool = new TransientStorePool(messageStoreConfig.getTransientStorePoolSize(), messageStoreConfig.getMappedFileSizeCommitLog()); + this.defaultStoreMetricsManager = new DefaultStoreMetricsManager(); + this.scheduledExecutorService = ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread", getBrokerIdentity())); @@ -2970,12 +2973,12 @@ public long estimateMessageCount(String topic, int queueId, long from, long to, @Override public List> getMetricsView() { - return DefaultStoreMetricsManager.getMetricsView(); + return this.defaultStoreMetricsManager.getMetricsView(); } @Override public void initMetrics(Meter meter, Supplier attributesBuilderSupplier) { - DefaultStoreMetricsManager.init(meter, attributesBuilderSupplier, this); + this.defaultStoreMetricsManager.init(meter, attributesBuilderSupplier, this); } /** @@ -3021,4 +3024,8 @@ public void setNotifyMessageArriveInBatch(boolean notifyMessageArriveInBatch) { this.notifyMessageArriveInBatch = notifyMessageArriveInBatch; } + public DefaultStoreMetricsManager getDefaultStoreMetricsManager() { + return defaultStoreMetricsManager; + } + } diff --git a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java index ef72de8baa8..8d3963bb4a1 100644 --- a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java +++ b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java @@ -64,26 +64,29 @@ import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABEL_TOPIC; public class DefaultStoreMetricsManager { - public static Supplier attributesBuilderSupplier; - public static MessageStoreConfig messageStoreConfig; - - public static ObservableLongGauge storageSize = new NopObservableLongGauge(); - public static ObservableLongGauge flushBehind = new NopObservableLongGauge(); - public static ObservableLongGauge dispatchBehind = new NopObservableLongGauge(); - public static ObservableLongGauge messageReserveTime = new NopObservableLongGauge(); - - public static ObservableLongGauge timerEnqueueLag = new NopObservableLongGauge(); - public static ObservableLongGauge timerEnqueueLatency = new NopObservableLongGauge(); - public static ObservableLongGauge timerDequeueLag = new NopObservableLongGauge(); - public static ObservableLongGauge timerDequeueLatency = new NopObservableLongGauge(); - public static ObservableLongGauge timingMessages = new NopObservableLongGauge(); - - public static LongCounter timerDequeueTotal = new NopLongCounter(); - public static LongCounter timerEnqueueTotal = new NopLongCounter(); - public static ObservableLongGauge timerMessageSnapshot = new NopObservableLongGauge(); - public static LongHistogram timerMessageSetLatency = new NopLongHistogram(); - - public static List> getMetricsView() { + private Supplier attributesBuilderSupplier; + private MessageStoreConfig messageStoreConfig; + + private ObservableLongGauge storageSize = new NopObservableLongGauge(); + private ObservableLongGauge flushBehind = new NopObservableLongGauge(); + private ObservableLongGauge dispatchBehind = new NopObservableLongGauge(); + private ObservableLongGauge messageReserveTime = new NopObservableLongGauge(); + + private ObservableLongGauge timerEnqueueLag = new NopObservableLongGauge(); + private ObservableLongGauge timerEnqueueLatency = new NopObservableLongGauge(); + private ObservableLongGauge timerDequeueLag = new NopObservableLongGauge(); + private ObservableLongGauge timerDequeueLatency = new NopObservableLongGauge(); + private ObservableLongGauge timingMessages = new NopObservableLongGauge(); + + private LongCounter timerDequeueTotal = new NopLongCounter(); + private LongCounter timerEnqueueTotal = new NopLongCounter(); + private ObservableLongGauge timerMessageSnapshot = new NopObservableLongGauge(); + private LongHistogram timerMessageSetLatency = new NopLongHistogram(); + + public DefaultStoreMetricsManager() { + } + + public List> getMetricsView() { List rpcCostTimeBuckets = Arrays.asList( // day * hour * min * second 1d * 1 * 1 * 60, // 60 second @@ -102,42 +105,42 @@ public static List> getMetricsView() { return Lists.newArrayList(new Pair<>(selector, viewBuilder)); } - public static void init(Meter meter, Supplier attributesBuilderSupplier, + public void init(Meter meter, Supplier attributesBuilderSupplier, DefaultMessageStore messageStore) { // Also add some metrics for rocksdb's monitoring. RocksDBStoreMetricsManager.init(meter, attributesBuilderSupplier, messageStore.getQueueStore()); - DefaultStoreMetricsManager.attributesBuilderSupplier = attributesBuilderSupplier; - DefaultStoreMetricsManager.messageStoreConfig = messageStore.getMessageStoreConfig(); + this.attributesBuilderSupplier = attributesBuilderSupplier; + this.messageStoreConfig = messageStore.getMessageStoreConfig(); - storageSize = meter.gaugeBuilder(GAUGE_STORAGE_SIZE) + this.storageSize = meter.gaugeBuilder(GAUGE_STORAGE_SIZE) .setDescription("Broker storage size") .setUnit("bytes") .ofLongs() .buildWithCallback(measurement -> { - File storeDir = new File(messageStoreConfig.getStorePathRootDir()); + File storeDir = new File(this.messageStoreConfig.getStorePathRootDir()); if (storeDir.exists() && storeDir.isDirectory()) { long totalSpace = storeDir.getTotalSpace(); if (totalSpace > 0) { - measurement.record(totalSpace - storeDir.getFreeSpace(), newAttributesBuilder().build()); + measurement.record(totalSpace - storeDir.getFreeSpace(), this.newAttributesBuilder().build()); } } }); - flushBehind = meter.gaugeBuilder(GAUGE_STORAGE_FLUSH_BEHIND) + this.flushBehind = meter.gaugeBuilder(GAUGE_STORAGE_FLUSH_BEHIND) .setDescription("Broker flush behind bytes") .setUnit("bytes") .ofLongs() - .buildWithCallback(measurement -> measurement.record(messageStore.flushBehindBytes(), newAttributesBuilder().build())); + .buildWithCallback(measurement -> measurement.record(messageStore.flushBehindBytes(), this.newAttributesBuilder().build())); - dispatchBehind = meter.gaugeBuilder(GAUGE_STORAGE_DISPATCH_BEHIND) + this.dispatchBehind = meter.gaugeBuilder(GAUGE_STORAGE_DISPATCH_BEHIND) .setDescription("Broker dispatch behind bytes") .setUnit("bytes") .ofLongs() - .buildWithCallback(measurement -> measurement.record(messageStore.dispatchBehindBytes(), newAttributesBuilder().build())); + .buildWithCallback(measurement -> measurement.record(messageStore.dispatchBehindBytes(), this.newAttributesBuilder().build())); - messageReserveTime = meter.gaugeBuilder(GAUGE_STORAGE_MESSAGE_RESERVE_TIME) + this.messageReserveTime = meter.gaugeBuilder(GAUGE_STORAGE_MESSAGE_RESERVE_TIME) .setDescription("Broker message reserve time") .setUnit("milliseconds") .ofLongs() @@ -146,42 +149,42 @@ public static void init(Meter meter, Supplier attributesBuild if (earliestMessageTime <= 0) { return; } - measurement.record(System.currentTimeMillis() - earliestMessageTime, newAttributesBuilder().build()); + measurement.record(System.currentTimeMillis() - earliestMessageTime, this.newAttributesBuilder().build()); }); if (messageStore.getMessageStoreConfig().isTimerWheelEnable()) { - timerEnqueueLag = meter.gaugeBuilder(GAUGE_TIMER_ENQUEUE_LAG) + this.timerEnqueueLag = meter.gaugeBuilder(GAUGE_TIMER_ENQUEUE_LAG) .setDescription("Timer enqueue messages lag") .ofLongs() .buildWithCallback(measurement -> { TimerMessageStore timerMessageStore = messageStore.getTimerMessageStore(); - measurement.record(timerMessageStore.getEnqueueBehindMessages(), newAttributesBuilder().build()); + measurement.record(timerMessageStore.getEnqueueBehindMessages(), this.newAttributesBuilder().build()); }); - timerEnqueueLatency = meter.gaugeBuilder(GAUGE_TIMER_ENQUEUE_LATENCY) + this.timerEnqueueLatency = meter.gaugeBuilder(GAUGE_TIMER_ENQUEUE_LATENCY) .setDescription("Timer enqueue latency") .setUnit("milliseconds") .ofLongs() .buildWithCallback(measurement -> { TimerMessageStore timerMessageStore = messageStore.getTimerMessageStore(); - measurement.record(timerMessageStore.getEnqueueBehindMillis(), newAttributesBuilder().build()); + measurement.record(timerMessageStore.getEnqueueBehindMillis(), this.newAttributesBuilder().build()); }); - timerDequeueLag = meter.gaugeBuilder(GAUGE_TIMER_DEQUEUE_LAG) + this.timerDequeueLag = meter.gaugeBuilder(GAUGE_TIMER_DEQUEUE_LAG) .setDescription("Timer dequeue messages lag") .ofLongs() .buildWithCallback(measurement -> { TimerMessageStore timerMessageStore = messageStore.getTimerMessageStore(); - measurement.record(timerMessageStore.getDequeueBehindMessages(), newAttributesBuilder().build()); + measurement.record(timerMessageStore.getDequeueBehindMessages(), this.newAttributesBuilder().build()); }); - timerDequeueLatency = meter.gaugeBuilder(GAUGE_TIMER_DEQUEUE_LATENCY) + this.timerDequeueLatency = meter.gaugeBuilder(GAUGE_TIMER_DEQUEUE_LATENCY) .setDescription("Timer dequeue latency") .setUnit("milliseconds") .ofLongs() .buildWithCallback(measurement -> { TimerMessageStore timerMessageStore = messageStore.getTimerMessageStore(); - measurement.record(timerMessageStore.getDequeueBehind(), newAttributesBuilder().build()); + measurement.record(timerMessageStore.getDequeueBehind(), this.newAttributesBuilder().build()); }); - timingMessages = meter.gaugeBuilder(GAUGE_TIMING_MESSAGES) + this.timingMessages = meter.gaugeBuilder(GAUGE_TIMING_MESSAGES) .setDescription("Current message number in timing") .ofLongs() .buildWithCallback(measurement -> { @@ -191,23 +194,23 @@ public static void init(Meter meter, Supplier attributesBuild .forEach((topic, metric) -> { measurement.record( metric.getCount().get(), - newAttributesBuilder().put(LABEL_TOPIC, topic).build() + this.newAttributesBuilder().put(LABEL_TOPIC, topic).build() ); }); }); - timerDequeueTotal = meter.counterBuilder(COUNTER_TIMER_DEQUEUE_TOTAL) + this.timerDequeueTotal = meter.counterBuilder(COUNTER_TIMER_DEQUEUE_TOTAL) .setDescription("Total number of timer dequeue") .build(); - timerEnqueueTotal = meter.counterBuilder(COUNTER_TIMER_ENQUEUE_TOTAL) + this.timerEnqueueTotal = meter.counterBuilder(COUNTER_TIMER_ENQUEUE_TOTAL) .setDescription("Total number of timer enqueue") .build(); - timerMessageSnapshot = meter.gaugeBuilder(GAUGE_TIMER_MESSAGE_SNAPSHOT) + this.timerMessageSnapshot = meter.gaugeBuilder(GAUGE_TIMER_MESSAGE_SNAPSHOT) .setDescription("Timer message distribution snapshot, only count timing messages in 24h.") .ofLongs() .buildWithCallback(measurement -> { TimerMetrics timerMetrics = messageStore.getTimerMessageStore().getTimerMetrics(); TimerWheel timerWheel = messageStore.getTimerMessageStore().getTimerWheel(); - int precisionMs = messageStoreConfig.getTimerPrecisionMs(); + int precisionMs = this.messageStoreConfig.getTimerPrecisionMs(); List timerDist = timerMetrics.getTimerDistList(); long currTime = System.currentTimeMillis() / precisionMs * precisionMs; for (int i = 0; i < timerDist.size(); i++) { @@ -218,10 +221,10 @@ public static void init(Meter meter, Supplier attributesBuild Slot slotEach = timerWheel.getSlot(currTime + (long) j * precisionMs); periodTotal += slotEach.num; } - measurement.record(periodTotal, newAttributesBuilder().put(LABEL_TIMING_BOUND, timerDist.get(i).toString()).build()); + measurement.record(periodTotal, this.newAttributesBuilder().put(LABEL_TIMING_BOUND, timerDist.get(i).toString()).build()); } }); - timerMessageSetLatency = meter.histogramBuilder(HISTOGRAM_DELAY_MSG_LATENCY) + this.timerMessageSetLatency = meter.histogramBuilder(HISTOGRAM_DELAY_MSG_LATENCY) .setDescription("Timer message set latency distribution") .setUnit("seconds") .ofLongs() @@ -229,26 +232,96 @@ public static void init(Meter meter, Supplier attributesBuild } } - public static void incTimerDequeueCount(String topic) { - timerDequeueTotal.add(1, newAttributesBuilder() + public void incTimerDequeueCount(String topic) { + this.timerDequeueTotal.add(1, this.newAttributesBuilder() .put(LABEL_TOPIC, topic) .build()); } - public static void incTimerEnqueueCount(String topic) { - AttributesBuilder attributesBuilder = newAttributesBuilder(); + public void incTimerEnqueueCount(String topic) { + AttributesBuilder attributesBuilder = this.newAttributesBuilder(); if (topic != null) { attributesBuilder.put(LABEL_TOPIC, topic); } - timerEnqueueTotal.add(1, attributesBuilder.build()); + this.timerEnqueueTotal.add(1, attributesBuilder.build()); } - public static AttributesBuilder newAttributesBuilder() { - if (attributesBuilderSupplier == null) { + public AttributesBuilder newAttributesBuilder() { + if (this.attributesBuilderSupplier == null) { return Attributes.builder(); } - return attributesBuilderSupplier.get() + return this.attributesBuilderSupplier.get() .put(LABEL_STORAGE_TYPE, DEFAULT_STORAGE_TYPE) .put(LABEL_STORAGE_MEDIUM, DEFAULT_STORAGE_MEDIUM); } + + // Getter methods for external access + public Supplier getAttributesBuilderSupplier() { + return attributesBuilderSupplier; + } + + public MessageStoreConfig getMessageStoreConfig() { + return messageStoreConfig; + } + + public ObservableLongGauge getStorageSize() { + return storageSize; + } + + public ObservableLongGauge getFlushBehind() { + return flushBehind; + } + + public ObservableLongGauge getDispatchBehind() { + return dispatchBehind; + } + + public ObservableLongGauge getMessageReserveTime() { + return messageReserveTime; + } + + public ObservableLongGauge getTimerEnqueueLag() { + return timerEnqueueLag; + } + + public ObservableLongGauge getTimerEnqueueLatency() { + return timerEnqueueLatency; + } + + public ObservableLongGauge getTimerDequeueLag() { + return timerDequeueLag; + } + + public ObservableLongGauge getTimerDequeueLatency() { + return timerDequeueLatency; + } + + public ObservableLongGauge getTimingMessages() { + return timingMessages; + } + + public LongCounter getTimerDequeueTotal() { + return timerDequeueTotal; + } + + public LongCounter getTimerEnqueueTotal() { + return timerEnqueueTotal; + } + + public ObservableLongGauge getTimerMessageSnapshot() { + return timerMessageSnapshot; + } + + public LongHistogram getTimerMessageSetLatency() { + return timerMessageSetLatency; + } + + // Setter methods for testing + public void setAttributesBuilderSupplier(Supplier attributesBuilderSupplier) { + this.attributesBuilderSupplier = attributesBuilderSupplier; + } + + public void setMessageStoreConfig(MessageStoreConfig messageStoreConfig) { + this.messageStoreConfig = messageStoreConfig; + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java index d6af7b84e79..1f51a063d63 100644 --- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java @@ -701,9 +701,13 @@ public boolean enqueue(int queueId) { return false; } } - Attributes attributes = DefaultStoreMetricsManager.newAttributesBuilder() + // Record timer message set latency + if (messageStore instanceof DefaultMessageStore) { + DefaultStoreMetricsManager metricsManager = ((DefaultMessageStore) messageStore).getDefaultStoreMetricsManager(); + Attributes attributes = metricsManager.newAttributesBuilder() .put(DefaultStoreMetricsConstant.LABEL_TOPIC, msgExt.getProperty(MessageConst.PROPERTY_REAL_TOPIC)).build(); - DefaultStoreMetricsManager.timerMessageSetLatency.record((delayedTime - msgExt.getBornTimestamp()) / 1000, attributes); + metricsManager.getTimerMessageSetLatency().record((delayedTime - msgExt.getBornTimestamp()) / 1000, attributes); + } } } catch (Exception e) { // here may cause the message loss @@ -1366,7 +1370,9 @@ protected List fetchTimerRequests() throws InterruptedException { protected void putMessageToTimerWheel(TimerRequest req) { try { perfCounterTicks.startTick(ENQUEUE_PUT); - DefaultStoreMetricsManager.incTimerEnqueueCount(getRealTopic(req.getMsg())); + if (messageStore instanceof DefaultMessageStore) { + ((DefaultMessageStore) messageStore).getDefaultStoreMetricsManager().incTimerEnqueueCount(getRealTopic(req.getMsg())); + } if (shouldRunningDequeue && req.getDelayTime() < currWriteTimeMs) { req.setEnqueueTime(Long.MAX_VALUE); dequeuePutQueue.put(req); @@ -1502,7 +1508,9 @@ public void run() { perfCounterTicks.startTick(DEQUEUE_PUT); MessageExt msgExt = tr.getMsg(); - DefaultStoreMetricsManager.incTimerDequeueCount(getRealTopic(msgExt)); + if (messageStore instanceof DefaultMessageStore) { + ((DefaultMessageStore) messageStore).getDefaultStoreMetricsManager().incTimerDequeueCount(getRealTopic(msgExt)); + } if (tr.getEnqueueTime() == Long.MAX_VALUE) { // Never enqueue, mark it.