Skip to content

Commit 26480dd

Browse files
guyinyouguyinyou
andauthored
[ISSUE #10073] Fix StoreCheckpoint logicsMsgTimestamp to reflect flushed consume queue storetime (#10074)
* Fix StoreCheckpoint: set logicsMsgTimestamp only after CQ flush, use logicsMsgTempTimestamp for in-memory storetime Change-Id: I6085bf6efaef84168ece31d080481717465f2b13 * rename to tmpLogicsMsgTimestamp Change-Id: Ia65ca06751f765bdc2bf053c58e08789f4b2fb22 --------- Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com>
1 parent d254f79 commit 26480dd

File tree

4 files changed

+15
-4
lines changed

4 files changed

+15
-4
lines changed

store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -709,7 +709,7 @@ public void putMessagePositionInfoWrapper(DispatchRequest request) {
709709
this.messageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) {
710710
this.messageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
711711
}
712-
this.messageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
712+
this.messageStore.getStoreCheckpoint().setTmpLogicsMsgTimestamp(request.getStoreTimestamp());
713713
if (MultiDispatchUtils.checkMultiDispatchQueue(this.messageStore.getMessageStoreConfig(), request)) {
714714
multiDispatchLmqQueue(request, maxRetries);
715715
}

store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public class StoreCheckpoint {
3333
private final RandomAccessFile randomAccessFile;
3434
private final FileChannel fileChannel;
3535
private final MappedByteBuffer mappedByteBuffer;
36+
private volatile long tmpLogicsMsgTimestamp = 0;
3637
private volatile long physicMsgTimestamp = 0;
3738
private volatile long logicsMsgTimestamp = 0;
3839
private volatile long indexMsgTimestamp = 0;
@@ -112,6 +113,14 @@ public void setLogicsMsgTimestamp(long logicsMsgTimestamp) {
112113
this.logicsMsgTimestamp = logicsMsgTimestamp;
113114
}
114115

116+
public long getTmpLogicsMsgTimestamp() {
117+
return tmpLogicsMsgTimestamp;
118+
}
119+
120+
public void setTmpLogicsMsgTimestamp(long tmpLogicsMsgTimestamp) {
121+
this.tmpLogicsMsgTimestamp = tmpLogicsMsgTimestamp;
122+
}
123+
115124
public long getConfirmPhyOffset() {
116125
return confirmPhyOffset;
117126
}

store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -536,7 +536,7 @@ public void putMessagePositionInfoWrapper(DispatchRequest request) {
536536
if (BrokerRole.SLAVE == this.messageStore.getMessageStoreConfig().getBrokerRole()) {
537537
this.messageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
538538
}
539-
this.messageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
539+
this.messageStore.getStoreCheckpoint().setTmpLogicsMsgTimestamp(request.getStoreTimestamp());
540540
return;
541541
} else {
542542
// XXX: warn and notify me

store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -648,23 +648,25 @@ private void doFlush(int retryTimes) {
648648
if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) {
649649
this.lastFlushTimestamp = currentTimeMillis;
650650
flushConsumeQueueLeastPages = 0;
651-
logicsMsgTimestamp = messageStore.getStoreCheckpoint().getLogicsMsgTimestamp();
651+
logicsMsgTimestamp = messageStore.getStoreCheckpoint().getTmpLogicsMsgTimestamp();
652652
}
653653

654+
boolean flushOK = true;
654655
for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : consumeQueueTable.values()) {
655656
for (ConsumeQueueInterface cq : maps.values()) {
656657
boolean result = false;
657658
for (int i = 0; i < retryTimes && !result; i++) {
658659
result = flush(cq, flushConsumeQueueLeastPages);
659660
}
661+
flushOK &= result;
660662
}
661663
}
662664

663665
if (messageStoreConfig.isEnableCompaction()) {
664666
messageStore.getCompactionStore().flush(flushConsumeQueueLeastPages);
665667
}
666668

667-
if (0 == flushConsumeQueueLeastPages) {
669+
if (flushOK && 0 == flushConsumeQueueLeastPages) {
668670
if (logicsMsgTimestamp > 0) {
669671
messageStore.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);
670672
}

0 commit comments

Comments
 (0)