|
25 | 25 | import java.util.concurrent.CompletableFuture; |
26 | 26 | import java.util.concurrent.TimeUnit; |
27 | 27 | import org.apache.rocketmq.common.BoundaryType; |
| 28 | +import org.apache.rocketmq.common.MixAll; |
28 | 29 | import org.apache.rocketmq.common.message.MessageQueue; |
29 | 30 | import org.apache.rocketmq.store.GetMessageResult; |
30 | 31 | import org.apache.rocketmq.store.GetMessageStatus; |
@@ -52,6 +53,7 @@ public class MessageStoreFetcherImpl implements MessageStoreFetcher { |
52 | 53 | private static final Logger log = LoggerFactory.getLogger(MessageStoreUtil.TIERED_STORE_LOGGER_NAME); |
53 | 54 |
|
54 | 55 | protected static final String CACHE_KEY_FORMAT = "%s@%d@%d"; |
| 56 | + protected static final String FETCHER_GROUP_NAME = MixAll.CID_RMQ_SYS_PREFIX + "FETCHER_TIMESTAMP"; |
55 | 57 |
|
56 | 58 | private final String brokerName; |
57 | 59 | private final MetadataStore metadataStore; |
@@ -389,18 +391,37 @@ public CompletableFuture<Long> getMessageStoreTimeStampAsync(String topic, int q |
389 | 391 | return CompletableFuture.completedFuture(-1L); |
390 | 392 | } |
391 | 393 |
|
392 | | - return flatFile.getConsumeQueueAsync(queueOffset) |
393 | | - .thenComposeAsync(cqItem -> { |
394 | | - long commitLogOffset = MessageFormatUtil.getCommitLogOffsetFromItem(cqItem); |
395 | | - int size = MessageFormatUtil.getSizeFromItem(cqItem); |
396 | | - return flatFile.getCommitLogAsync(commitLogOffset, size); |
397 | | - }, messageStore.getStoreExecutor().bufferFetchExecutor) |
398 | | - .thenApply(MessageFormatUtil::getStoreTimeStamp) |
399 | | - .exceptionally(e -> { |
400 | | - log.error("MessageStoreFetcherImpl#getMessageStoreTimeStampAsync: " + |
401 | | - "get or decode message failed, topic={}, queue={}, offset={}", topic, queueId, queueOffset, e); |
402 | | - return -1L; |
403 | | - }); |
| 394 | + // The Metrics thread frequently retrieves the storage timestamp of the latest message; |
| 395 | + // as an alternative, return the queue's saved timestamp here. |
| 396 | + if (queueOffset + 1L == flatFile.getConsumeQueueCommitOffset()) { |
| 397 | + long timestamp = flatFile.getMaxStoreTimestamp(); |
| 398 | + return CompletableFuture.completedFuture(timestamp == Long.MAX_VALUE ? -1L : timestamp); |
| 399 | + } |
| 400 | + |
| 401 | + CompletableFuture<Long> future = new CompletableFuture<>(); |
| 402 | + try { |
| 403 | + this.getMessageAsync(FETCHER_GROUP_NAME, topic, queueId, queueOffset, 1, null) |
| 404 | + .whenComplete((result, e) -> { |
| 405 | + if (e != null) { |
| 406 | + log.error("MessageStoreFetcherImpl#getMessageStoreTimeStampAsync: " + |
| 407 | + "Get or decode message failed, topic={}, queue={}, offset={}", topic, queueId, queueOffset, e); |
| 408 | + future.completeExceptionally(e); |
| 409 | + return; |
| 410 | + } |
| 411 | + if (result != null && result.getMessageBufferList() != null |
| 412 | + && !result.getMessageBufferList().isEmpty()) { |
| 413 | + long timestamp = MessageFormatUtil.getStoreTimeStamp(result.getMessageBufferList().get(0)); |
| 414 | + log.info("MessageStoreFetcherImpl#getMessageStoreTimeStampAsync: " + |
| 415 | + "topic={}, queue={}, offset={}, timestamp={}", topic, queueId, queueOffset, timestamp); |
| 416 | + future.complete(timestamp); |
| 417 | + } else { |
| 418 | + future.complete(-1L); |
| 419 | + } |
| 420 | + }); |
| 421 | + } catch (Throwable t) { |
| 422 | + future.completeExceptionally(t); |
| 423 | + } |
| 424 | + return future; |
404 | 425 | } |
405 | 426 |
|
406 | 427 | @Override |
|
0 commit comments