Skip to content

Commit 74ab3ae

Browse files
authored
[ISSUE #9648] Fix getOffsetInQueueByTime missing boundaryType in tieredMessageStore (#9649)
* [ISSUE #9648] Fix getOffsetInQueueByTime missing in tieredMessageStore * Update TieredMessageStoreTest.java * Delete inappropriate UT * Remove unused import
1 parent 2954655 commit 74ab3ae

3 files changed

Lines changed: 30 additions & 45 deletions

File tree

store/src/test/java/org/apache/rocketmq/store/MessageStoreStateMachineTest.java

Lines changed: 0 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import static org.junit.jupiter.api.Assertions.assertEquals;
2121
import static org.junit.jupiter.api.Assertions.assertThrows;
22-
import static org.junit.jupiter.api.Assertions.assertTrue;
2322
import static org.mockito.ArgumentMatchers.anyString;
2423
import static org.mockito.ArgumentMatchers.eq;
2524
import static org.mockito.Mockito.anyLong;
@@ -107,41 +106,4 @@ public void testGetCurrentState() {
107106
// Verify the current state
108107
assertEquals(MessageStoreState.INIT, stateMachine.getCurrentState());
109108
}
110-
111-
/**
112-
* Test getTotalRunningTimeMs method.
113-
*/
114-
@Test
115-
public void testGetTotalRunningTimeMs() {
116-
// Sleep for a short duration to simulate elapsed time
117-
try {
118-
Thread.sleep(100);
119-
} catch (InterruptedException e) {
120-
Thread.currentThread().interrupt();
121-
}
122-
123-
// Verify the total running time is approximately correct
124-
long totalTime = stateMachine.getTotalRunningTimeMs();
125-
assertTrue(totalTime >= 100 && totalTime < 200);
126-
}
127-
128-
/**
129-
* Test getCurrentStateRunningTimeMs method.
130-
*/
131-
@Test
132-
public void testGetCurrentStateRunningTimeMs() {
133-
// Perform a state transition
134-
stateMachine.transitTo(MessageStoreState.LOAD_COMMITLOG_OK);
135-
136-
// Sleep for a short duration to simulate elapsed time
137-
try {
138-
Thread.sleep(100);
139-
} catch (InterruptedException e) {
140-
Thread.currentThread().interrupt();
141-
}
142-
143-
// Verify the current state running time is approximately correct
144-
long currentStateTime = stateMachine.getCurrentStateRunningTimeMs();
145-
assertTrue(currentStateTime >= 100 && currentStateTime < 200);
146-
}
147109
}

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -370,11 +370,11 @@ public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, Bo
370370
.build();
371371
TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS), latencyAttributes);
372372
if (offsetInTieredStore == -1L && !isForce) {
373-
return next.getOffsetInQueueByTime(topic, queueId, timestamp);
373+
return next.getOffsetInQueueByTime(topic, queueId, timestamp, boundaryType);
374374
}
375375
return offsetInTieredStore;
376376
}
377-
return next.getOffsetInQueueByTime(topic, queueId, timestamp);
377+
return next.getOffsetInQueueByTime(topic, queueId, timestamp, boundaryType);
378378
}
379379

380380
@Override

tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@
6262
import static org.mockito.ArgumentMatchers.anyInt;
6363
import static org.mockito.ArgumentMatchers.anyLong;
6464
import static org.mockito.ArgumentMatchers.anyString;
65-
import static org.mockito.ArgumentMatchers.eq;
6665
import static org.mockito.Mockito.when;
6766

