Skip to content

Commit 4ae0294

Browse files
guyinyouguyinyou
andauthored
[ISSUE #9693] Add writeWithoutMmap configuration to prevent JVM crash when device becomes read-only (#9694)
* add "writeWithoutMmap" * fix npe --------- Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com>
1 parent edaffe6 commit 4ae0294

13 files changed

Lines changed: 1052 additions & 37 deletions

store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -172,16 +172,17 @@ private boolean mmapOperation() {
172172
long beginTime = System.currentTimeMillis();
173173

174174
MappedFile mappedFile;
175+
boolean writeWithoutMmap = messageStore.getMessageStoreConfig().isWriteWithoutMmap();
175176
if (messageStore.isTransientStorePoolEnable()) {
176177
try {
177178
mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
178179
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
179180
} catch (RuntimeException e) {
180181
log.warn("Use default implementation.");
181-
mappedFile = new DefaultMappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
182+
mappedFile = new DefaultMappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool(), writeWithoutMmap);
182183
}
183184
} else {
184-
mappedFile = new DefaultMappedFile(req.getFilePath(), req.getFileSize());
185+
mappedFile = new DefaultMappedFile(req.getFilePath(), req.getFileSize(), writeWithoutMmap);
185186
}
186187

187188
long elapsedTime = UtilAll.computeElapsedTimeMilliseconds(beginTime);
@@ -195,7 +196,9 @@ private boolean mmapOperation() {
195196
if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
196197
.getMappedFileSizeCommitLog()
197198
&&
198-
this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
199+
this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()
200+
&&
201+
!this.messageStore.getMessageStoreConfig().isWriteWithoutMmap()) {
199202
mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
200203
this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
201204
}

store/src/main/java/org/apache/rocketmq/store/CommitLog.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,8 @@ public CommitLog(final DefaultMessageStore messageStore) {
118118
} else {
119119
this.mappedFileQueue = new MappedFileQueue(storePath,
120120
messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
121-
messageStore.getAllocateMappedFileService());
121+
messageStore.getAllocateMappedFileService(),
122+
messageStore.getMessageStoreConfig().isWriteWithoutMmap());
122123
}
123124

124125
this.defaultMessageStore = messageStore;

store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,12 @@ public ConsumeQueue(final String topic, final int queueId, final String storePat
9898
+ File.separator + topic
9999
+ File.separator + queueId;
100100

101-
this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null);
101+
boolean writeWithoutMmap = false;
102+
if (messageStore.getMessageStoreConfig() != null) {
103+
writeWithoutMmap = messageStore.getMessageStoreConfig().isWriteWithoutMmap();
104+
}
105+
106+
this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null, writeWithoutMmap);
102107

103108
this.byteBufferIndex = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
104109

@@ -108,7 +113,8 @@ public ConsumeQueue(final String topic, final int queueId, final String storePat
108113
queueId,
109114
StorePathConfigHelper.getStorePathConsumeQueueExt(messageStore.getMessageStoreConfig().getStorePathRootDir()),
110115
messageStore.getMessageStoreConfig().getMappedFileSizeConsumeQueueExt(),
111-
messageStore.getMessageStoreConfig().getBitMapLengthConsumeQueueExt()
116+
messageStore.getMessageStoreConfig().getBitMapLengthConsumeQueueExt(),
117+
writeWithoutMmap
112118
);
113119
}
114120
}

store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,42 @@ public ConsumeQueueExt(final String topic,
9090
}
9191
}
9292

93+
/**
94+
* Constructor with writeWithoutMmap support.
95+
*
96+
* @param topic topic
97+
* @param queueId id of queue
98+
* @param storePath root dir of files to store.
99+
* @param mappedFileSize file size
100+
* @param bitMapLength bit map length.
101+
* @param writeWithoutMmap whether to use RandomAccessFile instead of MappedByteBuffer
102+
*/
103+
public ConsumeQueueExt(final String topic,
104+
final int queueId,
105+
final String storePath,
106+
final int mappedFileSize,
107+
final int bitMapLength,
108+
final boolean writeWithoutMmap) {
109+
110+
this.storePath = storePath;
111+
this.mappedFileSize = mappedFileSize;
112+
113+
this.topic = topic;
114+
this.queueId = queueId;
115+
116+
String queueDir = this.storePath
117+
+ File.separator + topic
118+
+ File.separator + queueId;
119+
120+
this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null, writeWithoutMmap);
121+
122+
if (bitMapLength > 0) {
123+
this.tempContainer = ByteBuffer.allocate(
124+
bitMapLength / Byte.SIZE
125+
);
126+
}
127+
}
128+
93129
public long getTotalSize() {
94130
return this.mappedFileQueue.getTotalFileSize();
95131
}

