Skip to content

Commit 1075dff

Browse files
committed
fix: incorrect min offset returned from TieredMessageStore when local store returns -1
1 parent ebf1595 commit 1075dff

File tree

2 files changed

+18
-1
lines changed

2 files changed

+18
-1
lines changed

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,9 @@ public long getMinOffsetInQueue(String topic, int queueId) {
310310
if (minOffsetInTieredStore < 0) {
311311
return minOffsetInNextStore;
312312
}
313+
if (minOffsetInNextStore < 0) {
314+
return minOffsetInTieredStore;
315+
}
313316
return Math.min(minOffsetInNextStore, minOffsetInTieredStore);
314317
}
315318

tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,21 @@ public void testGetMinOffsetInQueue() {
238238
Mockito.when(defaultStore.getMinOffsetInQueue(anyString(), anyInt())).thenReturn(100L);
239239
Assert.assertEquals(100L, currentStore.getMinOffsetInQueue(mq.getTopic(), mq.getQueueId()));
240240

241-
Mockito.when(flatFile.getConsumeQueueMinOffset()).thenReturn(10L);
241+
Assert.assertEquals(flatFile.getConsumeQueueMinOffset(),
242+
currentStore.getMinOffsetInQueue(mq.getTopic(), mq.getQueueId()));
243+
244+
FlatFileStore mockFlatFileStore = Mockito.mock(FlatFileStore.class);
245+
FlatMessageFile mockFlatFile = Mockito.mock(FlatMessageFile.class);
246+
Mockito.when(mockFlatFileStore.getFlatFile(any(MessageQueue.class))).thenReturn(mockFlatFile);
247+
Mockito.when(mockFlatFile.getConsumeQueueMinOffset()).thenReturn(10L);
248+
Mockito.when(defaultStore.getMinOffsetInQueue(anyString(), anyInt())).thenReturn(-1L);
249+
try {
250+
Field field = currentStore.getClass().getDeclaredField("flatFileStore");
251+
field.setAccessible(true);
252+
field.set(currentStore, mockFlatFileStore);
253+
} catch (NoSuchFieldException | IllegalAccessException e) {
254+
Assert.fail(e.getClass().getCanonicalName() + ": " + e.getMessage());
255+
}
242256
Assert.assertEquals(10L, currentStore.getMinOffsetInQueue(mq.getTopic(), mq.getQueueId()));
243257
}
244258

0 commit comments

Comments
 (0)