From c351d72c6f15963ec16bea65c24d3c7dfaabe3be Mon Sep 17 00:00:00 2001 From: lizhimins <707364882@qq.com> Date: Thu, 6 Nov 2025 19:18:30 +0800 Subject: [PATCH] [ISSUE #9816] Fix concurrent modify opentelemetry record in calculate consumer lag Change-Id: I73109061cab5ab98fea5c1f8d23fd8bd159e2406 Signed-off-by: terrance.lzm --- .../rocketmq/broker/metrics/BrokerMetricsManager.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java index a6707d0dd2c..960c1dd2505 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java @@ -674,7 +674,13 @@ private void initLagAndDlqMetrics() { .setDescription("Consumer lag messages") .ofLongs() .buildWithCallback(measurement -> - consumerLagCalculator.calculateLag(result -> measurement.record(result.lag, buildLagAttributes(result)))); + consumerLagCalculator.calculateLag(result -> { + // Note: 'record' method uses HashMap which may cause + // concurrent access issues when Pull thread executes Pop callbacks. + synchronized (this) { + measurement.record(result.lag, buildLagAttributes(result)); + } + })); consumerLagLatency = brokerMeter.gaugeBuilder(GAUGE_CONSUMER_LAG_LATENCY) .setDescription("Consumer lag time")