Skip to content

Commit aa9ee1d

Browse files
authored
[ISSUE #9689] Fix the issue that master transfer epoch was not updated in time
1 parent fbe9f73 commit aa9ee1d

2 files changed

Lines changed: 16 additions & 1 deletion

File tree

remoting/src/main/java/org/apache/rocketmq/remoting/protocol/EpochEntry.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,10 @@
2121

2222
public class EpochEntry extends RemotingSerializable {
2323

24+
public static final long LAST_EPOCH_END_OFFSET = Long.MAX_VALUE;
2425
private int epoch;
2526
private long startOffset;
26-
private long endOffset = Long.MAX_VALUE;
27+
private long endOffset = LAST_EPOCH_END_OFFSET;
2728

2829
public EpochEntry(EpochEntry entry) {
2930
this.epoch = entry.getEpoch();

store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -593,6 +593,20 @@ private void transferToSlave() throws Exception {
593593
return;
594594
}
595595

596+
// Check and update currentTransferEpochEndOffset
597+
if (AutoSwitchHAConnection.this.currentTransferEpochEndOffset == -1) {
598+
EpochEntry currentEpochEntry = AutoSwitchHAConnection.this.epochCache.getEntry(AutoSwitchHAConnection.this.currentTransferEpoch);
599+
if (currentEpochEntry != null) {
600+
if (currentEpochEntry.getEndOffset() != EpochEntry.LAST_EPOCH_END_OFFSET) {
601+
LOGGER.info("Update currentTransferEpochEndOffset from -1 to {}", currentEpochEntry.getEndOffset());
602+
AutoSwitchHAConnection.this.currentTransferEpochEndOffset = currentEpochEntry.getEndOffset();
603+
}
604+
} else {
605+
// we should never reach here
606+
LOGGER.warn("[BUG]Can't find currentTransferEpoch [{}] from epoch cache", currentTransferEpoch);
607+
}
608+
}
609+
596610
// We must ensure that the transmitted logs are within the same epoch
597611
// If currentEpochEndOffset == -1, means that currentTransferEpoch = last epoch, so the endOffset = Long.max
598612
final long currentEpochEndOffset = AutoSwitchHAConnection.this.currentTransferEpochEndOffset;

0 commit comments

Comments
 (0)