diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index 2a77ede32af..1d16165c047 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -709,7 +709,7 @@ public void putMessagePositionInfoWrapper(DispatchRequest request) { this.messageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) { this.messageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp()); } - this.messageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp()); + this.messageStore.getStoreCheckpoint().setTmpLogicsMsgTimestamp(request.getStoreTimestamp()); if (MultiDispatchUtils.checkMultiDispatchQueue(this.messageStore.getMessageStoreConfig(), request)) { multiDispatchLmqQueue(request, maxRetries); } diff --git a/store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java b/store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java index b4518f18f80..3a8027267ca 100644 --- a/store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java +++ b/store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java @@ -33,6 +33,7 @@ public class StoreCheckpoint { private final RandomAccessFile randomAccessFile; private final FileChannel fileChannel; private final MappedByteBuffer mappedByteBuffer; + private volatile long tmpLogicsMsgTimestamp = 0; private volatile long physicMsgTimestamp = 0; private volatile long logicsMsgTimestamp = 0; private volatile long indexMsgTimestamp = 0; @@ -112,6 +113,14 @@ public void setLogicsMsgTimestamp(long logicsMsgTimestamp) { this.logicsMsgTimestamp = logicsMsgTimestamp; } + public long getTmpLogicsMsgTimestamp() { + return tmpLogicsMsgTimestamp; + } + + public void setTmpLogicsMsgTimestamp(long tmpLogicsMsgTimestamp) { + this.tmpLogicsMsgTimestamp = tmpLogicsMsgTimestamp; + } + public long getConfirmPhyOffset() { return confirmPhyOffset; } diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java index 7ad29ff538b..7bfb09928f5 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java @@ -536,7 +536,7 @@ public void putMessagePositionInfoWrapper(DispatchRequest request) { if (BrokerRole.SLAVE == this.messageStore.getMessageStoreConfig().getBrokerRole()) { this.messageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp()); } - this.messageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp()); + this.messageStore.getStoreCheckpoint().setTmpLogicsMsgTimestamp(request.getStoreTimestamp()); return; } else { // XXX: warn and notify me diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java index d5d096becd9..992bfb668cf 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java @@ -648,15 +648,17 @@ private void doFlush(int retryTimes) { if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) { this.lastFlushTimestamp = currentTimeMillis; flushConsumeQueueLeastPages = 0; - logicsMsgTimestamp = messageStore.getStoreCheckpoint().getLogicsMsgTimestamp(); + logicsMsgTimestamp = messageStore.getStoreCheckpoint().getTmpLogicsMsgTimestamp(); } + boolean flushOK = true; for (ConcurrentMap maps : consumeQueueTable.values()) { for (ConsumeQueueInterface cq : maps.values()) { boolean result = false; for (int i = 0; i < retryTimes && !result; i++) { result = flush(cq, flushConsumeQueueLeastPages); } + flushOK &= result; } } @@ -664,7 +666,7 @@ private void doFlush(int retryTimes) { messageStore.getCompactionStore().flush(flushConsumeQueueLeastPages); } - if (0 == flushConsumeQueueLeastPages) { + if (flushOK && 0 == flushConsumeQueueLeastPages) { if (logicsMsgTimestamp > 0) { messageStore.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp); }