Skip to content

Commit 1e5232d

Browse files
lizhiminsRongtongJin
authored andcommitted
[ISSUE #9213] Fix get the earliest time error when data is clean up in tiered storage (#9214)
* [ISSUE #9213] Fix get the earliest time error when data is clean up in tiered storag
1 parent 7f80aea commit 1e5232d

4 files changed

Lines changed: 22 additions & 27 deletions

File tree

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
public class TieredMessageStore extends AbstractPluginMessageStore {
6363

6464
protected static final Logger log = LoggerFactory.getLogger(MessageStoreUtil.TIERED_STORE_LOGGER_NAME);
65+
protected static final long MIN_STORE_TIME = -1L;
6566

6667
protected final String brokerName;
6768
protected final MessageStore defaultStore;
@@ -310,24 +311,21 @@ public long getEarliestMessageTime(String topic, int queueId) {
310311
return getEarliestMessageTimeAsync(topic, queueId).join();
311312
}
312313

314+
/**
315+
* In the original design, getting the earliest time of the first message
316+
* would generate two RPC requests. However, using the timestamp stored in the metadata
317+
* avoids these requests, although this approach might introduce some level of inaccuracy.
318+
*/
313319
@Override
314320
public CompletableFuture<Long> getEarliestMessageTimeAsync(String topic, int queueId) {
315-
long nextEarliestMessageTime = next.getEarliestMessageTime(topic, queueId);
316-
long finalNextEarliestMessageTime = nextEarliestMessageTime > 0 ? nextEarliestMessageTime : Long.MAX_VALUE;
317-
Stopwatch stopwatch = Stopwatch.createStarted();
321+
long localMinTime = next.getEarliestMessageTime(topic, queueId);
318322
return fetcher.getEarliestMessageTimeAsync(topic, queueId)
319-
.thenApply(time -> {
320-
Attributes latencyAttributes = TieredStoreMetricsManager.newAttributesBuilder()
321-
.put(TieredStoreMetricsConstant.LABEL_OPERATION, TieredStoreMetricsConstant.OPERATION_API_GET_EARLIEST_MESSAGE_TIME)
322-
.put(TieredStoreMetricsConstant.LABEL_TOPIC, topic)
323-
.build();
324-
TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS), latencyAttributes);
325-
if (time < 0) {
326-
log.debug("GetEarliestMessageTimeAsync failed, try to get earliest message time from next store: topic: {}, queue: {}",
327-
topic, queueId);
328-
return finalNextEarliestMessageTime != Long.MAX_VALUE ? finalNextEarliestMessageTime : -1;
323+
.thenApply(remoteMinTime -> {
324+
if (localMinTime > MIN_STORE_TIME && remoteMinTime > MIN_STORE_TIME) {
325+
return Math.min(localMinTime, remoteMinTime);
329326
}
330-
return Math.min(finalNextEarliestMessageTime, time);
327+
return localMinTime > MIN_STORE_TIME ? localMinTime :
328+
(remoteMinTime > MIN_STORE_TIME ? remoteMinTime : MIN_STORE_TIME);
331329
});
332330
}
333331

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -375,14 +375,7 @@ public CompletableFuture<GetMessageResult> getMessageAsync(
375375
@Override
376376
public CompletableFuture<Long> getEarliestMessageTimeAsync(String topic, int queueId) {
377377
FlatMessageFile flatFile = flatFileStore.getFlatFile(new MessageQueue(topic, brokerName, queueId));
378-
if (flatFile == null) {
379-
return CompletableFuture.completedFuture(-1L);
380-
}
381-
382-
// read from timestamp to timestamp + length
383-
int length = MessageFormatUtil.STORE_TIMESTAMP_POSITION + 8;
384-
return flatFile.getCommitLogAsync(flatFile.getCommitLogMinOffset(), length)
385-
.thenApply(MessageFormatUtil::getStoreTimeStamp);
378+
return CompletableFuture.completedFuture(flatFile == null ? -1L : flatFile.getMinStoreTimestamp());
386379
}
387380

388381
@Override

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -178,16 +178,20 @@ public AppendResult appendConsumeQueue(DispatchRequest request) {
178178
return consumeQueue.append(buffer, request.getStoreTimestamp());
179179
}
180180

181-
182-
183181
@Override
184182
public void release() {
185-
186183
}
187184

188185
@Override
189186
public long getMinStoreTimestamp() {
190-
return commitLog.getMinTimestamp();
187+
long minStoreTime = -1L;
188+
if (Long.MAX_VALUE != commitLog.getMinTimestamp()) {
189+
minStoreTime = Math.max(minStoreTime, commitLog.getMinTimestamp());
190+
}
191+
if (Long.MAX_VALUE != consumeQueue.getMinTimestamp()) {
192+
minStoreTime = Math.max(minStoreTime, consumeQueue.getMinTimestamp());
193+
}
194+
return minStoreTime;
191195
}
192196

193197
@Override

tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ public void testGetMinOffsetInQueue() {
246246
@Test
247247
public void testGetEarliestMessageTimeAsync() {
248248
when(fetcher.getEarliestMessageTimeAsync(anyString(), anyInt())).thenReturn(CompletableFuture.completedFuture(1L));
249-
Assert.assertEquals(1, (long) currentStore.getEarliestMessageTimeAsync(mq.getTopic(), mq.getQueueId()).join());
249+
Assert.assertEquals(0, (long) currentStore.getEarliestMessageTimeAsync(mq.getTopic(), mq.getQueueId()).join());
250250

251251
when(fetcher.getEarliestMessageTimeAsync(anyString(), anyInt())).thenReturn(CompletableFuture.completedFuture(-1L));
252252
when(defaultStore.getEarliestMessageTime(anyString(), anyInt())).thenReturn(2L);

0 commit comments

Comments
 (0)