Skip to content

Commit e5d3372

Browse files
authored
[ISSUE #10223] Not query the index of system topics in tiered storage (#10224)
1 parent 103b7bc commit e5d3372

3 files changed

Lines changed: 17 additions & 1 deletion

File tree

store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1501,7 +1501,8 @@ public QueryMessageResult queryMessage(String topic, String key, int maxNum, lon
15011501
return queryMessageResult;
15021502
}
15031503

1504-
@Override public CompletableFuture<QueryMessageResult> queryMessageAsync(String topic, String key,
1504+
@Override
1505+
public CompletableFuture<QueryMessageResult> queryMessageAsync(String topic, String key,
15051506
int maxNum, long begin, long end) {
15061507
return CompletableFuture.completedFuture(queryMessage(topic, key, maxNum, begin, end));
15071508
}

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ public class MessageStoreFetcherImpl implements MessageStoreFetcher {
6161
private final TieredMessageStore messageStore;
6262
private final IndexService indexService;
6363
private final FlatFileStore flatFileStore;
64+
private final MessageStoreFilter topicFilter;
6465
private final long memoryMaxSize;
6566
private final Cache<String /* topic@queueId@offset */, SelectBufferResult> fetcherCache;
6667

@@ -78,6 +79,7 @@ public MessageStoreFetcherImpl(TieredMessageStore messageStore, MessageStoreConf
7879
this.messageStore = messageStore;
7980
this.indexService = indexService;
8081
this.metadataStore = flatFileStore.getMetadataStore();
82+
this.topicFilter = messageStore.getTopicFilter();
8183
this.memoryMaxSize =
8284
(long) (Runtime.getRuntime().maxMemory() * storeConfig.getReadAheadCacheSizeThresholdRate());
8385
this.fetcherCache = this.initCache(storeConfig);
@@ -437,6 +439,10 @@ public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, Bo
437439
public CompletableFuture<QueryMessageResult> queryMessageAsync(
438440
String topic, String key, int maxCount, long begin, long end) {
439441

442+
if (topicFilter.filterTopic(topic)) {
443+
return CompletableFuture.completedFuture(new QueryMessageResult());
444+
}
445+
440446
long topicId;
441447
try {
442448
TopicMetadata topicMetadata = metadataStore.getTopic(topic);

tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImplTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121
import java.nio.ByteBuffer;
2222
import java.time.Duration;
2323
import java.util.concurrent.atomic.AtomicLong;
24+
import org.apache.commons.lang3.reflect.FieldUtils;
2425
import org.apache.rocketmq.common.BoundaryType;
2526
import org.apache.rocketmq.common.message.MessageQueue;
27+
import org.apache.rocketmq.common.topic.TopicValidator;
2628
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
2729
import org.apache.rocketmq.store.DefaultMessageFilter;
2830
import org.apache.rocketmq.store.GetMessageResult;
@@ -81,6 +83,9 @@ public void getMessageFromTieredStoreTest() throws Exception {
8183
groupName, mq.getTopic(), 0, 0, 32, null).join();
8284
Assert.assertEquals(GetMessageStatus.NO_MATCHED_LOGIC_QUEUE, getMessageResult.getStatus());
8385

86+
FieldUtils.writeField(fetcher,
87+
"topicFilter", new MessageStoreTopicFilter(storeConfig), true);
88+
8489
getMessageResult = fetcher.getMessageAsync(
8590
groupName, mq.getTopic(), mq.getQueueId(), 0, 32, null).join();
8691
Assert.assertEquals(GetMessageStatus.OFFSET_TOO_SMALL, getMessageResult.getStatus());
@@ -325,5 +330,9 @@ public void testQueryMessageAsync() throws Exception {
325330
queryMessageResult = fetcher.queryMessageAsync(
326331
mq.getTopic(), "uk", 120, 0L, System.currentTimeMillis()).join();
327332
Assert.assertEquals(100, queryMessageResult.getMessageBufferList().size());
333+
334+
queryMessageResult = fetcher.queryMessageAsync(TopicValidator.SYSTEM_TOPIC_PREFIX + mq.getTopic(),
335+
"uk", 120, 0L, System.currentTimeMillis()).join();
336+
Assert.assertEquals(0, queryMessageResult.getMessageBufferList().size());
328337
}
329338
}

0 commit comments

Comments
 (0)