Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -172,16 +172,17 @@ private boolean mmapOperation() {
long beginTime = System.currentTimeMillis();

MappedFile mappedFile;
boolean writeWithoutMmap = messageStore.getMessageStoreConfig().isWriteWithoutMmap();
if (messageStore.isTransientStorePoolEnable()) {
try {
mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
} catch (RuntimeException e) {
log.warn("Use default implementation.");
mappedFile = new DefaultMappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
mappedFile = new DefaultMappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool(), writeWithoutMmap);
}
} else {
mappedFile = new DefaultMappedFile(req.getFilePath(), req.getFileSize());
mappedFile = new DefaultMappedFile(req.getFilePath(), req.getFileSize(), writeWithoutMmap);
}

long elapsedTime = UtilAll.computeElapsedTimeMilliseconds(beginTime);
Expand All @@ -195,7 +196,9 @@ private boolean mmapOperation() {
if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
.getMappedFileSizeCommitLog()
&&
this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()
&&
!this.messageStore.getMessageStoreConfig().isWriteWithoutMmap()) {
mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
}
Expand Down
3 changes: 2 additions & 1 deletion store/src/main/java/org/apache/rocketmq/store/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ public CommitLog(final DefaultMessageStore messageStore) {
} else {
this.mappedFileQueue = new MappedFileQueue(storePath,
messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
messageStore.getAllocateMappedFileService());
messageStore.getAllocateMappedFileService(),
messageStore.getMessageStoreConfig().isWriteWithoutMmap());
}

this.defaultMessageStore = messageStore;
Expand Down
10 changes: 8 additions & 2 deletions store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,12 @@ public ConsumeQueue(final String topic, final int queueId, final String storePat
+ File.separator + topic
+ File.separator + queueId;

this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null);
boolean writeWithoutMmap = false;
if (messageStore.getMessageStoreConfig() != null) {
writeWithoutMmap = messageStore.getMessageStoreConfig().isWriteWithoutMmap();
}

this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null, writeWithoutMmap);

this.byteBufferIndex = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);

Expand All @@ -108,7 +113,8 @@ public ConsumeQueue(final String topic, final int queueId, final String storePat
queueId,
StorePathConfigHelper.getStorePathConsumeQueueExt(messageStore.getMessageStoreConfig().getStorePathRootDir()),
messageStore.getMessageStoreConfig().getMappedFileSizeConsumeQueueExt(),
messageStore.getMessageStoreConfig().getBitMapLengthConsumeQueueExt()
messageStore.getMessageStoreConfig().getBitMapLengthConsumeQueueExt(),
writeWithoutMmap
);
}
}
Expand Down
36 changes: 36 additions & 0 deletions store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,42 @@ public ConsumeQueueExt(final String topic,
}
}

/**
* Constructor with writeWithoutMmap support.
*
* @param topic topic
* @param queueId id of queue
* @param storePath root dir of files to store.
* @param mappedFileSize file size
* @param bitMapLength bit map length.
* @param writeWithoutMmap whether to use RandomAccessFile instead of MappedByteBuffer
*/
public ConsumeQueueExt(final String topic,
final int queueId,
final String storePath,
final int mappedFileSize,
final int bitMapLength,
final boolean writeWithoutMmap) {

this.storePath = storePath;
this.mappedFileSize = mappedFileSize;

this.topic = topic;
this.queueId = queueId;

String queueDir = this.storePath
+ File.separator + topic
+ File.separator + queueId;

this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null, writeWithoutMmap);

if (bitMapLength > 0) {
this.tempContainer = ByteBuffer.allocate(
bitMapLength / Byte.SIZE
);
}
}

public long getTotalSize() {
return this.mappedFileQueue.getTotalFileSize();
}
Expand Down
17 changes: 15 additions & 2 deletions store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ public class MappedFileQueue implements Swappable {
protected long committedWhere = 0;

protected volatile long storeTimestamp = 0;

/**
* Configuration flag to use RandomAccessFile instead of MappedByteBuffer for writing
*/
protected boolean writeWithoutMmap = false;

public MappedFileQueue(final String storePath, int mappedFileSize,
AllocateMappedFileService allocateMappedFileService) {
Expand All @@ -61,6 +66,14 @@ public MappedFileQueue(final String storePath, int mappedFileSize,
this.allocateMappedFileService = allocateMappedFileService;
}

public MappedFileQueue(final String storePath, int mappedFileSize,
AllocateMappedFileService allocateMappedFileService, boolean writeWithoutMmap) {
this.storePath = storePath;
this.mappedFileSize = mappedFileSize;
this.allocateMappedFileService = allocateMappedFileService;
this.writeWithoutMmap = writeWithoutMmap;
}

public void checkSelf() {
List<MappedFile> mappedFiles = new ArrayList<>(this.mappedFiles);
if (!mappedFiles.isEmpty()) {
Expand Down Expand Up @@ -266,7 +279,7 @@ public boolean doLoad(List<File> files) {
}

try {
MappedFile mappedFile = new DefaultMappedFile(file.getPath(), mappedFileSize);
MappedFile mappedFile = new DefaultMappedFile(file.getPath(), mappedFileSize, writeWithoutMmap);

mappedFile.setWrotePosition(this.mappedFileSize);
mappedFile.setFlushedPosition(this.mappedFileSize);
Expand Down Expand Up @@ -356,7 +369,7 @@ protected MappedFile doCreateMappedFile(String nextFilePath, String nextNextFile
nextNextFilePath, this.mappedFileSize);
} else {
try {
mappedFile = new DefaultMappedFile(nextFilePath, this.mappedFileSize);
mappedFile = new DefaultMappedFile(nextFilePath, this.mappedFileSize, this.writeWithoutMmap);
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ public class MultiPathMappedFileQueue extends MappedFileQueue {
public MultiPathMappedFileQueue(MessageStoreConfig messageStoreConfig, int mappedFileSize,
AllocateMappedFileService allocateMappedFileService,
Supplier<Set<String>> fullStorePathsSupplier) {
super(messageStoreConfig.getStorePathCommitLog(), mappedFileSize, allocateMappedFileService);
super(messageStoreConfig.getStorePathCommitLog(), mappedFileSize, allocateMappedFileService,
messageStoreConfig.isWriteWithoutMmap());
this.config = messageStoreConfig;
this.fullStorePathsSupplier = fullStorePathsSupplier;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,13 @@ public class MessageStoreConfig {
private int transientStorePoolSize = 5;
private boolean fastFailIfNoBufferInStorePool = false;

/**
* When true, use RandomAccessFile for writing instead of MappedByteBuffer.
* This can be useful for certain scenarios where mmap is not desired.
*/
@ImportantField
private boolean writeWithoutMmap = false;

// DLedger message store config
private boolean enableDLegerCommitLog = false;
private String dLegerGroup;
Expand Down Expand Up @@ -1140,6 +1147,14 @@ public void setTransientStorePoolEnable(final boolean transientStorePoolEnable)
this.transientStorePoolEnable = transientStorePoolEnable;
}

public boolean isWriteWithoutMmap() {
return writeWithoutMmap;
}

public void setWriteWithoutMmap(final boolean writeWithoutMmap) {
this.writeWithoutMmap = writeWithoutMmap;
}

public int getTransientStorePoolSize() {
return transientStorePoolSize;
}
Expand Down
Loading
Loading