From e47d18502f5a977542027b2742b25532469fe073 Mon Sep 17 00:00:00 2001 From: dingshuangxi888 Date: Tue, 29 Apr 2025 15:02:20 +0800 Subject: [PATCH 1/4] Fix bugs for query index in tieredstore. --- .../tieredstore/index/IndexStoreService.java | 39 +++++++++++-------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java index 75c61dcb382..fd1311431f8 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java @@ -18,6 +18,20 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.ServiceThread; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.store.logfile.DefaultMappedFile; +import org.apache.rocketmq.store.logfile.MappedFile; +import org.apache.rocketmq.tieredstore.MessageStoreConfig; +import org.apache.rocketmq.tieredstore.common.AppendResult; +import org.apache.rocketmq.tieredstore.file.FlatAppendFile; +import org.apache.rocketmq.tieredstore.file.FlatFileFactory; +import org.apache.rocketmq.tieredstore.provider.FileSegment; +import org.apache.rocketmq.tieredstore.util.MessageStoreUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.File; import java.nio.ByteBuffer; import java.nio.file.Paths; @@ -35,19 +49,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.apache.commons.lang3.StringUtils; -import org.apache.rocketmq.common.ServiceThread; -import org.apache.rocketmq.common.UtilAll; -import org.apache.rocketmq.store.logfile.DefaultMappedFile; -import org.apache.rocketmq.store.logfile.MappedFile; -import org.apache.rocketmq.tieredstore.MessageStoreConfig; -import org.apache.rocketmq.tieredstore.common.AppendResult; -import org.apache.rocketmq.tieredstore.file.FlatAppendFile; -import org.apache.rocketmq.tieredstore.file.FlatFileFactory; -import org.apache.rocketmq.tieredstore.provider.FileSegment; -import org.apache.rocketmq.tieredstore.util.MessageStoreUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class IndexStoreService extends ServiceThread implements IndexService { @@ -224,15 +225,17 @@ public AppendResult putKey( @Override public CompletableFuture> queryAsync( - String topic, String key, int maxCount, long beginTime, long endTime) { + String topic, String key, int maxCount, long beginTime, long endTime) { + + if (beginTime > endTime) { + return CompletableFuture.completedFuture(new ArrayList<>()); + } CompletableFuture> future = new CompletableFuture<>(); try { readWriteLock.readLock().lock(); - long firstFileTimeStamp = this.timeStoreTable.lowerKey(beginTime) == null ? - this.timeStoreTable.firstKey() : this.timeStoreTable.lowerKey(beginTime); ConcurrentNavigableMap pendingMap = - this.timeStoreTable.subMap(firstFileTimeStamp, true, endTime, true); + this.timeStoreTable.subMap(beginTime, true, endTime, true); List> futureList = new ArrayList<>(pendingMap.size()); ConcurrentHashMap result = new ConcurrentHashMap<>(); @@ -260,6 +263,8 @@ public CompletableFuture> queryAsync( } }); } catch (Exception e) { + log.error("IndexStoreService#queryAsync, topicId={}, key={}, maxCount={}, timestamp={}-{}", + topic, key, maxCount, beginTime, endTime, e); future.completeExceptionally(e); } finally { readWriteLock.readLock().unlock(); From a02a24b93b5e3c31075ee580e387ef5f661ec362 Mon Sep 17 00:00:00 2001 From: dingshuangxi888 Date: Tue, 29 Apr 2025 15:05:04 +0800 Subject: [PATCH 2/4] Fix bugs for query index in tieredstore. --- .../apache/rocketmq/tieredstore/index/IndexStoreService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java index fd1311431f8..75adebc12a9 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java @@ -225,7 +225,7 @@ public AppendResult putKey( @Override public CompletableFuture> queryAsync( - String topic, String key, int maxCount, long beginTime, long endTime) { + String topic, String key, int maxCount, long beginTime, long endTime) { if (beginTime > endTime) { return CompletableFuture.completedFuture(new ArrayList<>()); @@ -264,7 +264,7 @@ public CompletableFuture> queryAsync( }); } catch (Exception e) { log.error("IndexStoreService#queryAsync, topicId={}, key={}, maxCount={}, timestamp={}-{}", - topic, key, maxCount, beginTime, endTime, e); + topic, key, maxCount, beginTime, endTime, e); future.completeExceptionally(e); } finally { readWriteLock.readLock().unlock(); From 1235760c26d0a0b468d76ef5e29e609ecd9fae63 Mon Sep 17 00:00:00 2001 From: dingshuangxi888 Date: Tue, 29 Apr 2025 15:05:31 +0800 Subject: [PATCH 3/4] Fix bugs for query index in tieredstore. --- .../apache/rocketmq/tieredstore/index/IndexStoreService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java index 75adebc12a9..6311df5a7c7 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java @@ -235,7 +235,7 @@ public CompletableFuture> queryAsync( try { readWriteLock.readLock().lock(); ConcurrentNavigableMap pendingMap = - this.timeStoreTable.subMap(beginTime, true, endTime, true); + this.timeStoreTable.subMap(beginTime, true, endTime, true); List> futureList = new ArrayList<>(pendingMap.size()); ConcurrentHashMap result = new ConcurrentHashMap<>(); From 6ae0f6570deb80d7fc945c0f13b0dae21cfbf405 Mon Sep 17 00:00:00 2001 From: dingshuangxi888 Date: Tue, 29 Apr 2025 15:07:12 +0800 Subject: [PATCH 4/4] Fix bugs for query index in tieredstore. --- .../tieredstore/index/IndexStoreService.java | 27 +++++++++---------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java index 6311df5a7c7..7fe645da0f6 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java @@ -18,20 +18,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; -import org.apache.commons.lang3.StringUtils; -import org.apache.rocketmq.common.ServiceThread; -import org.apache.rocketmq.common.UtilAll; -import org.apache.rocketmq.store.logfile.DefaultMappedFile; -import org.apache.rocketmq.store.logfile.MappedFile; -import org.apache.rocketmq.tieredstore.MessageStoreConfig; -import org.apache.rocketmq.tieredstore.common.AppendResult; -import org.apache.rocketmq.tieredstore.file.FlatAppendFile; -import org.apache.rocketmq.tieredstore.file.FlatFileFactory; -import org.apache.rocketmq.tieredstore.provider.FileSegment; -import org.apache.rocketmq.tieredstore.util.MessageStoreUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.File; import java.nio.ByteBuffer; import java.nio.file.Paths; @@ -49,6 +35,19 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.ServiceThread; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.store.logfile.DefaultMappedFile; +import org.apache.rocketmq.store.logfile.MappedFile; +import org.apache.rocketmq.tieredstore.MessageStoreConfig; +import org.apache.rocketmq.tieredstore.common.AppendResult; +import org.apache.rocketmq.tieredstore.file.FlatAppendFile; +import org.apache.rocketmq.tieredstore.file.FlatFileFactory; +import org.apache.rocketmq.tieredstore.provider.FileSegment; +import org.apache.rocketmq.tieredstore.util.MessageStoreUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class IndexStoreService extends ServiceThread implements IndexService {