Skip to content

Commit 1d9b02c

Browse files
guyinyouguyinyou
andauthored
[ISSUE #9707] Integrate RunningFlags with MappedFile system for better error handling and state management (#9708)
* Add RunningFlags support to MappedFileQueue - Integrate RunningFlags throughout MappedFileQueue hierarchy - Add writeable state checking and error handling in DefaultMappedFile - Update MappedFile interface and constructors to support RunningFlags - Implement proper error state management during flush operations * fix ut * fix ut --------- Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com>
1 parent d0e31a2 commit 1d9b02c

6 files changed

Lines changed: 79 additions & 26 deletions

File tree

store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -176,13 +176,13 @@ private boolean mmapOperation() {
176176
if (messageStore.isTransientStorePoolEnable()) {
177177
try {
178178
mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
179-
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
179+
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getRunningFlags(), messageStore.getTransientStorePool());
180180
} catch (RuntimeException e) {
181181
log.warn("Use default implementation.");
182-
mappedFile = new DefaultMappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool(), writeWithoutMmap);
182+
mappedFile = new DefaultMappedFile(req.getFilePath(), req.getFileSize(), messageStore.getRunningFlags(), messageStore.getTransientStorePool(), writeWithoutMmap);
183183
}
184184
} else {
185-
mappedFile = new DefaultMappedFile(req.getFilePath(), req.getFileSize(), writeWithoutMmap);
185+
mappedFile = new DefaultMappedFile(req.getFilePath(), req.getFileSize(), messageStore.getRunningFlags(), writeWithoutMmap);
186186
}
187187

188188
long elapsedTime = UtilAll.computeElapsedTimeMilliseconds(beginTime);

store/src/main/java/org/apache/rocketmq/store/CommitLog.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,11 +114,12 @@ public CommitLog(final DefaultMessageStore messageStore) {
114114
if (storePath.contains(MixAll.MULTI_PATH_SPLITTER)) {
115115
this.mappedFileQueue = new MultiPathMappedFileQueue(messageStore.getMessageStoreConfig(),
116116
messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
117-
messageStore.getAllocateMappedFileService(), this::getFullStorePaths);
117+
messageStore.getAllocateMappedFileService(), this::getFullStorePaths, messageStore.getRunningFlags());
118118
} else {
119119
this.mappedFileQueue = new MappedFileQueue(storePath,
120120
messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
121121
messageStore.getAllocateMappedFileService(),
122+
messageStore.getRunningFlags(),
122123
messageStore.getMessageStoreConfig().isWriteWithoutMmap());
123124
}
124125

