Skip to content

Commit 02f413d

Browse files
guyinyou and guyinyou authored
[ISSUE #9701] Synchronize metrics shutdown to prevent JVM crashes during broker shutdown (#9702)
* fix: synchronize metrics shutdown to prevent JVM crash
  - Change async shutdown to sync blocking wait in BrokerMetricsManager
  - Ensure proper shutdown order to avoid race conditions
  - Prevent accessing dependencies after they are shut down
  - Use join() with timeout to wait for CompletableFuture completion
  - Apply fix to all metrics exporter types (OTLP_GRPC, PROM, LOG)
* fix codestyle

---------

Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com>
1 parent 4ae0294 commit 02f413d

1 file changed

Lines changed: 86 additions & 59 deletions

File tree

broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java

Lines changed: 86 additions & 59 deletions
Original file line number | Diff line number | Diff line change
@@ -434,20 +434,20 @@ private void registerMetricsView(SdkMeterProviderBuilder providerBuilder) {
434434
);
435435

436436
List<Double> commitLatencyBuckets = Arrays.asList(
437-
1d * 1 * 1 * 5, //5s
438-
1d * 1 * 1 * 60, //1min
439-
1d * 1 * 10 * 60, //10min
440-
1d * 1 * 60 * 60, //1h
441-
1d * 12 * 60 * 60, //12h
442-
1d * 24 * 60 * 60 //24h
437+
1d * 1 * 1 * 5, //5s
438+
1d * 1 * 1 * 60, //1min
439+
1d * 1 * 10 * 60, //10min
440+
1d * 1 * 60 * 60, //1h
441+
1d * 12 * 60 * 60, //12h
442+
1d * 24 * 60 * 60 //24h
443443
);
444444

445445
List<Double> createTimeBuckets = Arrays.asList(
446-
(double) Duration.ofMillis(10).toMillis(), //10ms
447-
(double) Duration.ofMillis(100).toMillis(), //100ms
448-
(double) Duration.ofSeconds(1).toMillis(), //1s
449-
(double) Duration.ofSeconds(3).toMillis(), //3s
450-
(double) Duration.ofSeconds(5).toMillis() //5s
446+
(double) Duration.ofMillis(10).toMillis(), //10ms
447+
(double) Duration.ofMillis(100).toMillis(), //100ms
448+
(double) Duration.ofSeconds(1).toMillis(), //1s
449+
(double) Duration.ofSeconds(3).toMillis(), //3s
450+
(double) Duration.ofSeconds(5).toMillis() //5s
451451
);
452452
InstrumentSelector messageSizeSelector = InstrumentSelector.builder()
453453
.setType(InstrumentType.HISTOGRAM)
@@ -470,17 +470,17 @@ private void registerMetricsView(SdkMeterProviderBuilder providerBuilder) {
470470
providerBuilder.registerView(commitLatencySelector, commitLatencyViewBuilder.build());
471471

472472
InstrumentSelector createTopicTimeSelector = InstrumentSelector.builder()
473-
.setType(InstrumentType.HISTOGRAM)
474-
.setName(HISTOGRAM_TOPIC_CREATE_EXECUTE_TIME)
475-
.build();
473+
.setType(InstrumentType.HISTOGRAM)
474+
.setName(HISTOGRAM_TOPIC_CREATE_EXECUTE_TIME)
475+
.build();
476476
InstrumentSelector createSubGroupTimeSelector = InstrumentSelector.builder()
477-
.setType(InstrumentType.HISTOGRAM)
478-
.setName(HISTOGRAM_CONSUMER_GROUP_CREATE_EXECUTE_TIME)
479-
.build();
477+
.setType(InstrumentType.HISTOGRAM)
478+
.setName(HISTOGRAM_CONSUMER_GROUP_CREATE_EXECUTE_TIME)
479+
.build();
480480
ViewBuilder createTopicTimeViewBuilder = View.builder()
481-
.setAggregation(Aggregation.explicitBucketHistogram(createTimeBuckets));
481+
.setAggregation(Aggregation.explicitBucketHistogram(createTimeBuckets));
482482
ViewBuilder createSubGroupTimeViewBuilder = View.builder()
483-
.setAggregation(Aggregation.explicitBucketHistogram(createTimeBuckets));
483+
.setAggregation(Aggregation.explicitBucketHistogram(createTimeBuckets));
484484
// To config the cardinalityLimit for openTelemetry metrics exporting.
485485
SdkMeterProviderUtil.setCardinalityLimit(createTopicTimeViewBuilder, brokerConfig.getMetricsOtelCardinalityLimit());
486486
providerBuilder.registerView(createTopicTimeSelector, createTopicTimeViewBuilder.build());
@@ -588,16 +588,16 @@ private void initRequestMetrics() {
588588
.build();
589589

590590
topicCreateExecuteTime = brokerMeter.histogramBuilder(HISTOGRAM_TOPIC_CREATE_EXECUTE_TIME)
591-
.setDescription("The distribution of create topic time")
592-
.ofLongs()
593-
.setUnit("milliseconds")
594-
.build();
591+
.setDescription("The distribution of create topic time")
592+
.ofLongs()
593+
.setUnit("milliseconds")
594+
.build();
595595

596596
consumerGroupCreateExecuteTime = brokerMeter.histogramBuilder(HISTOGRAM_CONSUMER_GROUP_CREATE_EXECUTE_TIME)
597-
.setDescription("The distribution of create subscription time")
598-
.ofLongs()
599-
.setUnit("milliseconds")
600-
.build();
597+
.setDescription("The distribution of create subscription time")
598+
.ofLongs()
599+
.setUnit("milliseconds")
600+
.build();
601601
}
602602

603603
private void initConnectionMetrics() {
@@ -720,32 +720,33 @@ private void initTransactionMetrics() {
720720
}
721721

722722
commitMessagesTotal = brokerMeter.counterBuilder(COUNTER_COMMIT_MESSAGES_TOTAL)
723-
.setDescription("Total number of commit messages")
724-
.build();
723+
.setDescription("Total number of commit messages")
724+
.build();
725725

726726
rollBackMessagesTotal = brokerMeter.counterBuilder(COUNTER_ROLLBACK_MESSAGES_TOTAL)
727-
.setDescription("Total number of rollback messages")
728-
.build();
727+
.setDescription("Total number of rollback messages")
728+
.build();
729729

730730
transactionFinishLatency = brokerMeter.histogramBuilder(HISTOGRAM_FINISH_MSG_LATENCY)
731-
.setDescription("Transaction finish latency")
732-
.ofLongs()
733-
.setUnit("ms")
734-
.build();
731+
.setDescription("Transaction finish latency")
732+
.ofLongs()
733+
.setUnit("ms")
734+
.build();
735735

736736
halfMessages = brokerMeter.gaugeBuilder(GAUGE_HALF_MESSAGES)
737-
.setDescription("Half messages of all topics")
738-
.ofLongs()
739-
.buildWithCallback(measurement -> {
740-
brokerController.getTransactionalMessageService().getTransactionMetrics().getTransactionCounts()
741-
.forEach((topic, metric) -> {
742-
measurement.record(
743-
metric.getCount().get(),
744-
newAttributesBuilder().put(DefaultStoreMetricsConstant.LABEL_TOPIC, topic).build()
745-
);
746-
});
747-
});
737+
.setDescription("Half messages of all topics")
738+
.ofLongs()
739+
.buildWithCallback(measurement -> {
740+
brokerController.getTransactionalMessageService().getTransactionMetrics().getTransactionCounts()
741+
.forEach((topic, metric) -> {
742+
measurement.record(
743+
metric.getCount().get(),
744+
newAttributesBuilder().put(DefaultStoreMetricsConstant.LABEL_TOPIC, topic).build()
745+
);
746+
});
747+
});
748748
}
749+
749750
private void initOtherMetrics() {
750751
if (brokerConfig.isEnableRemotingMetrics()) {
751752
RemotingMetricsManager.initMetrics(brokerMeter, this::newAttributesBuilder);
@@ -759,19 +760,45 @@ private void initOtherMetrics() {
759760
}
760761

761762
public void shutdown() {
762-
if (brokerConfig.getMetricsExporterType() == MetricsExporterType.OTLP_GRPC) {
763-
periodicMetricReader.forceFlush();
764-
periodicMetricReader.shutdown();
765-
metricExporter.shutdown();
766-
}
767-
if (brokerConfig.getMetricsExporterType() == MetricsExporterType.PROM) {
768-
prometheusHttpServer.forceFlush();
769-
prometheusHttpServer.shutdown();
770-
}
771-
if (brokerConfig.getMetricsExporterType() == MetricsExporterType.LOG) {
772-
periodicMetricReader.forceFlush();
773-
periodicMetricReader.shutdown();
774-
loggingMetricExporter.shutdown();
763+
if (brokerConfig.isInBrokerContainer()) {
764+
// only rto need
765+
if (brokerConfig.getMetricsExporterType() == MetricsExporterType.OTLP_GRPC) {
766+
while (!periodicMetricReader.forceFlush().join(60, TimeUnit.SECONDS).isDone()) {
767+
}
768+
while (!periodicMetricReader.shutdown().join(60, TimeUnit.SECONDS).isSuccess()) {
769+
}
770+
while (!metricExporter.shutdown().join(60, TimeUnit.SECONDS).isSuccess()) {
771+
}
772+
}
773+
if (brokerConfig.getMetricsExporterType() == MetricsExporterType.PROM) {
774+
while (!prometheusHttpServer.forceFlush().join(60, TimeUnit.SECONDS).isDone()) {
775+
}
776+
while (!prometheusHttpServer.shutdown().join(60, TimeUnit.SECONDS).isSuccess()) {
777+
}
778+
}
779+
if (brokerConfig.getMetricsExporterType() == MetricsExporterType.LOG) {
780+
while (!periodicMetricReader.forceFlush().join(60, TimeUnit.SECONDS).isDone()) {
781+
}
782+
while (!periodicMetricReader.shutdown().join(60, TimeUnit.SECONDS).isSuccess()) {
783+
}
784+
while (!loggingMetricExporter.shutdown().join(60, TimeUnit.SECONDS).isSuccess()) {
785+
}
786+
}
787+
} else {
788+
if (brokerConfig.getMetricsExporterType() == MetricsExporterType.OTLP_GRPC) {
789+
periodicMetricReader.forceFlush();
790+
periodicMetricReader.shutdown();
791+
metricExporter.shutdown();
792+
}
793+
if (brokerConfig.getMetricsExporterType() == MetricsExporterType.PROM) {
794+
prometheusHttpServer.forceFlush();
795+
prometheusHttpServer.shutdown();
796+
}
797+
if (brokerConfig.getMetricsExporterType() == MetricsExporterType.LOG) {
798+
periodicMetricReader.forceFlush();
799+
periodicMetricReader.shutdown();
800+
loggingMetricExporter.shutdown();
801+
}
775802
}
776803
}
777804

0 commit comments

Comments
 (0)