store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,11 @@ public class MappedFileQueue implements Swappable {
5353
protected long committedWhere = 0;
5454

5555
protected volatile long storeTimestamp = 0;
56+
57+
/**
58+
* Configuration flag to use RandomAccessFile instead of MappedByteBuffer for writing
59+
*/
60+
protected boolean writeWithoutMmap = false;
5661

5762
public MappedFileQueue(final String storePath, int mappedFileSize,
5863
AllocateMappedFileService allocateMappedFileService) {
@@ -61,6 +66,14 @@ public MappedFileQueue(final String storePath, int mappedFileSize,
6166
this.allocateMappedFileService = allocateMappedFileService;
6267
}
6368

69+
public MappedFileQueue(final String storePath, int mappedFileSize,
70+
AllocateMappedFileService allocateMappedFileService, boolean writeWithoutMmap) {
71+
this.storePath = storePath;
72+
this.mappedFileSize = mappedFileSize;
73+
this.allocateMappedFileService = allocateMappedFileService;
74+
this.writeWithoutMmap = writeWithoutMmap;
75+
}
76+
6477
public void checkSelf() {
6578
List<MappedFile> mappedFiles = new ArrayList<>(this.mappedFiles);
6679
if (!mappedFiles.isEmpty()) {
@@ -266,7 +279,7 @@ public boolean doLoad(List<File> files) {
266279
}
267280

268281
try {
269-
MappedFile mappedFile = new DefaultMappedFile(file.getPath(), mappedFileSize);
282+
MappedFile mappedFile = new DefaultMappedFile(file.getPath(), mappedFileSize, writeWithoutMmap);
270283

271284
mappedFile.setWrotePosition(this.mappedFileSize);
272285
mappedFile.setFlushedPosition(this.mappedFileSize);
@@ -356,7 +369,7 @@ protected MappedFile doCreateMappedFile(String nextFilePath, String nextNextFile
356369
nextNextFilePath, this.mappedFileSize);
357370
} else {
358371
try {
359-
mappedFile = new DefaultMappedFile(nextFilePath, this.mappedFileSize);
372+
mappedFile = new DefaultMappedFile(nextFilePath, this.mappedFileSize, this.writeWithoutMmap);
360373
} catch (IOException e) {
361374
log.error("create mappedFile exception", e);
362375
}

store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ public class MultiPathMappedFileQueue extends MappedFileQueue {
3939
public MultiPathMappedFileQueue(MessageStoreConfig messageStoreConfig, int mappedFileSize,
4040
AllocateMappedFileService allocateMappedFileService,
4141
Supplier<Set<String>> fullStorePathsSupplier) {
42-
super(messageStoreConfig.getStorePathCommitLog(), mappedFileSize, allocateMappedFileService);
42+
super(messageStoreConfig.getStorePathCommitLog(), mappedFileSize, allocateMappedFileService,
43+
messageStoreConfig.isWriteWithoutMmap());
4344
this.config = messageStoreConfig;
4445
this.fullStorePathsSupplier = fullStorePathsSupplier;
4546
}

store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,13 @@ public class MessageStoreConfig {
238238
private int transientStorePoolSize = 5;
239239
private boolean fastFailIfNoBufferInStorePool = false;
240240

241+
/**
242+
* When true, use RandomAccessFile for writing instead of MappedByteBuffer.
243+
* This can be useful for certain scenarios where mmap is not desired.
244+
*/
245+
@ImportantField
246+
private boolean writeWithoutMmap = false;
247+
241248
// DLedger message store config
242249
private boolean enableDLegerCommitLog = false;
243250
private String dLegerGroup;
@@ -1147,6 +1154,14 @@ public void setTransientStorePoolEnable(final boolean transientStorePoolEnable)
11471154
this.transientStorePoolEnable = transientStorePoolEnable;
11481155
}
11491156

1157+
public boolean isWriteWithoutMmap() {
1158+
return writeWithoutMmap;
1159+
}
1160+
1161+
public void setWriteWithoutMmap(final boolean writeWithoutMmap) {
1162+
this.writeWithoutMmap = writeWithoutMmap;
1163+
}
1164+
11501165
public int getTransientStorePoolSize() {
11511166
return transientStorePoolSize;
11521167
}

0 commit comments

Comments
 (0)