diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 41b2e3da3ed..de30d24a51f 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -113,6 +113,7 @@ import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.apache.rocketmq.store.timer.TimerMessageStore; import org.apache.rocketmq.store.util.PerfCounter; +import org.apache.rocketmq.store.metrics.StoreMetricsManager; import org.rocksdb.RocksDBException; public class DefaultMessageStore implements MessageStore { @@ -1573,6 +1574,7 @@ public long dispatchBehindMilliseconds() { return this.reputMessageService.behindMs(); } + @Override public long flushBehindBytes() { if (this.messageStoreConfig.isTransientStorePoolEnable()) { return this.commitLog.remainHowManyDataToCommit() + this.commitLog.remainHowManyDataToFlush(); @@ -3065,4 +3067,9 @@ public DefaultStoreMetricsManager getDefaultStoreMetricsManager() { return defaultStoreMetricsManager; } + @Override + public StoreMetricsManager getStoreMetricsManager() { + return defaultStoreMetricsManager; + } + } diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java index 52c2de33fd3..0b927513e13 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java @@ -46,6 +46,7 @@ import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.apache.rocketmq.store.timer.TimerMessageStore; import org.apache.rocketmq.store.util.PerfCounter; +import org.apache.rocketmq.store.metrics.StoreMetricsManager; import org.rocksdb.RocksDBException; /** @@ -511,6 +512,13 @@ CompletableFuture queryMessageAsync(final String topic, fina */ long dispatchBehindBytes(); + /** + * Get number of the bytes that have been stored in commit log and not yet flushed to disk. + * + * @return number of the bytes to flush. + */ + long flushBehindBytes(); + /** * Get number of the milliseconds that have been stored in commit log and not yet dispatched to consume queue. * @@ -907,6 +915,13 @@ void onCommitLogDispatch(DispatchRequest dispatchRequest, boolean doDispatch, Ma */ long getStateMachineVersion(); + /** + * Get store metrics manager + * + * @return store metrics manager + */ + StoreMetricsManager getStoreMetricsManager(); + /** * Check message and return size * diff --git a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java index e72609cc38b..6f2bc322542 100644 --- a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java +++ b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java @@ -36,7 +36,7 @@ import org.apache.rocketmq.common.metrics.NopLongCounter; import org.apache.rocketmq.common.metrics.NopLongHistogram; import org.apache.rocketmq.common.metrics.NopObservableLongGauge; -import org.apache.rocketmq.store.DefaultMessageStore; +import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.timer.Slot; import org.apache.rocketmq.store.timer.TimerMessageStore; @@ -63,7 +63,7 @@ import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABEL_TIMING_BOUND; import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABEL_TOPIC; -public class DefaultStoreMetricsManager { +public class DefaultStoreMetricsManager implements StoreMetricsManager { private Supplier attributesBuilderSupplier; private MessageStoreConfig messageStoreConfig; @@ -109,7 +109,7 @@ public List> getMetricsView() { } public void init(Meter meter, Supplier attributesBuilderSupplier, - DefaultMessageStore messageStore) { + MessageStore messageStore) { // Also add some metrics for rocksdb's monitoring. this.rocksDBStoreMetricsManager.init(meter, attributesBuilderSupplier, messageStore.getQueueStore()); @@ -324,10 +324,6 @@ public void setAttributesBuilderSupplier(Supplier attributesB this.attributesBuilderSupplier = attributesBuilderSupplier; } - public void setMessageStoreConfig(MessageStoreConfig messageStoreConfig) { - this.messageStoreConfig = messageStoreConfig; - } - public RocksDBStoreMetricsManager getRocksDBStoreMetricsManager() { return rocksDBStoreMetricsManager; } diff --git a/store/src/main/java/org/apache/rocketmq/store/metrics/StoreMetricsManager.java b/store/src/main/java/org/apache/rocketmq/store/metrics/StoreMetricsManager.java new file mode 100644 index 00000000000..c2bc751eb0e --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/metrics/StoreMetricsManager.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store.metrics; + +import io.opentelemetry.api.common.AttributesBuilder; +import java.util.List; +import java.util.function.Supplier; +import org.apache.rocketmq.common.Pair; +import org.apache.rocketmq.store.MessageStore; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.sdk.metrics.InstrumentSelector; +import io.opentelemetry.sdk.metrics.ViewBuilder; + +/** + * Store metrics manager interface for different message store implementations. + * This interface provides a unified way to access metrics functionality + * regardless of the underlying message store type. + */ +public interface StoreMetricsManager { + + /** + * Initialize metrics with the given meter and attributes builder supplier. + * + * @param meter OpenTelemetry meter + * @param attributesBuilderSupplier Metrics attributes builder supplier + * @param messageStore The message store instance + */ + void init(Meter meter, Supplier attributesBuilderSupplier, MessageStore messageStore); + + /** + * Get metrics view configuration. + * + * @return List of instrument selector and view builder pairs + */ + List> getMetricsView(); + +} diff --git a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java index 19ace1c8e3a..673f045bf7f 100644 --- a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java @@ -61,6 +61,7 @@ import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.apache.rocketmq.store.timer.TimerMessageStore; import org.apache.rocketmq.store.util.PerfCounter; +import org.apache.rocketmq.store.metrics.StoreMetricsManager; import org.rocksdb.RocksDBException; public abstract class AbstractPluginMessageStore implements MessageStore { @@ -293,6 +294,11 @@ public long dispatchBehindBytes() { return next.dispatchBehindBytes(); } + @Override + public long flushBehindBytes() { + return next.flushBehindBytes(); + } + @Override public long dispatchBehindMilliseconds() { return next.dispatchBehindMilliseconds(); @@ -666,4 +672,9 @@ public MessageStore getNext() { public MessageStoreStateMachine getStateMachine() { return next.getStateMachine(); } + + @Override + public StoreMetricsManager getStoreMetricsManager() { + return next.getStoreMetricsManager(); + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java index 80184422e0a..8b995fbd709 100644 --- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java @@ -67,6 +67,7 @@ import org.apache.rocketmq.store.logfile.MappedFile; import org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant; import org.apache.rocketmq.store.metrics.DefaultStoreMetricsManager; +import org.apache.rocketmq.store.metrics.StoreMetricsManager; import org.apache.rocketmq.store.queue.ConsumeQueueInterface; import org.apache.rocketmq.store.queue.CqUnit; import org.apache.rocketmq.store.queue.ReferredIterator; @@ -767,11 +768,12 @@ public boolean enqueue(int queueId) { } } // Record timer message set latency - if (messageStore instanceof DefaultMessageStore) { - DefaultStoreMetricsManager metricsManager = ((DefaultMessageStore) messageStore).getDefaultStoreMetricsManager(); - Attributes attributes = metricsManager.newAttributesBuilder() + StoreMetricsManager metricsManager = messageStore.getStoreMetricsManager(); + if (metricsManager instanceof DefaultStoreMetricsManager) { + DefaultStoreMetricsManager defaultMetricsManager = (DefaultStoreMetricsManager) metricsManager; + Attributes attributes = defaultMetricsManager.newAttributesBuilder() .put(DefaultStoreMetricsConstant.LABEL_TOPIC, msgExt.getProperty(MessageConst.PROPERTY_REAL_TOPIC)).build(); - metricsManager.getTimerMessageSetLatency().record((delayedTime - msgExt.getBornTimestamp()) / 1000, attributes); + defaultMetricsManager.getTimerMessageSetLatency().record((delayedTime - msgExt.getBornTimestamp()) / 1000, attributes); } } } catch (Exception e) { @@ -1440,8 +1442,9 @@ protected List fetchTimerRequests() throws InterruptedException { protected void putMessageToTimerWheel(TimerRequest req) { try { perfCounterTicks.startTick(ENQUEUE_PUT); - if (messageStore instanceof DefaultMessageStore) { - ((DefaultMessageStore) messageStore).getDefaultStoreMetricsManager().incTimerEnqueueCount(getRealTopic(req.getMsg())); + StoreMetricsManager metricsManager = messageStore.getStoreMetricsManager(); + if (metricsManager instanceof DefaultStoreMetricsManager) { + ((DefaultStoreMetricsManager) metricsManager).incTimerEnqueueCount(getRealTopic(req.getMsg())); } if (shouldRunningDequeue && req.getDelayTime() < currWriteTimeMs) { req.setEnqueueTime(Long.MAX_VALUE); @@ -1589,8 +1592,9 @@ public void run() { perfCounterTicks.startTick(DEQUEUE_PUT); MessageExt msgExt = tr.getMsg(); - if (messageStore instanceof DefaultMessageStore) { - ((DefaultMessageStore) messageStore).getDefaultStoreMetricsManager().incTimerDequeueCount(getRealTopic(msgExt)); + StoreMetricsManager metricsManager = messageStore.getStoreMetricsManager(); + if (metricsManager instanceof DefaultStoreMetricsManager) { + ((DefaultStoreMetricsManager) metricsManager).incTimerDequeueCount(getRealTopic(msgExt)); } if (tr.getEnqueueTime() == Long.MAX_VALUE) {