diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/EpochEntry.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/EpochEntry.java index 4ff81760adb..9e8f1aab427 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/EpochEntry.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/EpochEntry.java @@ -21,9 +21,10 @@ public class EpochEntry extends RemotingSerializable { + public static final long LAST_EPOCH_END_OFFSET = Long.MAX_VALUE; private int epoch; private long startOffset; - private long endOffset = Long.MAX_VALUE; + private long endOffset = LAST_EPOCH_END_OFFSET; public EpochEntry(EpochEntry entry) { this.epoch = entry.getEpoch(); diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java index 440cd3c7a50..cc55937aebb 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java @@ -593,6 +593,20 @@ private void transferToSlave() throws Exception { return; } + // Check and update currentTransferEpochEndOffset + if (AutoSwitchHAConnection.this.currentTransferEpochEndOffset == -1) { + EpochEntry currentEpochEntry = AutoSwitchHAConnection.this.epochCache.getEntry(AutoSwitchHAConnection.this.currentTransferEpoch); + if (currentEpochEntry != null) { + if (currentEpochEntry.getEndOffset() != EpochEntry.LAST_EPOCH_END_OFFSET) { + LOGGER.info("Update currentTransferEpochEndOffset from -1 to {}", currentEpochEntry.getEndOffset()); + AutoSwitchHAConnection.this.currentTransferEpochEndOffset = currentEpochEntry.getEndOffset(); + } + } else { + // we should never reach here + LOGGER.warn("[BUG]Can't find currentTransferEpoch [{}] from epoch cache", currentTransferEpoch); + } + } + // We must ensure that the transmitted logs are within the same epoch // If currentEpochEndOffset == -1, means that currentTransferEpoch = last epoch, so the endOffset = Long.max final long currentEpochEndOffset = AutoSwitchHAConnection.this.currentTransferEpochEndOffset;