Skip to content

Commit d197028

Browse files
authored
refactor: abstract StoreMetricsManager interface to support non-DefaultMessageStore (#9727)
- Create StoreMetricsManager interface to define unified metrics management methods - Add getStoreMetricsManager() method to MessageStore interface - Add flushBehindBytes() method to MessageStore interface to fix missing method calls - Refactor DefaultStoreMetricsManager to implement StoreMetricsManager interface - Update all MessageStore implementations to support the new interface - Modify TimerMessageStore to use getStoreMetricsManager() instead of direct type checking This refactoring resolves the issue where incTimerDequeueCount and incTimerEnqueueCount methods cannot be called when messageStore is not a DefaultMessageStore, improving code extensibility and decoupling.
1 parent a4da5f5 commit d197028

6 files changed

Lines changed: 99 additions & 15 deletions

File tree

store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@
113113
import org.apache.rocketmq.store.stats.BrokerStatsManager;
114114
import org.apache.rocketmq.store.timer.TimerMessageStore;
115115
import org.apache.rocketmq.store.util.PerfCounter;
116+
import org.apache.rocketmq.store.metrics.StoreMetricsManager;
116117
import org.rocksdb.RocksDBException;
117118

118119
public class DefaultMessageStore implements MessageStore {
@@ -1573,6 +1574,7 @@ public long dispatchBehindMilliseconds() {
15731574
return this.reputMessageService.behindMs();
15741575
}
15751576

1577+
@Override
15761578
public long flushBehindBytes() {
15771579
if (this.messageStoreConfig.isTransientStorePoolEnable()) {
15781580
return this.commitLog.remainHowManyDataToCommit() + this.commitLog.remainHowManyDataToFlush();
@@ -3065,4 +3067,9 @@ public DefaultStoreMetricsManager getDefaultStoreMetricsManager() {
30653067
return defaultStoreMetricsManager;
30663068
}
30673069

3070+
@Override
3071+
public StoreMetricsManager getStoreMetricsManager() {
3072+
return defaultStoreMetricsManager;
3073+
}
3074+
30683075
}

store/src/main/java/org/apache/rocketmq/store/MessageStore.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.rocketmq.store.stats.BrokerStatsManager;
4747
import org.apache.rocketmq.store.timer.TimerMessageStore;
4848
import org.apache.rocketmq.store.util.PerfCounter;
49+
import org.apache.rocketmq.store.metrics.StoreMetricsManager;
4950
import org.rocksdb.RocksDBException;
5051

5152
/**
@@ -511,6 +512,13 @@ CompletableFuture<QueryMessageResult> queryMessageAsync(final String topic, fina
511512
*/
512513
long dispatchBehindBytes();
513514

515+
/**
516+
* Get number of the bytes that have been stored in commit log and not yet flushed to disk.
517+
*
518+
* @return number of the bytes to flush.
519+
*/
520+
long flushBehindBytes();
521+
514522
/**
515523
* Get number of the milliseconds that have been stored in commit log and not yet dispatched to consume queue.
516524
*
@@ -907,6 +915,13 @@ void onCommitLogDispatch(DispatchRequest dispatchRequest, boolean doDispatch, Ma
907915
*/
908916
long getStateMachineVersion();
909917

918+
/**
919+
* Get store metrics manager
920+
*
921+
* @return store metrics manager
922+
*/
923+
StoreMetricsManager getStoreMetricsManager();
924+
910925
/**
911926
* Check message and return size
912927
*

store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
import org.apache.rocketmq.common.metrics.NopLongCounter;
3737
import org.apache.rocketmq.common.metrics.NopLongHistogram;
3838
import org.apache.rocketmq.common.metrics.NopObservableLongGauge;
39-
import org.apache.rocketmq.store.DefaultMessageStore;
39+
import org.apache.rocketmq.store.MessageStore;
4040
import org.apache.rocketmq.store.config.MessageStoreConfig;
4141
import org.apache.rocketmq.store.timer.Slot;
4242
import org.apache.rocketmq.store.timer.TimerMessageStore;
@@ -63,7 +63,7 @@
6363
import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABEL_TIMING_BOUND;
6464
import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABEL_TOPIC;
6565

66-
public class DefaultStoreMetricsManager {
66+
public class DefaultStoreMetricsManager implements StoreMetricsManager {
6767
private Supplier<AttributesBuilder> attributesBuilderSupplier;
6868
private MessageStoreConfig messageStoreConfig;
6969

@@ -109,7 +109,7 @@ public List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() {
109109
}
110110

111111
public void init(Meter meter, Supplier<AttributesBuilder> attributesBuilderSupplier,
112-
DefaultMessageStore messageStore) {
112+
MessageStore messageStore) {
113113

114114
// Also add some metrics for rocksdb's monitoring.
115115
this.rocksDBStoreMetricsManager.init(meter, attributesBuilderSupplier, messageStore.getQueueStore());
@@ -324,10 +324,6 @@ public void setAttributesBuilderSupplier(Supplier<AttributesBuilder> attributesB
324324
this.attributesBuilderSupplier = attributesBuilderSupplier;
325325
}
326326

327-
public void setMessageStoreConfig(MessageStoreConfig messageStoreConfig) {
328-
this.messageStoreConfig = messageStoreConfig;
329-
}
330-
331327
public RocksDBStoreMetricsManager getRocksDBStoreMetricsManager() {
332328
return rocksDBStoreMetricsManager;
333329
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.store.metrics;
18+
19+
import io.opentelemetry.api.common.AttributesBuilder;
20+
import java.util.List;
21+
import java.util.function.Supplier;
22+
import org.apache.rocketmq.common.Pair;
23+
import org.apache.rocketmq.store.MessageStore;
24+
import io.opentelemetry.api.metrics.Meter;
25+
import io.opentelemetry.sdk.metrics.InstrumentSelector;
26+
import io.opentelemetry.sdk.metrics.ViewBuilder;
27+
28+
/**
29+
* Store metrics manager interface for different message store implementations.
30+
* This interface provides a unified way to access metrics functionality
31+
* regardless of the underlying message store type.
32+
*/
33+
public interface StoreMetricsManager {
34+
35+
/**
36+
* Initialize metrics with the given meter and attributes builder supplier.
37+
*
38+
* @param meter OpenTelemetry meter
39+
* @param attributesBuilderSupplier Metrics attributes builder supplier
40+
* @param messageStore The message store instance
41+
*/
42+
void init(Meter meter, Supplier<AttributesBuilder> attributesBuilderSupplier, MessageStore messageStore);
43+
44+
/**
45+
* Get metrics view configuration.
46+
*
47+
* @return List of instrument selector and view builder pairs
48+
*/
49+
List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView();
50+
51+
}

store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import org.apache.rocketmq.store.stats.BrokerStatsManager;
6262
import org.apache.rocketmq.store.timer.TimerMessageStore;
6363
import org.apache.rocketmq.store.util.PerfCounter;
64+
import org.apache.rocketmq.store.metrics.StoreMetricsManager;
6465
import org.rocksdb.RocksDBException;
6566

6667
public abstract class AbstractPluginMessageStore implements MessageStore {
@@ -293,6 +294,11 @@ public long dispatchBehindBytes() {
293294
return next.dispatchBehindBytes();
294295
}
295296

297+
@Override
298+
public long flushBehindBytes() {
299+
return next.flushBehindBytes();
300+
}
301+
296302
@Override
297303
public long dispatchBehindMilliseconds() {
298304
return next.dispatchBehindMilliseconds();
@@ -666,4 +672,9 @@ public MessageStore getNext() {
666672
public MessageStoreStateMachine getStateMachine() {
667673
return next.getStateMachine();
668674
}
675+
676+
@Override
677+
public StoreMetricsManager getStoreMetricsManager() {
678+
return next.getStoreMetricsManager();
679+
}
669680
}

store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import org.apache.rocketmq.store.logfile.MappedFile;
6868
import org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant;
6969
import org.apache.rocketmq.store.metrics.DefaultStoreMetricsManager;
70+
import org.apache.rocketmq.store.metrics.StoreMetricsManager;
7071
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
7172
import org.apache.rocketmq.store.queue.CqUnit;
7273
import org.apache.rocketmq.store.queue.ReferredIterator;
@@ -767,11 +768,12 @@ public boolean enqueue(int queueId) {
767768
}
768769
}
769770
// Record timer message set latency
770-
if (messageStore instanceof DefaultMessageStore) {
771-
DefaultStoreMetricsManager metricsManager = ((DefaultMessageStore) messageStore).getDefaultStoreMetricsManager();
772-
Attributes attributes = metricsManager.newAttributesBuilder()
771+
StoreMetricsManager metricsManager = messageStore.getStoreMetricsManager();
772+
if (metricsManager instanceof DefaultStoreMetricsManager) {
773+
DefaultStoreMetricsManager defaultMetricsManager = (DefaultStoreMetricsManager) metricsManager;
774+
Attributes attributes = defaultMetricsManager.newAttributesBuilder()
773775
.put(DefaultStoreMetricsConstant.LABEL_TOPIC, msgExt.getProperty(MessageConst.PROPERTY_REAL_TOPIC)).build();
774-
metricsManager.getTimerMessageSetLatency().record((delayedTime - msgExt.getBornTimestamp()) / 1000, attributes);
776+
defaultMetricsManager.getTimerMessageSetLatency().record((delayedTime - msgExt.getBornTimestamp()) / 1000, attributes);
775777
}
776778
}
777779
} catch (Exception e) {
@@ -1440,8 +1442,9 @@ protected List<TimerRequest> fetchTimerRequests() throws InterruptedException {
14401442
protected void putMessageToTimerWheel(TimerRequest req) {
14411443
try {
14421444
perfCounterTicks.startTick(ENQUEUE_PUT);
1443-
if (messageStore instanceof DefaultMessageStore) {
1444-
((DefaultMessageStore) messageStore).getDefaultStoreMetricsManager().incTimerEnqueueCount(getRealTopic(req.getMsg()));
1445+
StoreMetricsManager metricsManager = messageStore.getStoreMetricsManager();
1446+
if (metricsManager instanceof DefaultStoreMetricsManager) {
1447+
((DefaultStoreMetricsManager) metricsManager).incTimerEnqueueCount(getRealTopic(req.getMsg()));
14451448
}
14461449
if (shouldRunningDequeue && req.getDelayTime() < currWriteTimeMs) {
14471450
req.setEnqueueTime(Long.MAX_VALUE);
@@ -1589,8 +1592,9 @@ public void run() {
15891592
perfCounterTicks.startTick(DEQUEUE_PUT);
15901593

15911594
MessageExt msgExt = tr.getMsg();
1592-
if (messageStore instanceof DefaultMessageStore) {
1593-
((DefaultMessageStore) messageStore).getDefaultStoreMetricsManager().incTimerDequeueCount(getRealTopic(msgExt));
1595+
StoreMetricsManager metricsManager = messageStore.getStoreMetricsManager();
1596+
if (metricsManager instanceof DefaultStoreMetricsManager) {
1597+
((DefaultStoreMetricsManager) metricsManager).incTimerDequeueCount(getRealTopic(msgExt));
15941598
}
15951599

15961600
if (tr.getEnqueueTime() == Long.MAX_VALUE) {

0 commit comments

Comments
 (0)