Skip to content

Commit 35a7474

Browse files
authored
[ISSUE #9802] Fix the issue of delete logic in tiered storage index service (#9803)
1 parent e7d930f commit 35a7474

2 files changed

Lines changed: 8 additions & 4 deletions

File tree

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -442,6 +442,10 @@ public CompletableFuture<QueryMessageResult> queryMessageAsync(
442442
if (flatFile == null) {
443443
continue;
444444
}
445+
if (indexItem.getOffset() < flatFile.getCommitLogMinOffset() ||
446+
indexItem.getOffset() > flatFile.getCommitLogMaxOffset()) {
447+
continue;
448+
}
445449
CompletableFuture<SelectMappedBufferResult> getMessageFuture = flatFile
446450
.getCommitLogAsync(indexItem.getOffset(), indexItem.getSize())
447451
.thenApply(messageBuffer -> new SelectMappedBufferResult(

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import java.util.Map;
2929
import java.util.Set;
3030
import java.util.concurrent.CompletableFuture;
31-
import java.util.concurrent.ConcurrentHashMap;
3231
import java.util.concurrent.ConcurrentNavigableMap;
3332
import java.util.concurrent.ConcurrentSkipListMap;
3433
import java.util.concurrent.TimeUnit;
@@ -238,15 +237,15 @@ public CompletableFuture<List<IndexItem>> queryAsync(
238237
ConcurrentNavigableMap<Long, IndexFile> pendingMap =
239238
this.timeStoreTable.subMap(beginTime, true, endTime, true);
240239
List<CompletableFuture<Void>> futureList = new ArrayList<>(pendingMap.size());
241-
ConcurrentHashMap<String /* queueId-offset */, IndexItem> result = new ConcurrentHashMap<>();
240+
ConcurrentSkipListMap<String /* queueId-offset */, IndexItem> result = new ConcurrentSkipListMap<>();
242241

243242
for (Map.Entry<Long, IndexFile> entry : pendingMap.descendingMap().entrySet()) {
244243
CompletableFuture<Void> completableFuture = entry.getValue()
245244
.queryAsync(topic, key, maxCount, beginTime, endTime)
246245
.thenAccept(itemList -> itemList.forEach(indexItem -> {
247246
if (result.size() < maxCount) {
248247
result.put(String.format(
249-
"%d-%d", indexItem.getQueueId(), indexItem.getOffset()), indexItem);
248+
"%d-%20d", indexItem.getQueueId(), indexItem.getOffset()), indexItem);
250249
}
251250
}));
252251
futureList.add(completableFuture);
@@ -349,7 +348,8 @@ public void destroyExpiredFile(long expireTimestamp) {
349348
flatAppendFile.destroyExpiredFile(expireTimestamp);
350349
timeStoreTable.entrySet().removeIf(entry ->
351350
IndexFile.IndexStatusEnum.UPLOAD.equals(entry.getValue().getFileStatus()) &&
352-
entry.getKey() < flatAppendFile.getMinTimestamp());
351+
(flatAppendFile.getFileSegmentList().isEmpty() ||
352+
entry.getKey() < flatAppendFile.getMinTimestamp()));
353353
int tableSize = (int) timeStoreTable.entrySet().stream()
354354
.filter(entry -> IndexFile.IndexStatusEnum.UPLOAD.equals(entry.getValue().getFileStatus()))
355355
.count();

0 commit comments

Comments
 (0)