6867
public class TieredMessageStoreTest {
@@ -275,19 +274,43 @@ public void testGetMessageStoreTimeStampAsync() {
275274

276275
@Test
277276
public void testGetOffsetInQueueByTime() {
277+
final long earliestMsgTime = 100L;
278278
Properties properties = new Properties();
279279
properties.setProperty("tieredStorageLevel", "FORCE");
280280
configuration.update(properties);
281281

282-
Mockito.when(fetcher.getOffsetInQueueByTime(anyString(), anyInt(), anyLong(), eq(BoundaryType.LOWER))).thenReturn(1L);
283-
Mockito.when(defaultStore.getOffsetInQueueByTime(anyString(), anyInt(), anyLong())).thenReturn(2L);
284-
Mockito.when(defaultStore.getEarliestMessageTime()).thenReturn(100L);
282+
Mockito.when(fetcher.getOffsetInQueueByTime(anyString(), anyInt(), anyLong(), any(BoundaryType.class)))
283+
.thenAnswer(ivk -> ivk.getArgument(3, BoundaryType.class) == BoundaryType.LOWER ? 1L : 2L);
284+
Mockito.when(defaultStore.getOffsetInQueueByTime(anyString(), anyInt(), anyLong(), any(BoundaryType.class)))
285+
.thenAnswer(ivk -> {
286+
long time = ivk.getArgument(2, Long.class);
287+
if (time < earliestMsgTime) {
288+
return -1L;
289+
}
290+
return ivk.getArgument(3, BoundaryType.class) == BoundaryType.LOWER ? 3L : 4L;
291+
});
292+
Mockito.when(defaultStore.getEarliestMessageTime()).thenReturn(earliestMsgTime);
293+
294+
// Message not in disk, but force, found in tired storage.
285295
Assert.assertEquals(1L, currentStore.getOffsetInQueueByTime(mq.getTopic(), mq.getQueueId(), 1000, BoundaryType.LOWER));
296+
Assert.assertEquals(2L, currentStore.getOffsetInQueueByTime(mq.getTopic(), mq.getQueueId(), 1000, BoundaryType.UPPER));
297+
// Message in disk, and force, found in tired storage.
286298
Assert.assertEquals(1L, currentStore.getOffsetInQueueByTime(mq.getTopic(), mq.getQueueId(), 0, BoundaryType.LOWER));
299+
Assert.assertEquals(2L, currentStore.getOffsetInQueueByTime(mq.getTopic(), mq.getQueueId(), 0, BoundaryType.UPPER));
287300

288-
Mockito.when(fetcher.getOffsetInQueueByTime(anyString(), anyInt(), anyLong(), eq(BoundaryType.LOWER))).thenReturn(-1L);
301+
// Message in disk, but force, and not found in tired storage.
302+
Mockito.when(fetcher.getOffsetInQueueByTime(anyString(), anyInt(), anyLong(), any(BoundaryType.class))).thenReturn(-1L);
289303
Assert.assertEquals(-1L, currentStore.getOffsetInQueueByTime(mq.getTopic(), mq.getQueueId(), 0));
290304
Assert.assertEquals(-1L, currentStore.getOffsetInQueueByTime(mq.getTopic(), mq.getQueueId(), 0, BoundaryType.LOWER));
305+
306+
properties.setProperty("tieredStorageLevel", "NOT_IN_DISK");
307+
configuration.update(properties);
308+
// Message not in disk, and not found in tired storage.
309+
Assert.assertEquals(-1L, currentStore.getOffsetInQueueByTime(mq.getTopic(), mq.getQueueId(), 0, BoundaryType.LOWER));
310+
Assert.assertEquals(-1L, currentStore.getOffsetInQueueByTime(mq.getTopic(), mq.getQueueId(), 0, BoundaryType.UPPER));
311+
// Message in disk, and found in disk.
312+
Assert.assertEquals(3L, currentStore.getOffsetInQueueByTime(mq.getTopic(), mq.getQueueId(), 1000, BoundaryType.LOWER));
313+
Assert.assertEquals(4L, currentStore.getOffsetInQueueByTime(mq.getTopic(), mq.getQueueId(), 1000, BoundaryType.UPPER));
291314
}
292315

293316
@Test

0 commit comments

Comments
 (0)