|
64 | 64 | import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService; |
65 | 65 | import org.apache.rocketmq.store.lock.AdaptiveBackOffSpinLockImpl; |
66 | 66 | import org.apache.rocketmq.store.logfile.MappedFile; |
| 67 | +import org.apache.rocketmq.store.queue.ConsumeQueueInterface; |
| 68 | +import org.apache.rocketmq.store.queue.CqUnit; |
| 69 | +import org.apache.rocketmq.store.queue.ReferredIterator; |
67 | 70 | import org.apache.rocketmq.store.util.LibC; |
68 | 71 | import org.rocksdb.RocksDBException; |
69 | 72 |
|
@@ -2442,16 +2445,15 @@ public boolean isMsgInColdArea(String group, String topic, int queueId, long off |
2442 | 2445 | return false; |
2443 | 2446 | } |
2444 | 2447 | try { |
2445 | | - ConsumeQueue consumeQueue = (ConsumeQueue) defaultMessageStore.findConsumeQueue(topic, queueId); |
| 2448 | + ConsumeQueueInterface consumeQueue = defaultMessageStore.findConsumeQueue(topic, queueId); |
2446 | 2449 | if (null == consumeQueue) { |
2447 | 2450 | return false; |
2448 | 2451 | } |
2449 | | - SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset); |
2450 | | - if (null == bufferConsumeQueue || null == bufferConsumeQueue.getByteBuffer()) { |
| 2452 | + ReferredIterator<CqUnit> bufferConsumeQueue = consumeQueue.iterateFrom(offset, 1); |
| 2453 | + if (null == bufferConsumeQueue || !bufferConsumeQueue.hasNext()) { |
2451 | 2454 | return false; |
2452 | 2455 | } |
2453 | | - long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); |
2454 | | - return defaultMessageStore.checkInColdAreaByCommitOffset(offsetPy, getMaxOffset()); |
| 2456 | + return defaultMessageStore.checkInColdAreaByCommitOffset(bufferConsumeQueue.next().getPos(), getMaxOffset()); |
2455 | 2457 | } catch (Exception e) { |
2456 | 2458 | log.error("isMsgInColdArea group: {}, topic: {}, queueId: {}, offset: {}", |
2457 | 2459 | group, topic, queueId, offset, e); |
|
0 commit comments