Skip to content

Commit 93f60db

Browse files
guyinyouguyinyou
andauthored
[ISSUE #9713] Improve data consistency in DefaultMappedFile flush method
- Move FLUSHED_POSITION_UPDATER.set() inside try block - Prevent false positive flush success when actual flush operation fails - Ensure data consistency and prevent potential data loss on system crash Previously, flushed position was updated even when flush operation failed, which could lead to data loss as the system would incorrectly assume data was persisted to disk when it was still in memory. Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com>
1 parent 1d9b02c commit 93f60db

1 file changed

Lines changed: 5 additions & 6 deletions

File tree

store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
123123
private static final SharedByteBuffer[] SHARED_BYTE_BUFFER;
124124

125125
protected RunningFlags runningFlags;
126+
126127
static class SharedByteBuffer {
127128
private final ReentrantLock lock;
128129
private final ByteBuffer buffer;
@@ -532,14 +533,13 @@ public boolean appendMessageUsingFileChannel(byte[] data) {
532533
*/
533534
@Override
534535
public int flush(final int flushLeastPages) {
536+
if (!isWriteable()) {
537+
return this.getFlushedPosition();
538+
}
535539
if (this.isAbleToFlush(flushLeastPages)) {
536540
if (this.hold()) {
537541
int value = getReadPosition();
538542

539-
if (!isWriteable()) {
540-
return this.getFlushedPosition();
541-
}
542-
543543
try {
544544
this.mappedByteBufferAccessCountSinceLastSwap++;
545545

@@ -555,14 +555,13 @@ public int flush(final int flushLeastPages) {
555555
}
556556
}
557557
this.lastFlushTime = System.currentTimeMillis();
558+
FLUSHED_POSITION_UPDATER.set(this, value);
558559
} catch (Throwable e) {
559560
if (e instanceof IOException) {
560561
getAndMakeNotWriteable();
561562
}
562563
log.error("Error occurred when force data to disk.", e);
563564
}
564-
565-
FLUSHED_POSITION_UPDATER.set(this, value);
566565
this.release();
567566
} else {
568567
log.warn("in flush, hold failed, flush offset = " + FLUSHED_POSITION_UPDATER.get(this));

0 commit comments

Comments
 (0)