Skip to content

Commit 932588d

Browse files
authored
[ISSUE #10201] Optimize queryOffset method overloads in IndexService (#10202)
1 parent 59033a3 commit 932588d

File tree

2 files changed

+49
-49
lines changed

2 files changed

+49
-49
lines changed

store/src/main/java/org/apache/rocketmq/store/index/IndexService.java

Lines changed: 3 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -167,49 +167,14 @@ public void destroy() {
167167
}
168168

169169
public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long begin, long end) {
170-
long indexLastUpdateTimestamp = 0;
171-
long indexLastUpdatePhyoffset = 0;
172-
maxNum = Math.min(maxNum, this.defaultMessageStore.getMessageStoreConfig().getMaxMsgsNumBatch());
173-
List<Long> phyOffsets = new ArrayList<>(maxNum);
174-
try {
175-
this.readWriteLock.readLock().lock();
176-
if (!this.indexFileList.isEmpty()) {
177-
for (int i = this.indexFileList.size(); i > 0; i--) {
178-
IndexFile f = this.indexFileList.get(i - 1);
179-
boolean lastFile = i == this.indexFileList.size();
180-
if (lastFile) {
181-
indexLastUpdateTimestamp = f.getEndTimestamp();
182-
indexLastUpdatePhyoffset = f.getEndPhyOffset();
183-
}
184-
185-
if (f.isTimeMatched(begin, end)) {
186-
187-
f.selectPhyOffset(phyOffsets, buildKey(topic, key), maxNum, begin, end);
188-
}
189-
190-
if (f.getBeginTimestamp() < begin) {
191-
break;
192-
}
193-
194-
if (phyOffsets.size() >= maxNum) {
195-
break;
196-
}
197-
}
198-
}
199-
} catch (Exception e) {
200-
LOGGER.error("queryMsg exception", e);
201-
} finally {
202-
this.readWriteLock.readLock().unlock();
203-
}
204-
205-
return new QueryOffsetResult(phyOffsets, indexLastUpdateTimestamp, indexLastUpdatePhyoffset);
170+
return queryOffset(topic, key, maxNum, begin, end, null);
206171
}
207172

208173
public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long begin, long end, String indexType) {
209-
List<Long> phyOffsets = new ArrayList<>(maxNum);
210174
long indexLastUpdateTimestamp = 0;
211175
long indexLastUpdatePhyoffset = 0;
212176
maxNum = Math.min(maxNum, this.defaultMessageStore.getMessageStoreConfig().getMaxMsgsNumBatch());
177+
List<Long> phyOffsets = new ArrayList<>(maxNum);
213178
try {
214179
this.readWriteLock.readLock().lock();
215180
if (!this.indexFileList.isEmpty()) {
@@ -241,7 +206,7 @@ public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long
241206
}
242207
}
243208
} catch (Exception e) {
244-
LOGGER.error("queryMsg queryOffset exception", e);
209+
LOGGER.error("queryOffset exception", e);
245210
} finally {
246211
this.readWriteLock.readLock().unlock();
247212
}

store/src/test/java/org/apache/rocketmq/store/index/IndexServiceTest.java

Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,29 +21,64 @@
2121
import org.apache.rocketmq.store.DefaultMessageStore;
2222
import org.apache.rocketmq.store.config.MessageStoreConfig;
2323
import org.apache.rocketmq.store.stats.BrokerStatsManager;
24+
import org.junit.Before;
2425
import org.junit.Test;
2526

27+
import java.util.Collections;
2628
import java.util.concurrent.ConcurrentHashMap;
2729

30+
import static org.junit.Assert.assertEquals;
31+
import static org.junit.Assert.assertNotNull;
2832
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
2933

30-
3134
public class IndexServiceTest {
3235

36+
private IndexService indexService;
37+
38+
@Before
39+
public void setUp() throws Exception {
40+
DefaultMessageStore store = new DefaultMessageStore(
41+
new MessageStoreConfig(),
42+
new BrokerStatsManager(new BrokerConfig()),
43+
null,
44+
new BrokerConfig(),
45+
new ConcurrentHashMap<>()
46+
);
47+
indexService = new IndexService(store);
48+
}
49+
3350
@Test
34-
public void testQueryOffsetThrow() throws Exception {
51+
public void testQueryOffsetThrow() {
3552
assertDoesNotThrow(() -> {
36-
DefaultMessageStore store = new DefaultMessageStore(
37-
new MessageStoreConfig(),
38-
new BrokerStatsManager(new BrokerConfig()),
39-
null,
40-
new BrokerConfig(),
41-
new ConcurrentHashMap<>()
42-
);
43-
44-
IndexService indexService = new IndexService(store);
4553
indexService.queryOffset("test", "", Integer.MAX_VALUE, 10, 100);
4654
});
4755
}
4856

57+
@Test
58+
public void testQueryOffsetWithoutIndexType() {
59+
QueryOffsetResult result = indexService.queryOffset("test", "testKey", 10, 0, 100);
60+
assertNotNull(result);
61+
assertEquals(Collections.emptyList(), result.getPhyOffsets());
62+
}
63+
64+
@Test
65+
public void testQueryOffsetWithIndexType() {
66+
QueryOffsetResult result = indexService.queryOffset("test", "testKey", 10, 0, 100, "TAG");
67+
assertNotNull(result);
68+
assertEquals(Collections.emptyList(), result.getPhyOffsets());
69+
}
70+
71+
@Test
72+
public void testQueryOffsetWithNullKey() {
73+
QueryOffsetResult result = indexService.queryOffset("test", null, 10, 0, 100);
74+
assertNotNull(result);
75+
assertEquals(Collections.emptyList(), result.getPhyOffsets());
76+
}
77+
78+
@Test
79+
public void testQueryOffsetWithZeroMaxNum() {
80+
QueryOffsetResult result = indexService.queryOffset("test", "testKey", 0, 0, 100);
81+
assertNotNull(result);
82+
assertEquals(Collections.emptyList(), result.getPhyOffsets());
83+
}
4984
}

0 commit comments

Comments
 (0)