store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,24 +53,35 @@ public class MappedFileQueue implements Swappable {
5353
protected long committedWhere = 0;
5454

5555
protected volatile long storeTimestamp = 0;
56-
56+
57+
protected RunningFlags runningFlags;
58+
5759
/**
5860
* Configuration flag to use RandomAccessFile instead of MappedByteBuffer for writing
5961
*/
6062
protected boolean writeWithoutMmap = false;
6163

6264
public MappedFileQueue(final String storePath, int mappedFileSize,
6365
AllocateMappedFileService allocateMappedFileService) {
64-
this.storePath = storePath;
65-
this.mappedFileSize = mappedFileSize;
66-
this.allocateMappedFileService = allocateMappedFileService;
66+
this(storePath, mappedFileSize, allocateMappedFileService, null, false);
67+
}
68+
69+
public MappedFileQueue(final String storePath, int mappedFileSize,
70+
AllocateMappedFileService allocateMappedFileService, RunningFlags runningFlags) {
71+
this(storePath, mappedFileSize, allocateMappedFileService, runningFlags, false);
6772
}
6873

6974
public MappedFileQueue(final String storePath, int mappedFileSize,
7075
AllocateMappedFileService allocateMappedFileService, boolean writeWithoutMmap) {
76+
this(storePath, mappedFileSize, allocateMappedFileService, null, writeWithoutMmap);
77+
}
78+
79+
public MappedFileQueue(final String storePath, int mappedFileSize,
80+
AllocateMappedFileService allocateMappedFileService, RunningFlags runningFlags, boolean writeWithoutMmap) {
7181
this.storePath = storePath;
7282
this.mappedFileSize = mappedFileSize;
7383
this.allocateMappedFileService = allocateMappedFileService;
84+
this.runningFlags = runningFlags;
7485
this.writeWithoutMmap = writeWithoutMmap;
7586
}
7687

@@ -279,7 +290,7 @@ public boolean doLoad(List<File> files) {
279290
}
280291

281292
try {
282-
MappedFile mappedFile = new DefaultMappedFile(file.getPath(), mappedFileSize, writeWithoutMmap);
293+
MappedFile mappedFile = new DefaultMappedFile(file.getPath(), mappedFileSize, runningFlags, writeWithoutMmap);
283294

284295
mappedFile.setWrotePosition(this.mappedFileSize);
285296
mappedFile.setFlushedPosition(this.mappedFileSize);
@@ -369,7 +380,7 @@ protected MappedFile doCreateMappedFile(String nextFilePath, String nextNextFile
369380
nextNextFilePath, this.mappedFileSize);
370381
} else {
371382
try {
372-
mappedFile = new DefaultMappedFile(nextFilePath, this.mappedFileSize, this.writeWithoutMmap);
383+
mappedFile = new DefaultMappedFile(nextFilePath, this.mappedFileSize, runningFlags, this.writeWithoutMmap);
373384
} catch (IOException e) {
374385
log.error("create mappedFile exception", e);
375386
}

store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,15 @@ public class MultiPathMappedFileQueue extends MappedFileQueue {
3636
private final MessageStoreConfig config;
3737
private final Supplier<Set<String>> fullStorePathsSupplier;
3838

39+
public MultiPathMappedFileQueue(MessageStoreConfig messageStoreConfig, int mappedFileSize,
40+
AllocateMappedFileService allocateMappedFileService,
41+
Supplier<Set<String>> fullStorePathsSupplier) {
42+
this(messageStoreConfig, mappedFileSize, allocateMappedFileService, fullStorePathsSupplier, null);
43+
}
3944
public MultiPathMappedFileQueue(MessageStoreConfig messageStoreConfig, int mappedFileSize,
4045
AllocateMappedFileService allocateMappedFileService,
41-
Supplier<Set<String>> fullStorePathsSupplier) {
42-
super(messageStoreConfig.getStorePathCommitLog(), mappedFileSize, allocateMappedFileService,
46+
Supplier<Set<String>> fullStorePathsSupplier, RunningFlags runningFlags) {
47+
super(messageStoreConfig.getStorePathCommitLog(), mappedFileSize, allocateMappedFileService, runningFlags,
4348
messageStoreConfig.isWriteWithoutMmap());
4449
this.config = messageStoreConfig;
4550
this.fullStorePathsSupplier = fullStorePathsSupplier;

store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java

Lines changed: 48 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.apache.rocketmq.store.AppendMessageStatus;
5454
import org.apache.rocketmq.store.CompactionAppendMsgCallback;
5555
import org.apache.rocketmq.store.PutMessageContext;
56+
import org.apache.rocketmq.store.RunningFlags;
5657
import org.apache.rocketmq.store.SelectMappedBufferResult;
5758
import org.apache.rocketmq.store.TransientStorePool;
5859
import org.apache.rocketmq.store.config.FlushDiskType;
@@ -121,6 +122,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
121122
private static int maxSharedNum = 16;
122123
private static final SharedByteBuffer[] SHARED_BYTE_BUFFER;
123124

125+
protected RunningFlags runningFlags;
124126
static class SharedByteBuffer {
125127
private final ReentrantLock lock;
126128
private final ByteBuffer buffer;
@@ -173,24 +175,36 @@ public DefaultMappedFile() {
173175
}
174176

175177
public DefaultMappedFile(final String fileName, final int fileSize) throws IOException {
176-
init(fileName, fileSize);
178+
this(fileName, fileSize, null);
177179
}
178180

179-
public DefaultMappedFile(final String fileName, final int fileSize,
181+
public DefaultMappedFile(final String fileName, final int fileSize, boolean writeWithoutMmap) throws IOException {
182+
this(fileName, fileSize, null, null, writeWithoutMmap);
183+
}
184+
185+
public DefaultMappedFile(final String fileName, final int fileSize, RunningFlags runningFlags) throws IOException {
186+
this(fileName, fileSize, runningFlags, null, false);
187+
}
188+
189+
public DefaultMappedFile(final String fileName, final int fileSize, final RunningFlags runningFlags,
180190
final TransientStorePool transientStorePool) throws IOException {
181-
init(fileName, fileSize, transientStorePool);
191+
this(fileName, fileSize, runningFlags, transientStorePool, false);
182192
}
183193

184-
public DefaultMappedFile(final String fileName, final int fileSize,
194+
public DefaultMappedFile(final String fileName, final int fileSize, final RunningFlags runningFlags,
185195
final boolean writeWithoutMmap) throws IOException {
186-
this.writeWithoutMmap = writeWithoutMmap;
187-
init(fileName, fileSize);
196+
this(fileName, fileSize, runningFlags, null, writeWithoutMmap);
188197
}
189198

190199
public DefaultMappedFile(final String fileName, final int fileSize,
200+
final TransientStorePool transientStorePool, final boolean writeWithoutMmap) throws IOException {
201+
this(fileName, fileSize, null, transientStorePool, writeWithoutMmap);
202+
}
203+
204+
public DefaultMappedFile(final String fileName, final int fileSize, final RunningFlags runningFlags,
191205
final TransientStorePool transientStorePool, final boolean writeWithoutMmap) throws IOException {
192206
this.writeWithoutMmap = writeWithoutMmap;
193-
init(fileName, fileSize, transientStorePool);
207+
init(fileName, fileSize, runningFlags, transientStorePool);
194208
}
195209

196210
public static int getTotalMappedFiles() {
@@ -202,30 +216,30 @@ public static long getTotalMappedVirtualMemory() {
202216
}
203217

204218
@Override
205-
public void init(final String fileName, final int fileSize,
219+
public void init(final String fileName, final int fileSize, final RunningFlags runningFlags,
206220
final TransientStorePool transientStorePool) throws IOException {
207-
init(fileName, fileSize);
221+
init(fileName, fileSize, runningFlags);
208222
if (transientStorePool != null) {
209223
this.writeBuffer = transientStorePool.borrowBuffer();
210224
this.transientStorePool = transientStorePool;
211225
}
212226
}
213227

214-
private void init(final String fileName, final int fileSize) throws IOException {
228+
private void init(final String fileName, final int fileSize, final RunningFlags runningFlags) throws IOException {
215229
this.fileName = fileName;
216230
this.fileSize = fileSize;
217231
this.file = new File(fileName);
218232
this.fileFromOffset = Long.parseLong(this.file.getName());
233+
this.runningFlags = runningFlags;
219234
boolean ok = false;
220235

221236
UtilAll.ensureDirOK(this.file.getParent());
222237

223238
try {
224-
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
239+
this.randomAccessFile = new RandomAccessFile(this.file, "rw");
240+
this.fileChannel = this.randomAccessFile.getChannel();
225241

226242
if (writeWithoutMmap) {
227-
// Use RandomAccessFile for writing instead of MappedByteBuffer
228-
this.randomAccessFile = new RandomAccessFile(this.file, "rw");
229243
// Still create MappedByteBuffer for reading operations
230244
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_ONLY, 0, fileSize);
231245
} else {
@@ -522,6 +536,10 @@ public int flush(final int flushLeastPages) {
522536
if (this.hold()) {
523537
int value = getReadPosition();
524538

539+
if (!isWriteable()) {
540+
return this.getFlushedPosition();
541+
}
542+
525543
try {
526544
this.mappedByteBufferAccessCountSinceLastSwap++;
527545

@@ -538,6 +556,9 @@ public int flush(final int flushLeastPages) {
538556
}
539557
this.lastFlushTime = System.currentTimeMillis();
540558
} catch (Throwable e) {
559+
if (e instanceof IOException) {
560+
getAndMakeNotWriteable();
561+
}
541562
log.error("Error occurred when force data to disk.", e);
542563
}
543564

@@ -597,6 +618,20 @@ protected void commit0() {
597618
}
598619
}
599620

621+
public boolean getAndMakeNotWriteable() {
622+
if (runningFlags == null) {
623+
return false;
624+
}
625+
return runningFlags.getAndMakeNotWriteable();
626+
}
627+
628+
public boolean isWriteable() {
629+
if (runningFlags == null) {
630+
return true;
631+
}
632+
return runningFlags.isWriteable();
633+
}
634+
600635
private boolean isAbleToFlush(final int flushLeastPages) {
601636
int flush = FLUSHED_POSITION_UPDATER.get(this);
602637
int write = getReadPosition();

store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.rocketmq.store.AppendMessageResult;
2929
import org.apache.rocketmq.store.CompactionAppendMsgCallback;
3030
import org.apache.rocketmq.store.PutMessageContext;
31+
import org.apache.rocketmq.store.RunningFlags;
3132
import org.apache.rocketmq.store.SelectMappedBufferResult;
3233
import org.apache.rocketmq.store.TransientStorePool;
3334
import org.apache.rocketmq.store.config.FlushDiskType;
@@ -368,7 +369,7 @@ public interface MappedFile {
368369
* @param transientStorePool transient store pool
369370
* @throws IOException
370371
*/
371-
void init(String fileName, int fileSize, TransientStorePool transientStorePool) throws IOException;
372+
void init(String fileName, int fileSize, RunningFlags runningFlags, TransientStorePool transientStorePool) throws IOException;
372373

373374
Iterator<SelectMappedBufferResult> iterator(int pos);
374375

0 commit comments

Comments
 (0)