Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,10 @@ public BrokerMetricsManager getBrokerMetricsManager() {
return brokerMetricsManager;
}

public void setBrokerMetricsManager(BrokerMetricsManager brokerMetricsManager) {
this.brokerMetricsManager = brokerMetricsManager;
}

protected void initializeRemotingServer() throws CloneNotSupportedException {
RemotingServer tcpRemotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,46 +121,45 @@ public class BrokerMetricsManager {
private final MessageStore messageStore;
private final BrokerController brokerController;
private final ConsumerLagCalculator consumerLagCalculator;
private final static Map<String, String> LABEL_MAP = new HashMap<>();
private final Map<String, String> labelMap = new HashMap<>();
private OtlpGrpcMetricExporter metricExporter;
private PeriodicMetricReader periodicMetricReader;
private PrometheusHttpServer prometheusHttpServer;
private MetricExporter loggingMetricExporter;
private Meter brokerMeter;

public static Supplier<AttributesBuilder> attributesBuilderSupplier = Attributes::builder;
private Supplier<AttributesBuilder> attributesBuilderSupplier = Attributes::builder;

// broker stats metrics
public static ObservableLongGauge processorWatermark = new NopObservableLongGauge();
public static ObservableLongGauge brokerPermission = new NopObservableLongGauge();
public static ObservableLongGauge topicNum = new NopObservableLongGauge();
public static ObservableLongGauge consumerGroupNum = new NopObservableLongGauge();

private ObservableLongGauge processorWatermark = new NopObservableLongGauge();
private ObservableLongGauge brokerPermission = new NopObservableLongGauge();
private ObservableLongGauge topicNum = new NopObservableLongGauge();
private ObservableLongGauge consumerGroupNum = new NopObservableLongGauge();

// request metrics
public static LongCounter messagesInTotal = new NopLongCounter();
public static LongCounter messagesOutTotal = new NopLongCounter();
public static LongCounter throughputInTotal = new NopLongCounter();
public static LongCounter throughputOutTotal = new NopLongCounter();
public static LongHistogram messageSize = new NopLongHistogram();
public static LongHistogram topicCreateExecuteTime = new NopLongHistogram();
public static LongHistogram consumerGroupCreateExecuteTime = new NopLongHistogram();
private LongCounter messagesInTotal = new NopLongCounter();
private LongCounter messagesOutTotal = new NopLongCounter();
private LongCounter throughputInTotal = new NopLongCounter();
private LongCounter throughputOutTotal = new NopLongCounter();
private LongHistogram messageSize = new NopLongHistogram();
private LongHistogram topicCreateExecuteTime = new NopLongHistogram();
private LongHistogram consumerGroupCreateExecuteTime = new NopLongHistogram();

// client connection metrics
public static ObservableLongGauge producerConnection = new NopObservableLongGauge();
public static ObservableLongGauge consumerConnection = new NopObservableLongGauge();
private ObservableLongGauge producerConnection = new NopObservableLongGauge();
private ObservableLongGauge consumerConnection = new NopObservableLongGauge();

// Lag metrics
public static ObservableLongGauge consumerLagMessages = new NopObservableLongGauge();
public static ObservableLongGauge consumerLagLatency = new NopObservableLongGauge();
public static ObservableLongGauge consumerInflightMessages = new NopObservableLongGauge();
public static ObservableLongGauge consumerQueueingLatency = new NopObservableLongGauge();
public static ObservableLongGauge consumerReadyMessages = new NopObservableLongGauge();
public static LongCounter sendToDlqMessages = new NopLongCounter();
public static ObservableLongGauge halfMessages = new NopObservableLongGauge();
public static LongCounter commitMessagesTotal = new NopLongCounter();
public static LongCounter rollBackMessagesTotal = new NopLongCounter();
public static LongHistogram transactionFinishLatency = new NopLongHistogram();
private ObservableLongGauge consumerLagMessages = new NopObservableLongGauge();
private ObservableLongGauge consumerLagLatency = new NopObservableLongGauge();
private ObservableLongGauge consumerInflightMessages = new NopObservableLongGauge();
private ObservableLongGauge consumerQueueingLatency = new NopObservableLongGauge();
private ObservableLongGauge consumerReadyMessages = new NopObservableLongGauge();
private LongCounter sendToDlqMessages = new NopLongCounter();
private ObservableLongGauge halfMessages = new NopObservableLongGauge();
private LongCounter commitMessagesTotal = new NopLongCounter();
private LongCounter rollBackMessagesTotal = new NopLongCounter();
private LongHistogram transactionFinishLatency = new NopLongHistogram();

@SuppressWarnings("DoubleBraceInitialization")
public static final List<String> SYSTEM_GROUP_PREFIX_LIST = new ArrayList<String>() {
Expand All @@ -177,13 +176,13 @@ public BrokerMetricsManager(BrokerController brokerController) {
init();
}

public static AttributesBuilder newAttributesBuilder() {
public AttributesBuilder newAttributesBuilder() {
AttributesBuilder attributesBuilder;
if (attributesBuilderSupplier == null) {
attributesBuilderSupplier = Attributes::builder;
}
attributesBuilder = attributesBuilderSupplier.get();
LABEL_MAP.forEach(attributesBuilder::put);
labelMap.forEach(attributesBuilder::put);
return attributesBuilder;
}

Expand Down Expand Up @@ -242,6 +241,56 @@ public Meter getBrokerMeter() {
return brokerMeter;
}

// Getter methods for metrics variables
public LongCounter getMessagesInTotal() {
return messagesInTotal;
}

public LongCounter getMessagesOutTotal() {
return messagesOutTotal;
}

public LongCounter getThroughputInTotal() {
return throughputInTotal;
}

public LongCounter getThroughputOutTotal() {
return throughputOutTotal;
}

public LongHistogram getMessageSize() {
return messageSize;
}

public LongCounter getSendToDlqMessages() {
return sendToDlqMessages;
}

public LongCounter getCommitMessagesTotal() {
return commitMessagesTotal;
}

public LongCounter getRollBackMessagesTotal() {
return rollBackMessagesTotal;
}

public LongHistogram getTransactionFinishLatency() {
return transactionFinishLatency;
}

public LongHistogram getTopicCreateExecuteTime() {
return topicCreateExecuteTime;
}

public LongHistogram getConsumerGroupCreateExecuteTime() {
return consumerGroupCreateExecuteTime;
}

// Setter method for testing purposes
public void setAttributesBuilderSupplier(Supplier<AttributesBuilder> attributesBuilderSupplier) {
this.attributesBuilderSupplier = attributesBuilderSupplier;
}

private boolean checkConfig() {
if (brokerConfig == null) {
return false;
Expand Down Expand Up @@ -282,15 +331,15 @@ private void init() {
LOGGER.warn("metricsLabel is not valid: {}", labels);
continue;
}
LABEL_MAP.put(split[0], split[1]);
labelMap.put(split[0], split[1]);
}
}
if (brokerConfig.isMetricsInDelta()) {
LABEL_MAP.put(LABEL_AGGREGATION, AGGREGATION_DELTA);
labelMap.put(LABEL_AGGREGATION, AGGREGATION_DELTA);
}
LABEL_MAP.put(LABEL_NODE_TYPE, NODE_TYPE_BROKER);
LABEL_MAP.put(LABEL_CLUSTER_NAME, brokerConfig.getBrokerClusterName());
LABEL_MAP.put(LABEL_NODE_ID, brokerConfig.getBrokerName());
labelMap.put(LABEL_NODE_TYPE, NODE_TYPE_BROKER);
labelMap.put(LABEL_CLUSTER_NAME, brokerConfig.getBrokerClusterName());
labelMap.put(LABEL_NODE_ID, brokerConfig.getBrokerName());

SdkMeterProviderBuilder providerBuilder = SdkMeterProvider.builder()
.setResource(Resource.empty());
Expand Down Expand Up @@ -699,13 +748,13 @@ private void initTransactionMetrics() {
}
private void initOtherMetrics() {
if (brokerConfig.isEnableRemotingMetrics()) {
RemotingMetricsManager.initMetrics(brokerMeter, BrokerMetricsManager::newAttributesBuilder);
RemotingMetricsManager.initMetrics(brokerMeter, this::newAttributesBuilder);
}
if (brokerConfig.isEnableMessageStoreMetrics()) {
messageStore.initMetrics(brokerMeter, BrokerMetricsManager::newAttributesBuilder);
messageStore.initMetrics(brokerMeter, this::newAttributesBuilder);
}
if (brokerConfig.isEnablePopMetrics()) {
PopMetricsManager.initMetrics(brokerMeter, brokerController, BrokerMetricsManager::newAttributesBuilder);
PopMetricsManager.initMetrics(brokerMeter, brokerController, this::newAttributesBuilder);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,12 @@ protected RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, f
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
|| delayLevel < 0) {

Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
Attributes attributes = this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
.put(LABEL_CONSUMER_GROUP, requestHeader.getGroup())
.put(LABEL_TOPIC, requestHeader.getOriginTopic())
.put(LABEL_IS_SYSTEM, BrokerMetricsManager.isSystem(requestHeader.getOriginTopic(), requestHeader.getGroup()))
.build();
BrokerMetricsManager.sendToDlqMessages.add(1, attributes);
this.brokerController.getBrokerMetricsManager().getSendToDlqMessages().add(1, attributes);

isDLQ = true;
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
import org.apache.rocketmq.broker.controller.ReplicasManager;
import org.apache.rocketmq.broker.filter.ConsumerFilterData;
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;

import org.apache.rocketmq.broker.metrics.InvocationStatus;
import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
Expand Down Expand Up @@ -614,11 +614,11 @@ private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext
executionTime = System.currentTimeMillis() - startTime;
InvocationStatus status = response.getCode() == ResponseCode.SUCCESS ?
InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
Attributes attributes = this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
.put(LABEL_INVOCATION_STATUS, status.getName())
.put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(topic))
.build();
BrokerMetricsManager.topicCreateExecuteTime.record(executionTime, attributes);
this.brokerController.getBrokerMetricsManager().getTopicCreateExecuteTime().record(executionTime, attributes);
}
LOGGER.info("executionTime of create topic:{} is {} ms", topic, executionTime);
return response;
Expand Down Expand Up @@ -695,11 +695,11 @@ private synchronized RemotingCommand updateAndCreateTopicList(ChannelHandlerCont
executionTime = System.currentTimeMillis() - startTime;
InvocationStatus status = response.getCode() == ResponseCode.SUCCESS ?
InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
Attributes attributes = this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
.put(LABEL_INVOCATION_STATUS, status.getName())
.put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(topicNames))
.build();
BrokerMetricsManager.topicCreateExecuteTime.record(executionTime, attributes);
this.brokerController.getBrokerMetricsManager().getTopicCreateExecuteTime().record(executionTime, attributes);
}
LOGGER.info("executionTime of all topics:{} is {} ms", topicNames, executionTime);
return response;
Expand Down Expand Up @@ -1551,10 +1551,10 @@ private RemotingCommand updateAndCreateSubscriptionGroup(ChannelHandlerContext c
}
InvocationStatus status = response.getCode() == ResponseCode.SUCCESS ?
InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
Attributes attributes = this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
.put(LABEL_INVOCATION_STATUS, status.getName())
.build();
BrokerMetricsManager.consumerGroupCreateExecuteTime.record(executionTime, attributes);
this.brokerController.getBrokerMetricsManager().getConsumerGroupCreateExecuteTime().record(executionTime, attributes);
return response;
}

Expand Down Expand Up @@ -1589,10 +1589,10 @@ private RemotingCommand updateAndCreateSubscriptionGroupList(ChannelHandlerConte
LOGGER.info("executionTime of create updateAndCreateSubscriptionGroupList: {} is {} ms", groupNames, executionTime);
InvocationStatus status = response.getCode() == ResponseCode.SUCCESS ?
InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
Attributes attributes = this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
.put(LABEL_INVOCATION_STATUS, status.getName())
.build();
BrokerMetricsManager.consumerGroupCreateExecuteTime.record(executionTime, attributes);
this.brokerController.getBrokerMetricsManager().getConsumerGroupCreateExecuteTime().record(executionTime, attributes);
}

return response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,13 @@ public RemotingCommand handle(final GetMessageResult getMessageResult,
this.brokerController.getBrokerStatsManager().incBrokerGetNums(requestHeader.getTopic(), getMessageResult.getMessageCount());

if (!BrokerMetricsManager.isRetryOrDlqTopic(requestHeader.getTopic())) {
Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
Attributes attributes = this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
.put(LABEL_TOPIC, requestHeader.getTopic())
.put(LABEL_CONSUMER_GROUP, requestHeader.getConsumerGroup())
.put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(requestHeader.getTopic()) || MixAll.isSysConsumerGroup(requestHeader.getConsumerGroup()))
.build();
BrokerMetricsManager.messagesOutTotal.add(getMessageResult.getMessageCount(), attributes);
BrokerMetricsManager.throughputOutTotal.add(getMessageResult.getBufferTotalSize(), attributes);
this.brokerController.getBrokerMetricsManager().getMessagesOutTotal().add(getMessageResult.getMessageCount(), attributes);
this.brokerController.getBrokerMetricsManager().getThroughputOutTotal().add(getMessageResult.getBufferTotalSize(), attributes);
}

if (!channelIsWritable(channel, requestHeader)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import io.netty.channel.ChannelHandlerContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;

import org.apache.rocketmq.broker.transaction.OperationResult;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
import org.apache.rocketmq.common.TopicFilterType;
Expand Down Expand Up @@ -149,12 +149,12 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
// successful committed, then total num of half-messages minus 1
this.brokerController.getTransactionalMessageService().getTransactionMetrics().addAndGet(msgInner.getTopic(), -1);
BrokerMetricsManager.commitMessagesTotal.add(1, BrokerMetricsManager.newAttributesBuilder()
this.brokerController.getBrokerMetricsManager().getCommitMessagesTotal().add(1, this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
.put(LABEL_TOPIC, msgInner.getTopic())
.build());
// record the commit latency.
Long commitLatency = (System.currentTimeMillis() - result.getPrepareMessage().getBornTimestamp()) / 1000;
BrokerMetricsManager.transactionFinishLatency.record(commitLatency, BrokerMetricsManager.newAttributesBuilder()
this.brokerController.getBrokerMetricsManager().getTransactionFinishLatency().record(commitLatency, this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
.put(LABEL_TOPIC, msgInner.getTopic())
.build());
}
Expand All @@ -176,7 +176,7 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
// roll back, then total num of half-messages minus 1
this.brokerController.getTransactionalMessageService().getTransactionMetrics().addAndGet(result.getPrepareMessage().getProperty(MessageConst.PROPERTY_REAL_TOPIC), -1);
BrokerMetricsManager.rollBackMessagesTotal.add(1, BrokerMetricsManager.newAttributesBuilder()
this.brokerController.getBrokerMetricsManager().getRollBackMessagesTotal().add(1, this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
.put(LABEL_TOPIC, result.getPrepareMessage().getProperty(MessageConst.PROPERTY_REAL_TOPIC))
.build());
}
Expand Down
Loading
Loading