Skip to content

Commit 7d75021

Browse files
authored
[ISSUE #9719] Fix Static Variable Conflicts in Multi-Broker Metrics Management (#9720)
* refactor: convert RemotingMetricsManager from static to instance-based - Remove all static global instance related code - Convert all static methods to instance methods - Remove static wrapper methods for backward compatibility - Add constructor and getter/setter methods for instance variables - This ensures each broker instance manages its own metrics in multi-broker scenarios * refactor: convert PopMetricsManager from static to instance-based - Remove all static global instance related code - Convert all static methods to instance methods - Remove static wrapper methods for backward compatibility - Add constructor and getter/setter methods for instance variables - This ensures each broker instance manages its own Pop metrics in multi-broker scenarios * refactor: implement instance-based metrics management for multi-broker scenarios - Add RemotingMetricsManager instance variable to NettyRemotingAbstract - Add setter/getter methods for RemotingMetricsManager in NettyRemotingAbstract - Update writeResponse method to use instance-based metrics recording - Add writeResponse method to RemotingServer interface - Update BrokerController to set RemotingMetricsManager on NettyRemotingServer instances - Update InnerBrokerController to set RemotingMetricsManager on SubRemotingServer instances - Update all processor classes to use instance-based writeResponse calls - Update RocksDBStoreMetricsManager integration in DefaultStoreMetricsManager - Remove unused imports and clean up code This ensures each broker instance manages its own metrics independently, solving the static variable conflict issue in multi-broker processes. * refactor: fix test failures caused by metrics manager refactoring - Fix ChangeInvisibleTimeProcessorTest by using @SPY instead of @mock for BrokerController - Inject BrokerMetricsManager into BrokerController using reflection - Remove unnecessary mock stubbings - Update all processors to use new static writeResponse method - Fix BrokerMetricsManager initialization issues in AckMessageProcessorTest and PopReviveServiceTest These changes ensure all related tests pass after refactoring metrics managers from static variables to instance variables. * fix: resolve checkstyle errors and ensure compilation success - Remove duplicate import in PopMessageProcessor.java - Clean up unused imports in ChangeInvisibleTimeProcessorTest.java - Clean up unused imports in AckMessageProcessorTest.java - Ensure all modules compile successfully without checkstyle violations These changes complete the metrics manager refactoring from static variables to instance variables, resolving statistical information conflicts in multi-broker processes. * Fix PopReviveServiceTest by adding proper PopMetricsManager mock - Add PopMetricsManager mock to PopReviveServiceTest - Fix NullPointerException in reviveRetry method - Ensure rePutCK method is called correctly when retry fails - Test now passes successfully with proper metrics manager setup * Remove debug code from PopReviveService - Remove debug logging statements from reviveMsgFromCk method - Clean up test output for better readability * fix * fix
1 parent aa9ee1d commit 7d75021

23 files changed

Lines changed: 455 additions & 184 deletions

broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,7 @@ public class BrokerController {
294294
protected final List<ScheduledFuture<?>> scheduledFutures = new ArrayList<>();
295295
protected ReplicasManager replicasManager;
296296
private long lastSyncTimeMs = System.currentTimeMillis();
297-
private BrokerMetricsManager brokerMetricsManager;
297+
protected BrokerMetricsManager brokerMetricsManager;
298298
private ColdDataPullRequestHoldService coldDataPullRequestHoldService;
299299
private ColdDataCgCtrService coldDataCgCtrService;
300300
private TransactionMetricsFlushService transactionMetricsFlushService;
@@ -505,7 +505,7 @@ public void setBrokerMetricsManager(BrokerMetricsManager brokerMetricsManager) {
505505
}
506506

507507
protected void initializeRemotingServer() throws CloneNotSupportedException {
508-
RemotingServer tcpRemotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
508+
NettyRemotingServer tcpRemotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
509509
NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
510510

511511
int listeningPort = nettyServerConfig.getListenPort() - 2;
@@ -514,7 +514,13 @@ protected void initializeRemotingServer() throws CloneNotSupportedException {
514514
}
515515
fastConfig.setListenPort(listeningPort);
516516

517-
RemotingServer fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
517+
NettyRemotingServer fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
518+
519+
// Set RemotingMetricsManager on both remoting servers
520+
if (this.brokerMetricsManager != null) {
521+
tcpRemotingServer.setRemotingMetricsManager(this.brokerMetricsManager.getRemotingMetricsManager());
522+
fastRemotingServer.setRemotingMetricsManager(this.brokerMetricsManager.getRemotingMetricsManager());
523+
}
518524

519525
remotingServerMap.put(TCP_REMOTING_SERVER, tcpRemotingServer);
520526
remotingServerMap.put(FAST_REMOTING_SERVER, fastRemotingServer);

broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,10 @@
3535
import org.apache.rocketmq.logging.org.slf4j.Logger;
3636
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
3737
import org.apache.rocketmq.remoting.CommandCallback;
38-
import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
3938
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
4039
import org.apache.rocketmq.remoting.netty.RequestTask;
4140
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
41+
import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
4242
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
4343
import org.apache.rocketmq.store.ConsumeQueueExt;
4444
import org.apache.rocketmq.store.MessageFilter;
@@ -266,7 +266,7 @@ public boolean wakeUp(final PopRequest request, CommandCallback callback) {
266266
POP_LOGGER.error(request.toString());
267267
POP_LOGGER.error(response.toString());
268268
}
269-
});
269+
}, brokerController.getBrokerMetricsManager().getRemotingMetricsManager());
270270
}
271271
} catch (Exception e1) {
272272
POP_LOGGER.error("ExecuteRequestWhenWakeup run", e1);

broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,9 @@ public class BrokerMetricsManager {
161161
private LongCounter rollBackMessagesTotal = new NopLongCounter();
162162
private LongHistogram transactionFinishLatency = new NopLongHistogram();
163163

164+
private final RemotingMetricsManager remotingMetricsManager;
165+
private final PopMetricsManager popMetricsManager;
166+
164167
@SuppressWarnings("DoubleBraceInitialization")
165168
public static final List<String> SYSTEM_GROUP_PREFIX_LIST = new ArrayList<String>() {
166169
{
@@ -173,6 +176,8 @@ public BrokerMetricsManager(BrokerController brokerController) {
173176
brokerConfig = brokerController.getBrokerConfig();
174177
this.messageStore = brokerController.getMessageStore();
175178
this.consumerLagCalculator = new ConsumerLagCalculator(brokerController);
179+
this.remotingMetricsManager = new RemotingMetricsManager();
180+
this.popMetricsManager = new PopMetricsManager();
176181
init();
177182
}
178183

@@ -487,7 +492,7 @@ private void registerMetricsView(SdkMeterProviderBuilder providerBuilder) {
487492
SdkMeterProviderUtil.setCardinalityLimit(createSubGroupTimeViewBuilder, brokerConfig.getMetricsOtelCardinalityLimit());
488493
providerBuilder.registerView(createSubGroupTimeSelector, createSubGroupTimeViewBuilder.build());
489494

490-
for (Pair<InstrumentSelector, ViewBuilder> selectorViewPair : RemotingMetricsManager.getMetricsView()) {
495+
for (Pair<InstrumentSelector, ViewBuilder> selectorViewPair : this.remotingMetricsManager.getMetricsView()) {
491496
ViewBuilder viewBuilder = selectorViewPair.getObject2();
492497
SdkMeterProviderUtil.setCardinalityLimit(viewBuilder, brokerConfig.getMetricsOtelCardinalityLimit());
493498
providerBuilder.registerView(selectorViewPair.getObject1(), viewBuilder.build());
@@ -499,7 +504,7 @@ private void registerMetricsView(SdkMeterProviderBuilder providerBuilder) {
499504
providerBuilder.registerView(selectorViewPair.getObject1(), viewBuilder.build());
500505
}
501506

502-
for (Pair<InstrumentSelector, ViewBuilder> selectorViewPair : PopMetricsManager.getMetricsView()) {
507+
for (Pair<InstrumentSelector, ViewBuilder> selectorViewPair : this.popMetricsManager.getMetricsView()) {
503508
ViewBuilder viewBuilder = selectorViewPair.getObject2();
504509
SdkMeterProviderUtil.setCardinalityLimit(viewBuilder, brokerConfig.getMetricsOtelCardinalityLimit());
505510
providerBuilder.registerView(selectorViewPair.getObject1(), viewBuilder.build());
@@ -749,13 +754,13 @@ private void initTransactionMetrics() {
749754

750755
private void initOtherMetrics() {
751756
if (brokerConfig.isEnableRemotingMetrics()) {
752-
RemotingMetricsManager.initMetrics(brokerMeter, this::newAttributesBuilder);
757+
this.remotingMetricsManager.initMetrics(brokerMeter, this::newAttributesBuilder);
753758
}
754759
if (brokerConfig.isEnableMessageStoreMetrics()) {
755760
messageStore.initMetrics(brokerMeter, this::newAttributesBuilder);
756761
}
757762
if (brokerConfig.isEnablePopMetrics()) {
758-
PopMetricsManager.initMetrics(brokerMeter, brokerController, this::newAttributesBuilder);
763+
this.popMetricsManager.initMetrics(brokerMeter, brokerController, this::newAttributesBuilder);
759764
}
760765
}
761766

@@ -802,4 +807,12 @@ public void shutdown() {
802807
}
803808
}
804809

810+
public RemotingMetricsManager getRemotingMetricsManager() {
811+
return remotingMetricsManager;
812+
}
813+
814+
public PopMetricsManager getPopMetricsManager() {
815+
return popMetricsManager;
816+
}
817+
805818
}

broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java

Lines changed: 68 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,18 @@
6161

6262
public class PopMetricsManager {
6363
private static final Logger log = LoggerFactory.getLogger(PopMetricsManager.class);
64-
public static Supplier<AttributesBuilder> attributesBuilderSupplier;
64+
65+
private Supplier<AttributesBuilder> attributesBuilderSupplier;
6566

66-
private static LongHistogram popBufferScanTimeConsume = new NopLongHistogram();
67-
private static LongCounter popRevivePutTotal = new NopLongCounter();
68-
private static LongCounter popReviveGetTotal = new NopLongCounter();
69-
private static LongCounter popReviveRetryMessageTotal = new NopLongCounter();
67+
private LongHistogram popBufferScanTimeConsume = new NopLongHistogram();
68+
private LongCounter popRevivePutTotal = new NopLongCounter();
69+
private LongCounter popReviveGetTotal = new NopLongCounter();
70+
private LongCounter popReviveRetryMessageTotal = new NopLongCounter();
7071

71-
public static List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() {
72+
public PopMetricsManager() {
73+
}
74+
75+
public List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() {
7276
List<Double> rpcCostTimeBuckets = Arrays.asList(
7377
(double) Duration.ofMillis(1).toMillis(),
7478
(double) Duration.ofMillis(10).toMillis(),
@@ -87,22 +91,22 @@ public static List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() {
8791
return Lists.newArrayList(new Pair<>(popBufferScanTimeConsumeSelector, popBufferScanTimeConsumeViewBuilder));
8892
}
8993

90-
public static void initMetrics(Meter meter, BrokerController brokerController,
94+
public void initMetrics(Meter meter, BrokerController brokerController,
9195
Supplier<AttributesBuilder> attributesBuilderSupplier) {
92-
PopMetricsManager.attributesBuilderSupplier = attributesBuilderSupplier;
96+
this.attributesBuilderSupplier = attributesBuilderSupplier;
9397

94-
popBufferScanTimeConsume = meter.histogramBuilder(HISTOGRAM_POP_BUFFER_SCAN_TIME_CONSUME)
98+
this.popBufferScanTimeConsume = meter.histogramBuilder(HISTOGRAM_POP_BUFFER_SCAN_TIME_CONSUME)
9599
.setDescription("Time consuming of pop buffer scan")
96100
.setUnit("milliseconds")
97101
.ofLongs()
98102
.build();
99-
popRevivePutTotal = meter.counterBuilder(COUNTER_POP_REVIVE_IN_MESSAGE_TOTAL)
103+
this.popRevivePutTotal = meter.counterBuilder(COUNTER_POP_REVIVE_IN_MESSAGE_TOTAL)
100104
.setDescription("Total number of put message to revive topic")
101105
.build();
102-
popReviveGetTotal = meter.counterBuilder(COUNTER_POP_REVIVE_OUT_MESSAGE_TOTAL)
106+
this.popReviveGetTotal = meter.counterBuilder(COUNTER_POP_REVIVE_OUT_MESSAGE_TOTAL)
103107
.setDescription("Total number of get message from revive topic")
104108
.build();
105-
popReviveRetryMessageTotal = meter.counterBuilder(COUNTER_POP_REVIVE_RETRY_MESSAGES_TOTAL)
109+
this.popReviveRetryMessageTotal = meter.counterBuilder(COUNTER_POP_REVIVE_RETRY_MESSAGES_TOTAL)
106110
.setDescription("Total number of put message to pop retry topic")
107111
.build();
108112

@@ -126,24 +130,24 @@ public static void initMetrics(Meter meter, BrokerController brokerController,
126130
.buildWithCallback(measurement -> calculatePopReviveLatency(brokerController, measurement));
127131
}
128132

129-
private static void calculatePopBufferOffsetSize(BrokerController brokerController,
133+
private void calculatePopBufferOffsetSize(BrokerController brokerController,
130134
ObservableLongMeasurement measurement) {
131135
PopBufferMergeService popBufferMergeService = brokerController.getPopMessageProcessor().getPopBufferMergeService();
132-
measurement.record(popBufferMergeService.getOffsetTotalSize(), newAttributesBuilder().build());
136+
measurement.record(popBufferMergeService.getOffsetTotalSize(), this.newAttributesBuilder().build());
133137
}
134138

135-
private static void calculatePopBufferCkSize(BrokerController brokerController,
139+
private void calculatePopBufferCkSize(BrokerController brokerController,
136140
ObservableLongMeasurement measurement) {
137141
PopBufferMergeService popBufferMergeService = brokerController.getPopMessageProcessor().getPopBufferMergeService();
138-
measurement.record(popBufferMergeService.getBufferedCKSize(), newAttributesBuilder().build());
142+
measurement.record(popBufferMergeService.getBufferedCKSize(), this.newAttributesBuilder().build());
139143
}
140144

141-
private static void calculatePopReviveLatency(BrokerController brokerController,
145+
private void calculatePopReviveLatency(BrokerController brokerController,
142146
ObservableLongMeasurement measurement) {
143147
PopReviveService[] popReviveServices = brokerController.getAckMessageProcessor().getPopReviveServices();
144148
for (PopReviveService popReviveService : popReviveServices) {
145149
try {
146-
measurement.record(popReviveService.getReviveBehindMillis(), newAttributesBuilder()
150+
measurement.record(popReviveService.getReviveBehindMillis(), this.newAttributesBuilder()
147151
.put(LABEL_QUEUE_ID, popReviveService.getQueueId())
148152
.build());
149153
} catch (ConsumeQueueException e) {
@@ -152,12 +156,12 @@ private static void calculatePopReviveLatency(BrokerController brokerController,
152156
}
153157
}
154158

155-
private static void calculatePopReviveLag(BrokerController brokerController,
159+
private void calculatePopReviveLag(BrokerController brokerController,
156160
ObservableLongMeasurement measurement) {
157161
PopReviveService[] popReviveServices = brokerController.getAckMessageProcessor().getPopReviveServices();
158162
for (PopReviveService popReviveService : popReviveServices) {
159163
try {
160-
measurement.record(popReviveService.getReviveBehindMessages(), newAttributesBuilder()
164+
measurement.record(popReviveService.getReviveBehindMessages(), this.newAttributesBuilder()
161165
.put(LABEL_QUEUE_ID, popReviveService.getQueueId())
162166
.build());
163167
} catch (ConsumeQueueException e) {
@@ -166,60 +170,87 @@ private static void calculatePopReviveLag(BrokerController brokerController,
166170
}
167171
}
168172

169-
public static void incPopReviveAckPutCount(AckMsg ackMsg, PutMessageStatus status) {
173+
public void incPopReviveAckPutCount(AckMsg ackMsg, PutMessageStatus status) {
170174
incPopRevivePutCount(ackMsg.getConsumerGroup(), ackMsg.getTopic(), PopReviveMessageType.ACK, status, 1);
171175
}
172176

173-
public static void incPopReviveCkPutCount(PopCheckPoint checkPoint, PutMessageStatus status) {
177+
public void incPopReviveCkPutCount(PopCheckPoint checkPoint, PutMessageStatus status) {
174178
incPopRevivePutCount(checkPoint.getCId(), checkPoint.getTopic(), PopReviveMessageType.CK, status, 1);
175179
}
176180

177-
public static void incPopRevivePutCount(String group, String topic, PopReviveMessageType messageType,
181+
public void incPopRevivePutCount(String group, String topic, PopReviveMessageType messageType,
178182
PutMessageStatus status, int num) {
179-
Attributes attributes = newAttributesBuilder()
183+
Attributes attributes = this.newAttributesBuilder()
180184
.put(LABEL_CONSUMER_GROUP, group)
181185
.put(LABEL_TOPIC, topic)
182186
.put(LABEL_REVIVE_MESSAGE_TYPE, messageType.name())
183187
.put(LABEL_PUT_STATUS, status.name())
184188
.build();
185-
popRevivePutTotal.add(num, attributes);
189+
this.popRevivePutTotal.add(num, attributes);
186190
}
187191

188-
public static void incPopReviveAckGetCount(AckMsg ackMsg, int queueId) {
192+
public void incPopReviveAckGetCount(AckMsg ackMsg, int queueId) {
189193
incPopReviveGetCount(ackMsg.getConsumerGroup(), ackMsg.getTopic(), PopReviveMessageType.ACK, queueId, 1);
190194
}
191195

192-
public static void incPopReviveCkGetCount(PopCheckPoint checkPoint, int queueId) {
196+
public void incPopReviveCkGetCount(PopCheckPoint checkPoint, int queueId) {
193197
incPopReviveGetCount(checkPoint.getCId(), checkPoint.getTopic(), PopReviveMessageType.CK, queueId, 1);
194198
}
195199

196-
public static void incPopReviveGetCount(String group, String topic, PopReviveMessageType messageType, int queueId,
200+
public void incPopReviveGetCount(String group, String topic, PopReviveMessageType messageType, int queueId,
197201
int num) {
198-
AttributesBuilder builder = newAttributesBuilder();
202+
AttributesBuilder builder = this.newAttributesBuilder();
199203
Attributes attributes = builder
200204
.put(LABEL_CONSUMER_GROUP, group)
201205
.put(LABEL_TOPIC, topic)
202206
.put(LABEL_QUEUE_ID, queueId)
203207
.put(LABEL_REVIVE_MESSAGE_TYPE, messageType.name())
204208
.build();
205-
popReviveGetTotal.add(num, attributes);
209+
this.popReviveGetTotal.add(num, attributes);
206210
}
207211

208-
public static void incPopReviveRetryMessageCount(PopCheckPoint checkPoint, PutMessageStatus status) {
209-
AttributesBuilder builder = newAttributesBuilder();
212+
public void incPopReviveRetryMessageCount(PopCheckPoint checkPoint, PutMessageStatus status) {
213+
AttributesBuilder builder = this.newAttributesBuilder();
210214
Attributes attributes = builder
211215
.put(LABEL_CONSUMER_GROUP, checkPoint.getCId())
212216
.put(LABEL_TOPIC, checkPoint.getTopic())
213217
.put(LABEL_PUT_STATUS, status.name())
214218
.build();
215-
popReviveRetryMessageTotal.add(1, attributes);
219+
this.popReviveRetryMessageTotal.add(1, attributes);
220+
}
221+
222+
public void recordPopBufferScanTimeConsume(long time) {
223+
this.popBufferScanTimeConsume.record(time, this.newAttributesBuilder().build());
224+
}
225+
226+
public AttributesBuilder newAttributesBuilder() {
227+
return this.attributesBuilderSupplier != null ? this.attributesBuilderSupplier.get() : Attributes.builder();
228+
}
229+
230+
// Getter methods for external access
231+
public LongHistogram getPopBufferScanTimeConsume() {
232+
return popBufferScanTimeConsume;
216233
}
217234

218-
public static void recordPopBufferScanTimeConsume(long time) {
219-
popBufferScanTimeConsume.record(time, newAttributesBuilder().build());
235+
public LongCounter getPopRevivePutTotal() {
236+
return popRevivePutTotal;
220237
}
221238

222-
public static AttributesBuilder newAttributesBuilder() {
223-
return attributesBuilderSupplier != null ? attributesBuilderSupplier.get() : Attributes.builder();
239+
public LongCounter getPopReviveGetTotal() {
240+
return popReviveGetTotal;
224241
}
242+
243+
public LongCounter getPopReviveRetryMessageTotal() {
244+
return popReviveRetryMessageTotal;
245+
}
246+
247+
public Supplier<AttributesBuilder> getAttributesBuilderSupplier() {
248+
return attributesBuilderSupplier;
249+
}
250+
251+
// Setter methods for testing
252+
public void setAttributesBuilderSupplier(Supplier<AttributesBuilder> attributesBuilderSupplier) {
253+
this.attributesBuilderSupplier = attributesBuilderSupplier;
254+
}
255+
225256
}

broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,13 @@
5454
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
5555
import org.apache.rocketmq.remoting.common.RemotingHelper;
5656
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
57-
import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
5857
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
5958
import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
6059
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
6160
import org.apache.rocketmq.remoting.protocol.ResponseCode;
6261
import org.apache.rocketmq.remoting.protocol.header.ConsumerSendMsgBackRequestHeader;
6362
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
63+
import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
6464
import org.apache.rocketmq.remoting.protocol.header.SendMessageResponseHeader;
6565
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
6666
import org.apache.rocketmq.store.PutMessageResult;
@@ -543,7 +543,7 @@ public void registerSendMessageHook(List<SendMessageHook> sendMessageHookList) {
543543

544544
protected void doResponse(ChannelHandlerContext ctx, RemotingCommand request,
545545
final RemotingCommand response) {
546-
NettyRemotingAbstract.writeResponse(ctx.channel(), request, response);
546+
NettyRemotingAbstract.writeResponse(ctx.channel(), request, response, null, brokerController.getBrokerMetricsManager().getRemotingMetricsManager());
547547
}
548548

549549
public void executeSendMessageHookBefore(SendMessageContext context) {

broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.util.BitSet;
2323
import java.nio.charset.StandardCharsets;
2424
import org.apache.rocketmq.broker.BrokerController;
25-
import org.apache.rocketmq.broker.metrics.PopMetricsManager;
2625
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
2726
import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager;
2827
import org.apache.rocketmq.broker.pop.PopConsumerLockService;
@@ -390,7 +389,7 @@ private void handlePutMessageResult(PutMessageResult putMessageResult, AckMsg ac
390389
&& putMessageResult.getPutMessageStatus() != PutMessageStatus.SLAVE_NOT_AVAILABLE) {
391390
POP_LOGGER.error("put ack msg error:" + putMessageResult);
392391
}
393-
PopMetricsManager.incPopReviveAckPutCount(ackMsg, putMessageResult.getPutMessageStatus());
392+
brokerController.getBrokerMetricsManager().getPopMetricsManager().incPopReviveAckPutCount(ackMsg, putMessageResult.getPutMessageStatus());
394393
brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic, consumeGroup, popTime, qId, ackCount);
395394
}
396395

0 commit comments

Comments
 (0)