Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.apache.rocketmq.store.timer.TimerMessageStore;
import org.apache.rocketmq.store.util.PerfCounter;
import org.apache.rocketmq.store.metrics.StoreMetricsManager;
import org.rocksdb.RocksDBException;

public class DefaultMessageStore implements MessageStore {
Expand Down Expand Up @@ -1573,6 +1574,7 @@ public long dispatchBehindMilliseconds() {
return this.reputMessageService.behindMs();
}

@Override
public long flushBehindBytes() {
if (this.messageStoreConfig.isTransientStorePoolEnable()) {
return this.commitLog.remainHowManyDataToCommit() + this.commitLog.remainHowManyDataToFlush();
Expand Down Expand Up @@ -3065,4 +3067,9 @@ public DefaultStoreMetricsManager getDefaultStoreMetricsManager() {
return defaultStoreMetricsManager;
}

@Override
public StoreMetricsManager getStoreMetricsManager() {
return defaultStoreMetricsManager;
}

}
15 changes: 15 additions & 0 deletions store/src/main/java/org/apache/rocketmq/store/MessageStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.apache.rocketmq.store.timer.TimerMessageStore;
import org.apache.rocketmq.store.util.PerfCounter;
import org.apache.rocketmq.store.metrics.StoreMetricsManager;
import org.rocksdb.RocksDBException;

/**
Expand Down Expand Up @@ -511,6 +512,13 @@ CompletableFuture<QueryMessageResult> queryMessageAsync(final String topic, fina
*/
long dispatchBehindBytes();

/**
* Get number of the bytes that have been stored in commit log and not yet flushed to disk.
*
* @return number of the bytes to flush.
*/
long flushBehindBytes();

/**
* Get number of the milliseconds that have been stored in commit log and not yet dispatched to consume queue.
*
Expand Down Expand Up @@ -907,6 +915,13 @@ void onCommitLogDispatch(DispatchRequest dispatchRequest, boolean doDispatch, Ma
*/
long getStateMachineVersion();

/**
* Get store metrics manager
*
* @return store metrics manager
*/
StoreMetricsManager getStoreMetricsManager();

/**
* Check message and return size
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.apache.rocketmq.common.metrics.NopLongCounter;
import org.apache.rocketmq.common.metrics.NopLongHistogram;
import org.apache.rocketmq.common.metrics.NopObservableLongGauge;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.timer.Slot;
import org.apache.rocketmq.store.timer.TimerMessageStore;
Expand All @@ -63,7 +63,7 @@
import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABEL_TIMING_BOUND;
import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABEL_TOPIC;

public class DefaultStoreMetricsManager {
public class DefaultStoreMetricsManager implements StoreMetricsManager {
private Supplier<AttributesBuilder> attributesBuilderSupplier;
private MessageStoreConfig messageStoreConfig;

Expand Down Expand Up @@ -109,7 +109,7 @@ public List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() {
}

public void init(Meter meter, Supplier<AttributesBuilder> attributesBuilderSupplier,
DefaultMessageStore messageStore) {
MessageStore messageStore) {

// Also add some metrics for rocksdb's monitoring.
this.rocksDBStoreMetricsManager.init(meter, attributesBuilderSupplier, messageStore.getQueueStore());
Expand Down Expand Up @@ -324,10 +324,6 @@ public void setAttributesBuilderSupplier(Supplier<AttributesBuilder> attributesB
this.attributesBuilderSupplier = attributesBuilderSupplier;
}

public void setMessageStoreConfig(MessageStoreConfig messageStoreConfig) {
this.messageStoreConfig = messageStoreConfig;
}

public RocksDBStoreMetricsManager getRocksDBStoreMetricsManager() {
return rocksDBStoreMetricsManager;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store.metrics;

import io.opentelemetry.api.common.AttributesBuilder;
import java.util.List;
import java.util.function.Supplier;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.store.MessageStore;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.metrics.InstrumentSelector;
import io.opentelemetry.sdk.metrics.ViewBuilder;

/**
* Store metrics manager interface for different message store implementations.
* This interface provides a unified way to access metrics functionality
* regardless of the underlying message store type.
*/
public interface StoreMetricsManager {

/**
* Initialize metrics with the given meter and attributes builder supplier.
*
* @param meter OpenTelemetry meter
* @param attributesBuilderSupplier Metrics attributes builder supplier
* @param messageStore The message store instance
*/
void init(Meter meter, Supplier<AttributesBuilder> attributesBuilderSupplier, MessageStore messageStore);

/**
* Get metrics view configuration.
*
* @return List of instrument selector and view builder pairs
*/
List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView();

}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.apache.rocketmq.store.timer.TimerMessageStore;
import org.apache.rocketmq.store.util.PerfCounter;
import org.apache.rocketmq.store.metrics.StoreMetricsManager;
import org.rocksdb.RocksDBException;

public abstract class AbstractPluginMessageStore implements MessageStore {
Expand Down Expand Up @@ -293,6 +294,11 @@ public long dispatchBehindBytes() {
return next.dispatchBehindBytes();
}

@Override
public long flushBehindBytes() {
return next.flushBehindBytes();
}

@Override
public long dispatchBehindMilliseconds() {
return next.dispatchBehindMilliseconds();
Expand Down Expand Up @@ -666,4 +672,9 @@ public MessageStore getNext() {
public MessageStoreStateMachine getStateMachine() {
return next.getStateMachine();
}

@Override
public StoreMetricsManager getStoreMetricsManager() {
return next.getStoreMetricsManager();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.apache.rocketmq.store.logfile.MappedFile;
import org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant;
import org.apache.rocketmq.store.metrics.DefaultStoreMetricsManager;
import org.apache.rocketmq.store.metrics.StoreMetricsManager;
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
import org.apache.rocketmq.store.queue.CqUnit;
import org.apache.rocketmq.store.queue.ReferredIterator;
Expand Down Expand Up @@ -767,11 +768,12 @@ public boolean enqueue(int queueId) {
}
}
// Record timer message set latency
if (messageStore instanceof DefaultMessageStore) {
DefaultStoreMetricsManager metricsManager = ((DefaultMessageStore) messageStore).getDefaultStoreMetricsManager();
Attributes attributes = metricsManager.newAttributesBuilder()
StoreMetricsManager metricsManager = messageStore.getStoreMetricsManager();
if (metricsManager instanceof DefaultStoreMetricsManager) {
DefaultStoreMetricsManager defaultMetricsManager = (DefaultStoreMetricsManager) metricsManager;
Attributes attributes = defaultMetricsManager.newAttributesBuilder()
.put(DefaultStoreMetricsConstant.LABEL_TOPIC, msgExt.getProperty(MessageConst.PROPERTY_REAL_TOPIC)).build();
metricsManager.getTimerMessageSetLatency().record((delayedTime - msgExt.getBornTimestamp()) / 1000, attributes);
defaultMetricsManager.getTimerMessageSetLatency().record((delayedTime - msgExt.getBornTimestamp()) / 1000, attributes);
}
}
} catch (Exception e) {
Expand Down Expand Up @@ -1440,8 +1442,9 @@ protected List<TimerRequest> fetchTimerRequests() throws InterruptedException {
protected void putMessageToTimerWheel(TimerRequest req) {
try {
perfCounterTicks.startTick(ENQUEUE_PUT);
if (messageStore instanceof DefaultMessageStore) {
((DefaultMessageStore) messageStore).getDefaultStoreMetricsManager().incTimerEnqueueCount(getRealTopic(req.getMsg()));
StoreMetricsManager metricsManager = messageStore.getStoreMetricsManager();
if (metricsManager instanceof DefaultStoreMetricsManager) {
((DefaultStoreMetricsManager) metricsManager).incTimerEnqueueCount(getRealTopic(req.getMsg()));
}
if (shouldRunningDequeue && req.getDelayTime() < currWriteTimeMs) {
req.setEnqueueTime(Long.MAX_VALUE);
Expand Down Expand Up @@ -1589,8 +1592,9 @@ public void run() {
perfCounterTicks.startTick(DEQUEUE_PUT);

MessageExt msgExt = tr.getMsg();
if (messageStore instanceof DefaultMessageStore) {
((DefaultMessageStore) messageStore).getDefaultStoreMetricsManager().incTimerDequeueCount(getRealTopic(msgExt));
StoreMetricsManager metricsManager = messageStore.getStoreMetricsManager();
if (metricsManager instanceof DefaultStoreMetricsManager) {
((DefaultStoreMetricsManager) metricsManager).incTimerDequeueCount(getRealTopic(msgExt));
}

if (tr.getEnqueueTime() == Long.MAX_VALUE) {
Expand Down
Loading