Skip to content

Commit 7b4f6c6

Browse files
committed
[ISSUE #9912] Reduce excessive requests for consumer offset timestamps in tiered storage
Signed-off-by: terrance.lzm <terrance.lzm@alibaba-inc.com>
1 parent 4c66580 commit 7b4f6c6

File tree

2 files changed

+34
-19
lines changed

2 files changed

+34
-19
lines changed

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -362,8 +362,7 @@ public long getMessageStoreTimeStamp(String topic, int queueId, long consumeQueu
362362
}
363363

364364
@Override
365-
public CompletableFuture<Long> getMessageStoreTimeStampAsync(String topic, int queueId,
366-
long consumeQueueOffset) {
365+
public CompletableFuture<Long> getMessageStoreTimeStampAsync(String topic, int queueId, long consumeQueueOffset) {
367366
if (fetchFromCurrentStore(topic, queueId, consumeQueueOffset)) {
368367
Stopwatch stopwatch = Stopwatch.createStarted();
369368
return fetcher.getMessageStoreTimeStampAsync(topic, queueId, consumeQueueOffset)
@@ -374,11 +373,6 @@ public CompletableFuture<Long> getMessageStoreTimeStampAsync(String topic, int q
374373
.put(TieredStoreMetricsConstant.LABEL_TOPIC, topic)
375374
.build();
376375
TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS), latencyAttributes);
377-
if (time == -1) {
378-
log.debug("GetEarliestMessageTimeAsync failed, try to get message time from next store, topic: {}, queue: {}, queue offset: {}",
379-
topic, queueId, consumeQueueOffset);
380-
return next.getMessageStoreTimeStamp(topic, queueId, consumeQueueOffset);
381-
}
382376
return time;
383377
});
384378
}

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.CompletableFuture;
2626
import java.util.concurrent.TimeUnit;
2727
import org.apache.rocketmq.common.BoundaryType;
28+
import org.apache.rocketmq.common.MixAll;
2829
import org.apache.rocketmq.common.message.MessageQueue;
2930
import org.apache.rocketmq.store.GetMessageResult;
3031
import org.apache.rocketmq.store.GetMessageStatus;
@@ -52,6 +53,7 @@ public class MessageStoreFetcherImpl implements MessageStoreFetcher {
5253
private static final Logger log = LoggerFactory.getLogger(MessageStoreUtil.TIERED_STORE_LOGGER_NAME);
5354

5455
protected static final String CACHE_KEY_FORMAT = "%s@%d@%d";
56+
protected static final String FETCHER_GROUP_NAME = MixAll.CID_RMQ_SYS_PREFIX + "FETCHER_TIMESTAMP";
5557

5658
private final String brokerName;
5759
private final MetadataStore metadataStore;
@@ -389,18 +391,37 @@ public CompletableFuture<Long> getMessageStoreTimeStampAsync(String topic, int q
389391
return CompletableFuture.completedFuture(-1L);
390392
}
391393

392-
return flatFile.getConsumeQueueAsync(queueOffset)
393-
.thenComposeAsync(cqItem -> {
394-
long commitLogOffset = MessageFormatUtil.getCommitLogOffsetFromItem(cqItem);
395-
int size = MessageFormatUtil.getSizeFromItem(cqItem);
396-
return flatFile.getCommitLogAsync(commitLogOffset, size);
397-
}, messageStore.getStoreExecutor().bufferFetchExecutor)
398-
.thenApply(MessageFormatUtil::getStoreTimeStamp)
399-
.exceptionally(e -> {
400-
log.error("MessageStoreFetcherImpl#getMessageStoreTimeStampAsync: " +
401-
"get or decode message failed, topic={}, queue={}, offset={}", topic, queueId, queueOffset, e);
402-
return -1L;
403-
});
394+
// The Metrics thread frequently retrieves the storage timestamp of the latest message;
395+
// as an alternative, return the queue's saved timestamp here.
396+
if (queueOffset + 1L == flatFile.getConsumeQueueCommitOffset()) {
397+
long timestamp = flatFile.getMaxStoreTimestamp();
398+
return CompletableFuture.completedFuture(timestamp == Long.MAX_VALUE ? -1L : timestamp);
399+
}
400+
401+
CompletableFuture<Long> future = new CompletableFuture<>();
402+
try {
403+
this.getMessageAsync(FETCHER_GROUP_NAME, topic, queueId, queueOffset, 1, null)
404+
.whenComplete((result, e) -> {
405+
if (e != null) {
406+
log.error("MessageStoreFetcherImpl#getMessageStoreTimeStampAsync: " +
407+
"Get or decode message failed, topic={}, queue={}, offset={}", topic, queueId, queueOffset, e);
408+
future.completeExceptionally(e);
409+
return;
410+
}
411+
if (result != null && result.getMessageBufferList() != null
412+
&& !result.getMessageBufferList().isEmpty()) {
413+
long timestamp = MessageFormatUtil.getStoreTimeStamp(result.getMessageBufferList().get(0));
414+
log.info("MessageStoreFetcherImpl#getMessageStoreTimeStampAsync: " +
415+
"topic={}, queue={}, offset={}, timestamp={}", topic, queueId, queueOffset, timestamp);
416+
future.complete(timestamp);
417+
} else {
418+
future.complete(-1L);
419+
}
420+
});
421+
} catch (Throwable t) {
422+
future.completeExceptionally(t);
423+
}
424+
return future;
404425
}
405426

406427
@Override

0 commit comments

Comments
 (0)