diff --git a/store/src/test/java/org/apache/rocketmq/store/MessageStoreStateMachineTest.java b/store/src/test/java/org/apache/rocketmq/store/MessageStoreStateMachineTest.java index d00074dbcbc..333e419681a 100644 --- a/store/src/test/java/org/apache/rocketmq/store/MessageStoreStateMachineTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/MessageStoreStateMachineTest.java @@ -19,7 +19,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.anyLong; @@ -107,41 +106,4 @@ public void testGetCurrentState() { // Verify the current state assertEquals(MessageStoreState.INIT, stateMachine.getCurrentState()); } - - /** - * Test getTotalRunningTimeMs method. - */ - @Test - public void testGetTotalRunningTimeMs() { - // Sleep for a short duration to simulate elapsed time - try { - Thread.sleep(100); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - - // Verify the total running time is approximately correct - long totalTime = stateMachine.getTotalRunningTimeMs(); - assertTrue(totalTime >= 100 && totalTime < 200); - } - - /** - * Test getCurrentStateRunningTimeMs method. - */ - @Test - public void testGetCurrentStateRunningTimeMs() { - // Perform a state transition - stateMachine.transitTo(MessageStoreState.LOAD_COMMITLOG_OK); - - // Sleep for a short duration to simulate elapsed time - try { - Thread.sleep(100); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - - // Verify the current state running time is approximately correct - long currentStateTime = stateMachine.getCurrentStateRunningTimeMs(); - assertTrue(currentStateTime >= 100 && currentStateTime < 200); - } } diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java index f1c935d00b7..19b587fa327 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java @@ -370,11 +370,11 @@ public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, Bo .build(); TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS), latencyAttributes); if (offsetInTieredStore == -1L && !isForce) { - return next.getOffsetInQueueByTime(topic, queueId, timestamp); + return next.getOffsetInQueueByTime(topic, queueId, timestamp, boundaryType); } return offsetInTieredStore; } - return next.getOffsetInQueueByTime(topic, queueId, timestamp); + return next.getOffsetInQueueByTime(topic, queueId, timestamp, boundaryType); } @Override diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java index bb259ae811a..1a0240681c0 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java @@ -62,7 +62,6 @@ import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.when; public class TieredMessageStoreTest { @@ -275,19 +274,43 @@ public void testGetMessageStoreTimeStampAsync() { @Test public void testGetOffsetInQueueByTime() { + final long earliestMsgTime = 100L; Properties properties = new Properties(); properties.setProperty("tieredStorageLevel", "FORCE"); configuration.update(properties); - Mockito.when(fetcher.getOffsetInQueueByTime(anyString(), anyInt(), anyLong(), eq(BoundaryType.LOWER))).thenReturn(1L); - Mockito.when(defaultStore.getOffsetInQueueByTime(anyString(), anyInt(), anyLong())).thenReturn(2L); - Mockito.when(defaultStore.getEarliestMessageTime()).thenReturn(100L); + Mockito.when(fetcher.getOffsetInQueueByTime(anyString(), anyInt(), anyLong(), any(BoundaryType.class))) + .thenAnswer(ivk -> ivk.getArgument(3, BoundaryType.class) == BoundaryType.LOWER ? 1L : 2L); + Mockito.when(defaultStore.getOffsetInQueueByTime(anyString(), anyInt(), anyLong(), any(BoundaryType.class))) + .thenAnswer(ivk -> { + long time = ivk.getArgument(2, Long.class); + if (time < earliestMsgTime) { + return -1L; + } + return ivk.getArgument(3, BoundaryType.class) == BoundaryType.LOWER ? 3L : 4L; + }); + Mockito.when(defaultStore.getEarliestMessageTime()).thenReturn(earliestMsgTime); + + // Message not in disk, but force, found in tired storage. Assert.assertEquals(1L, currentStore.getOffsetInQueueByTime(mq.getTopic(), mq.getQueueId(), 1000, BoundaryType.LOWER)); + Assert.assertEquals(2L, currentStore.getOffsetInQueueByTime(mq.getTopic(), mq.getQueueId(), 1000, BoundaryType.UPPER)); + // Message in disk, and force, found in tired storage. Assert.assertEquals(1L, currentStore.getOffsetInQueueByTime(mq.getTopic(), mq.getQueueId(), 0, BoundaryType.LOWER)); + Assert.assertEquals(2L, currentStore.getOffsetInQueueByTime(mq.getTopic(), mq.getQueueId(), 0, BoundaryType.UPPER)); - Mockito.when(fetcher.getOffsetInQueueByTime(anyString(), anyInt(), anyLong(), eq(BoundaryType.LOWER))).thenReturn(-1L); + // Message in disk, but force, and not found in tired storage. + Mockito.when(fetcher.getOffsetInQueueByTime(anyString(), anyInt(), anyLong(), any(BoundaryType.class))).thenReturn(-1L); Assert.assertEquals(-1L, currentStore.getOffsetInQueueByTime(mq.getTopic(), mq.getQueueId(), 0)); Assert.assertEquals(-1L, currentStore.getOffsetInQueueByTime(mq.getTopic(), mq.getQueueId(), 0, BoundaryType.LOWER)); + + properties.setProperty("tieredStorageLevel", "NOT_IN_DISK"); + configuration.update(properties); + // Message not in disk, and not found in tired storage. + Assert.assertEquals(-1L, currentStore.getOffsetInQueueByTime(mq.getTopic(), mq.getQueueId(), 0, BoundaryType.LOWER)); + Assert.assertEquals(-1L, currentStore.getOffsetInQueueByTime(mq.getTopic(), mq.getQueueId(), 0, BoundaryType.UPPER)); + // Message in disk, and found in disk. + Assert.assertEquals(3L, currentStore.getOffsetInQueueByTime(mq.getTopic(), mq.getQueueId(), 1000, BoundaryType.LOWER)); + Assert.assertEquals(4L, currentStore.getOffsetInQueueByTime(mq.getTopic(), mq.getQueueId(), 1000, BoundaryType.UPPER)); } @Test