diff --git a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java index d9cd602a65c..a56fa461573 100644 --- a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java +++ b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java @@ -172,16 +172,17 @@ private boolean mmapOperation() { long beginTime = System.currentTimeMillis(); MappedFile mappedFile; + boolean writeWithoutMmap = messageStore.getMessageStoreConfig().isWriteWithoutMmap(); if (messageStore.isTransientStorePoolEnable()) { try { mappedFile = ServiceLoader.load(MappedFile.class).iterator().next(); mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool()); } catch (RuntimeException e) { log.warn("Use default implementation."); - mappedFile = new DefaultMappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool()); + mappedFile = new DefaultMappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool(), writeWithoutMmap); } } else { - mappedFile = new DefaultMappedFile(req.getFilePath(), req.getFileSize()); + mappedFile = new DefaultMappedFile(req.getFilePath(), req.getFileSize(), writeWithoutMmap); } long elapsedTime = UtilAll.computeElapsedTimeMilliseconds(beginTime); @@ -195,7 +196,9 @@ private boolean mmapOperation() { if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig() .getMappedFileSizeCommitLog() && - this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) { + this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable() + && + !this.messageStore.getMessageStoreConfig().isWriteWithoutMmap()) { mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(), this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile()); } diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 2e3dbbadc3a..58fdd3507e9 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -118,7 +118,8 @@ public CommitLog(final DefaultMessageStore messageStore) { } else { this.mappedFileQueue = new MappedFileQueue(storePath, messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), - messageStore.getAllocateMappedFileService()); + messageStore.getAllocateMappedFileService(), + messageStore.getMessageStoreConfig().isWriteWithoutMmap()); } this.defaultMessageStore = messageStore; diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index 2850299b7d6..02f90cef1df 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -98,7 +98,12 @@ public ConsumeQueue(final String topic, final int queueId, final String storePat + File.separator + topic + File.separator + queueId; - this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null); + boolean writeWithoutMmap = false; + if (messageStore.getMessageStoreConfig() != null) { + writeWithoutMmap = messageStore.getMessageStoreConfig().isWriteWithoutMmap(); + } + + this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null, writeWithoutMmap); this.byteBufferIndex = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE); @@ -108,7 +113,8 @@ public ConsumeQueue(final String topic, final int queueId, final String storePat queueId, StorePathConfigHelper.getStorePathConsumeQueueExt(messageStore.getMessageStoreConfig().getStorePathRootDir()), messageStore.getMessageStoreConfig().getMappedFileSizeConsumeQueueExt(), - messageStore.getMessageStoreConfig().getBitMapLengthConsumeQueueExt() + messageStore.getMessageStoreConfig().getBitMapLengthConsumeQueueExt(), + writeWithoutMmap ); } } diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java index 3f266378df3..641f672bba6 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java @@ -90,6 +90,42 @@ public ConsumeQueueExt(final String topic, } } + /** + * Constructor with writeWithoutMmap support. + * + * @param topic topic + * @param queueId id of queue + * @param storePath root dir of files to store. + * @param mappedFileSize file size + * @param bitMapLength bit map length. + * @param writeWithoutMmap whether to use RandomAccessFile instead of MappedByteBuffer + */ + public ConsumeQueueExt(final String topic, + final int queueId, + final String storePath, + final int mappedFileSize, + final int bitMapLength, + final boolean writeWithoutMmap) { + + this.storePath = storePath; + this.mappedFileSize = mappedFileSize; + + this.topic = topic; + this.queueId = queueId; + + String queueDir = this.storePath + + File.separator + topic + + File.separator + queueId; + + this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null, writeWithoutMmap); + + if (bitMapLength > 0) { + this.tempContainer = ByteBuffer.allocate( + bitMapLength / Byte.SIZE + ); + } + } + public long getTotalSize() { return this.mappedFileQueue.getTotalFileSize(); } diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java index e32c16a82a8..320e8421549 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java @@ -53,6 +53,11 @@ public class MappedFileQueue implements Swappable { protected long committedWhere = 0; protected volatile long storeTimestamp = 0; + + /** + * Configuration flag to use RandomAccessFile instead of MappedByteBuffer for writing + */ + protected boolean writeWithoutMmap = false; public MappedFileQueue(final String storePath, int mappedFileSize, AllocateMappedFileService allocateMappedFileService) { @@ -61,6 +66,14 @@ public MappedFileQueue(final String storePath, int mappedFileSize, this.allocateMappedFileService = allocateMappedFileService; } + public MappedFileQueue(final String storePath, int mappedFileSize, + AllocateMappedFileService allocateMappedFileService, boolean writeWithoutMmap) { + this.storePath = storePath; + this.mappedFileSize = mappedFileSize; + this.allocateMappedFileService = allocateMappedFileService; + this.writeWithoutMmap = writeWithoutMmap; + } + public void checkSelf() { List mappedFiles = new ArrayList<>(this.mappedFiles); if (!mappedFiles.isEmpty()) { @@ -266,7 +279,7 @@ public boolean doLoad(List files) { } try { - MappedFile mappedFile = new DefaultMappedFile(file.getPath(), mappedFileSize); + MappedFile mappedFile = new DefaultMappedFile(file.getPath(), mappedFileSize, writeWithoutMmap); mappedFile.setWrotePosition(this.mappedFileSize); mappedFile.setFlushedPosition(this.mappedFileSize); @@ -356,7 +369,7 @@ protected MappedFile doCreateMappedFile(String nextFilePath, String nextNextFile nextNextFilePath, this.mappedFileSize); } else { try { - mappedFile = new DefaultMappedFile(nextFilePath, this.mappedFileSize); + mappedFile = new DefaultMappedFile(nextFilePath, this.mappedFileSize, this.writeWithoutMmap); } catch (IOException e) { log.error("create mappedFile exception", e); } diff --git a/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java index 8ff050dfe3b..72ec8820a6d 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java @@ -39,7 +39,8 @@ public class MultiPathMappedFileQueue extends MappedFileQueue { public MultiPathMappedFileQueue(MessageStoreConfig messageStoreConfig, int mappedFileSize, AllocateMappedFileService allocateMappedFileService, Supplier> fullStorePathsSupplier) { - super(messageStoreConfig.getStorePathCommitLog(), mappedFileSize, allocateMappedFileService); + super(messageStoreConfig.getStorePathCommitLog(), mappedFileSize, allocateMappedFileService, + messageStoreConfig.isWriteWithoutMmap()); this.config = messageStoreConfig; this.fullStorePathsSupplier = fullStorePathsSupplier; } diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index 60f6a90381c..efd8a274c2d 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -238,6 +238,13 @@ public class MessageStoreConfig { private int transientStorePoolSize = 5; private boolean fastFailIfNoBufferInStorePool = false; + /** + * When true, use RandomAccessFile for writing instead of MappedByteBuffer. + * This can be useful for certain scenarios where mmap is not desired. + */ + @ImportantField + private boolean writeWithoutMmap = false; + // DLedger message store config private boolean enableDLegerCommitLog = false; private String dLegerGroup; @@ -1140,6 +1147,14 @@ public void setTransientStorePoolEnable(final boolean transientStorePoolEnable) this.transientStorePoolEnable = transientStorePoolEnable; } + public boolean isWriteWithoutMmap() { + return writeWithoutMmap; + } + + public void setWriteWithoutMmap(final boolean writeWithoutMmap) { + this.writeWithoutMmap = writeWithoutMmap; + } + public int getTransientStorePoolSize() { return transientStorePoolSize; } diff --git a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java index c490d093a16..b2d89108b4b 100644 --- a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java +++ b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java @@ -33,9 +33,11 @@ import java.nio.file.Paths; import java.nio.file.StandardCopyOption; import java.util.Iterator; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import org.apache.commons.lang3.SystemUtils; import org.apache.rocketmq.common.UtilAll; @@ -79,11 +81,19 @@ public class DefaultMappedFile extends AbstractMappedFile { protected volatile int flushedPosition; protected int fileSize; protected FileChannel fileChannel; + /** + * RandomAccessFile for writing when writeWithoutMmap is enabled + */ + protected RandomAccessFile randomAccessFile = null; /** * Message will put to here first, and then reput to FileChannel if writeBuffer is not null. */ protected ByteBuffer writeBuffer = null; protected TransientStorePool transientStorePool = null; + /** + * Configuration flag to use RandomAccessFile instead of MappedByteBuffer for writing + */ + protected boolean writeWithoutMmap = false; protected String fileName; protected long fileFromOffset; protected File file; @@ -108,6 +118,28 @@ public class DefaultMappedFile extends AbstractMappedFile { */ private long stopTimestamp = -1; + private static int maxSharedNum = 16; + private static final SharedByteBuffer[] SHARED_BYTE_BUFFER; + + static class SharedByteBuffer { + private final ReentrantLock lock; + private final ByteBuffer buffer; + + public SharedByteBuffer(int size) { + this.lock = new ReentrantLock(); + this.buffer = ByteBuffer.allocate(size); + } + + public void release() { + this.lock.unlock(); + } + + public ByteBuffer acquire() { + this.lock.lock(); + return buffer; + } + } + static { WROTE_POSITION_UPDATER = AtomicIntegerFieldUpdater.newUpdater(DefaultMappedFile.class, "wrotePosition"); COMMITTED_POSITION_UPDATER = AtomicIntegerFieldUpdater.newUpdater(DefaultMappedFile.class, "committedPosition"); @@ -124,6 +156,17 @@ public class DefaultMappedFile extends AbstractMappedFile { } } IS_LOADED_METHOD = isLoaded0method; + + SHARED_BYTE_BUFFER = new SharedByteBuffer[maxSharedNum]; + for (int i = 0; i < maxSharedNum; i++) { + SHARED_BYTE_BUFFER[i] = new SharedByteBuffer(4 * 1024 * 1024); + } + } + + private static SharedByteBuffer borrowSharedByteBuffer() { + int idx = ThreadLocalRandom.current().nextInt(maxSharedNum); + SharedByteBuffer buffer = SHARED_BYTE_BUFFER[idx]; + return buffer; } public DefaultMappedFile() { @@ -138,6 +181,18 @@ public DefaultMappedFile(final String fileName, final int fileSize, init(fileName, fileSize, transientStorePool); } + public DefaultMappedFile(final String fileName, final int fileSize, + final boolean writeWithoutMmap) throws IOException { + this.writeWithoutMmap = writeWithoutMmap; + init(fileName, fileSize); + } + + public DefaultMappedFile(final String fileName, final int fileSize, + final TransientStorePool transientStorePool, final boolean writeWithoutMmap) throws IOException { + this.writeWithoutMmap = writeWithoutMmap; + init(fileName, fileSize, transientStorePool); + } + public static int getTotalMappedFiles() { return TOTAL_MAPPED_FILES.get(); } @@ -150,8 +205,10 @@ public static long getTotalMappedVirtualMemory() { public void init(final String fileName, final int fileSize, final TransientStorePool transientStorePool) throws IOException { init(fileName, fileSize); - this.writeBuffer = transientStorePool.borrowBuffer(); - this.transientStorePool = transientStorePool; + if (transientStorePool != null) { + this.writeBuffer = transientStorePool.borrowBuffer(); + this.transientStorePool = transientStorePool; + } } private void init(final String fileName, final int fileSize) throws IOException { @@ -165,7 +222,17 @@ private void init(final String fileName, final int fileSize) throws IOException try { this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel(); - this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize); + + if (writeWithoutMmap) { + // Use RandomAccessFile for writing instead of MappedByteBuffer + this.randomAccessFile = new RandomAccessFile(this.file, "rw"); + // Still create MappedByteBuffer for reading operations + this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_ONLY, 0, fileSize); + } else { + // Use MappedByteBuffer for both reading and writing (default behavior) + this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize); + } + TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize); TOTAL_MAPPED_FILES.incrementAndGet(); ok = true; @@ -179,6 +246,9 @@ private void init(final String fileName, final int fileSize) throws IOException if (!ok && this.fileChannel != null) { this.fileChannel.close(); } + if (!ok && this.randomAccessFile != null) { + this.randomAccessFile.close(); + } } } @@ -243,10 +313,35 @@ public AppendMessageResult appendMessage(final ByteBuffer byteBufferMsg, final C assert cb != null; int currentPos = WROTE_POSITION_UPDATER.get(this); + long fileFromOffset = this.getFileFromOffset(); + if (currentPos < this.fileSize) { - ByteBuffer byteBuffer = appendMessageBuffer().slice(); - byteBuffer.position(currentPos); - AppendMessageResult result = cb.doAppend(byteBuffer, this.fileFromOffset, this.fileSize - currentPos, byteBufferMsg); + SharedByteBuffer sharedByteBuffer = null; + ByteBuffer byteBuffer; + if (writeWithoutMmap && randomAccessFile != null) { + sharedByteBuffer = borrowSharedByteBuffer(); + byteBuffer = sharedByteBuffer.acquire(); + byteBuffer.position(0).limit(byteBuffer.capacity()); + fileFromOffset += currentPos; + } else { + byteBuffer = appendMessageBuffer().slice(); + byteBuffer.position(currentPos); + } + + AppendMessageResult result = cb.doAppend(byteBuffer, fileFromOffset, this.fileSize - currentPos, byteBufferMsg); + + if (sharedByteBuffer != null) { + try { + randomAccessFile.seek(currentPos); + randomAccessFile.write(byteBuffer.array(), 0, result.getWroteBytes()); + } catch (Throwable t) { + log.error("Failed to write to mappedFile {}", this.fileName, t); + return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); + } finally { + sharedByteBuffer.release(); + } + } + WROTE_POSITION_UPDATER.addAndGet(this, result.getWroteBytes()); this.storeTimestamp = result.getStoreTimestamp(); return result; @@ -273,22 +368,46 @@ public AppendMessageResult appendMessagesInner(final MessageExt messageExt, fina assert cb != null; int currentPos = WROTE_POSITION_UPDATER.get(this); + long fileFromOffset = this.getFileFromOffset(); if (currentPos < this.fileSize) { - ByteBuffer byteBuffer = appendMessageBuffer().slice(); - byteBuffer.position(currentPos); + SharedByteBuffer sharedByteBuffer = null; + ByteBuffer byteBuffer; + if (writeWithoutMmap && randomAccessFile != null) { + sharedByteBuffer = borrowSharedByteBuffer(); + byteBuffer = sharedByteBuffer.acquire(); + byteBuffer.position(0).limit(byteBuffer.capacity()); + fileFromOffset += currentPos; + } else { + byteBuffer = appendMessageBuffer().slice(); + byteBuffer.position(currentPos); + } + AppendMessageResult result; if (messageExt instanceof MessageExtBatch && !((MessageExtBatch) messageExt).isInnerBatch()) { // traditional batch message - result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, + result = cb.doAppend(fileFromOffset, byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt, putMessageContext); } else if (messageExt instanceof MessageExtBrokerInner) { // traditional single message or newly introduced inner-batch message - result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, + result = cb.doAppend(fileFromOffset, byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt, putMessageContext); } else { return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); } + + if (sharedByteBuffer != null) { + try { + randomAccessFile.seek(currentPos); + randomAccessFile.write(byteBuffer.array(), 0, result.getWroteBytes()); + } catch (Throwable t) { + log.error("Failed to write to mappedFile {}", this.fileName, t); + return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); + } finally { + sharedByteBuffer.release(); + } + } + WROTE_POSITION_UPDATER.addAndGet(this, result.getWroteBytes()); this.storeTimestamp = result.getStoreTimestamp(); return result; @@ -319,15 +438,25 @@ public boolean appendMessage(ByteBuffer data) { if ((currentPos + remaining) <= this.fileSize) { try { - this.fileChannel.position(currentPos); - while (data.hasRemaining()) { - this.fileChannel.write(data); + if (writeWithoutMmap && randomAccessFile != null) { + // Use RandomAccessFile for writing + randomAccessFile.seek(currentPos); + byte[] buffer = new byte[remaining]; + data.get(buffer); + randomAccessFile.write(buffer); + } else { + // Use FileChannel for writing (default behavior) + this.fileChannel.position(currentPos); + while (data.hasRemaining()) { + this.fileChannel.write(data); + } } + WROTE_POSITION_UPDATER.addAndGet(this, remaining); + return true; } catch (Throwable e) { log.error("Error occurred when append message to mappedFile.", e); + return false; } - WROTE_POSITION_UPDATER.addAndGet(this, remaining); - return true; } return false; } @@ -344,14 +473,22 @@ public boolean appendMessage(final byte[] data, final int offset, final int leng if ((currentPos + length) <= this.fileSize) { try { - ByteBuffer buf = this.mappedByteBuffer.slice(); - buf.position(currentPos); - buf.put(data, offset, length); + if (writeWithoutMmap && randomAccessFile != null) { + // Use RandomAccessFile for writing + randomAccessFile.seek(currentPos); + randomAccessFile.write(data, offset, length); + } else { + // Use MappedByteBuffer for writing (default behavior) + ByteBuffer buf = this.mappedByteBuffer.slice(); + buf.position(currentPos); + buf.put(data, offset, length); + } + WROTE_POSITION_UPDATER.addAndGet(this, length); + return true; } catch (Throwable e) { log.error("Error occurred when append message to mappedFile.", e); + return false; } - WROTE_POSITION_UPDATER.addAndGet(this, length); - return true; } return false; @@ -365,11 +502,12 @@ public boolean appendMessageUsingFileChannel(byte[] data) { try { this.fileChannel.position(currentPos); this.fileChannel.write(ByteBuffer.wrap(data, 0, data.length)); + WROTE_POSITION_UPDATER.addAndGet(this, data.length); + return true; } catch (Throwable e) { log.error("Error occurred when append message to mappedFile.", e); + return false; } - WROTE_POSITION_UPDATER.addAndGet(this, data.length); - return true; } return false; @@ -387,11 +525,16 @@ public int flush(final int flushLeastPages) { try { this.mappedByteBufferAccessCountSinceLastSwap++; - //We only append data to fileChannel or mappedByteBuffer, never both. - if (writeBuffer != null || this.fileChannel.position() != 0) { - this.fileChannel.force(false); + if (writeWithoutMmap && randomAccessFile != null) { + // Use RandomAccessFile for flushing + randomAccessFile.getChannel().force(false); } else { - this.mappedByteBuffer.force(); + //We only append data to fileChannel or mappedByteBuffer, never both. + if (writeBuffer != null || this.fileChannel.position() != 0) { + this.fileChannel.force(false); + } else { + this.mappedByteBuffer.force(); + } } this.lastFlushTime = System.currentTimeMillis(); } catch (Throwable e) { @@ -574,6 +717,11 @@ public boolean destroy(final long intervalForcibly) { this.fileChannel.close(); log.info("close file channel " + this.fileName + " OK"); + if (this.randomAccessFile != null) { + this.randomAccessFile.close(); + log.info("close random access file " + this.fileName + " OK"); + } + long beginTime = System.currentTimeMillis(); boolean result = this.file.delete(); log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java index 21181faf3b1..3f1dc237d6b 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java @@ -105,12 +105,19 @@ public BatchConsumeQueue( this.topic = topic; this.queueId = queueId; + boolean writeWithoutMmap = false; + if (messageStore.getMessageStoreConfig() != null) { + writeWithoutMmap = messageStore.getMessageStoreConfig().isWriteWithoutMmap(); + } + if (StringUtils.isBlank(subfolder)) { String queueDir = this.storePath + File.separator + topic + File.separator + queueId; - this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null); + this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null, + writeWithoutMmap); } else { String queueDir = this.storePath + File.separator + topic + File.separator + queueId + File.separator + subfolder; - this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null); + this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null, + writeWithoutMmap); } this.byteBufferItem = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE); diff --git a/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileConcurrencyTest.java b/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileConcurrencyTest.java new file mode 100644 index 00000000000..06f94727d6f --- /dev/null +++ b/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileConcurrencyTest.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store.logfile; + +import java.io.File; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.rocketmq.common.UtilAll; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class DefaultMappedFileConcurrencyTest { + + private String storePath; + private String fileName; + private int fileSize = 1024 * 1024; // 1MB + private static final int THREAD_COUNT = 10; + private static final int OPERATIONS_PER_THREAD = 100; + + @Before + public void setUp() throws Exception { + storePath = System.getProperty("user.home") + File.separator + "unitteststore" + System.currentTimeMillis(); + fileName = storePath + File.separator + "00000000000000000000"; + UtilAll.ensureDirOK(storePath); + } + + @After + public void tearDown() throws Exception { + UtilAll.deleteFile(new File(storePath)); + } + + @Test + public void testConcurrentWriteWithoutMmap() throws Exception { + DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, true); + + try { + ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT); + CountDownLatch latch = new CountDownLatch(THREAD_COUNT); + AtomicInteger successCount = new AtomicInteger(0); + AtomicInteger errorCount = new AtomicInteger(0); + + for (int i = 0; i < THREAD_COUNT; i++) { + final int threadId = i; + executor.submit(() -> { + try { + for (int j = 0; j < OPERATIONS_PER_THREAD; j++) { + String data = String.format("Thread-%d-Operation-%d", threadId, j); + byte[] bytes = data.getBytes(); + + boolean result = mappedFile.appendMessage(bytes); + if (result) { + successCount.incrementAndGet(); + } else { + errorCount.incrementAndGet(); + } + } + } catch (Exception e) { + errorCount.incrementAndGet(); + e.printStackTrace(); + } finally { + latch.countDown(); + } + }); + } + + latch.await(); + executor.shutdown(); + + // Success count: successCount.get() + // Error count: errorCount.get() + // Final wrote position: mappedFile.getWrotePosition() + + // All operations should succeed + Assert.assertEquals("All write operations should succeed", + THREAD_COUNT * OPERATIONS_PER_THREAD, successCount.get()); + Assert.assertEquals("No errors should occur", 0, errorCount.get()); + + } finally { + mappedFile.destroy(0); + } + } + + @Test + public void testConcurrentWriteWithMmap() throws Exception { + DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, false); + + try { + ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT); + CountDownLatch latch = new CountDownLatch(THREAD_COUNT); + AtomicInteger successCount = new AtomicInteger(0); + AtomicInteger errorCount = new AtomicInteger(0); + + for (int i = 0; i < THREAD_COUNT; i++) { + final int threadId = i; + executor.submit(() -> { + try { + for (int j = 0; j < OPERATIONS_PER_THREAD; j++) { + String data = String.format("Thread-%d-Operation-%d", threadId, j); + byte[] bytes = data.getBytes(); + + boolean result = mappedFile.appendMessage(bytes); + if (result) { + successCount.incrementAndGet(); + } else { + errorCount.incrementAndGet(); + } + } + } catch (Exception e) { + errorCount.incrementAndGet(); + e.printStackTrace(); + } finally { + latch.countDown(); + } + }); + } + + latch.await(); + executor.shutdown(); + + // Success count: successCount.get() + // Error count: errorCount.get() + // Final wrote position: mappedFile.getWrotePosition() + + // All operations should succeed + Assert.assertEquals("All write operations should succeed", + THREAD_COUNT * OPERATIONS_PER_THREAD, successCount.get()); + Assert.assertEquals("No errors should occur", 0, errorCount.get()); + + } finally { + mappedFile.destroy(0); + } + } + + @Test + public void testConcurrentFlush() throws Exception { + DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, true); + + try { + // Write some data first + for (int i = 0; i < 100; i++) { + String data = "Test data " + i; + mappedFile.appendMessage(data.getBytes()); + } + + ExecutorService executor = Executors.newFixedThreadPool(5); + CountDownLatch latch = new CountDownLatch(5); + AtomicInteger flushCount = new AtomicInteger(0); + + for (int i = 0; i < 5; i++) { + executor.submit(() -> { + try { + int flushed = mappedFile.flush(0); + if (flushed > 0) { + flushCount.incrementAndGet(); + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + latch.countDown(); + } + }); + } + + latch.await(); + executor.shutdown(); + + Assert.assertTrue("At least one flush should succeed", flushCount.get() > 0); + + } finally { + mappedFile.destroy(0); + } + } +} diff --git a/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileErrorHandlingTest.java b/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileErrorHandlingTest.java new file mode 100644 index 00000000000..649e8071cc6 --- /dev/null +++ b/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileErrorHandlingTest.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store.logfile; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.message.MessageExtBrokerInner; +import org.apache.rocketmq.common.message.MessageExtBatch; +import org.apache.rocketmq.store.AppendMessageCallback; +import org.apache.rocketmq.store.AppendMessageResult; +import org.apache.rocketmq.store.AppendMessageStatus; +import org.apache.rocketmq.store.CompactionAppendMsgCallback; +import org.apache.rocketmq.store.PutMessageContext; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class DefaultMappedFileErrorHandlingTest { + + private String storePath; + private String fileName; + private int fileSize = 1024 * 1024; // 1MB + + @Before + public void setUp() throws Exception { + storePath = System.getProperty("user.home") + File.separator + "unitteststore" + System.currentTimeMillis(); + fileName = storePath + File.separator + "00000000000000000000"; + UtilAll.ensureDirOK(storePath); + } + + @After + public void tearDown() throws Exception { + UtilAll.deleteFile(new File(storePath)); + } + + @Test + public void testAppendMessageCallbackErrorHandling() throws IOException { + DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, true); + + try { + // Test with a callback that returns an error + AppendMessageCallback errorCallback = new AppendMessageCallback() { + @Override + public AppendMessageResult doAppend(long fileFromOffset, ByteBuffer byteBuffer, + int maxBlank, MessageExtBrokerInner msg, + PutMessageContext putMessageContext) { + return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); + } + + @Override + public AppendMessageResult doAppend(long fileFromOffset, ByteBuffer byteBuffer, + int maxBlank, MessageExtBatch messageExtBatch, + PutMessageContext putMessageContext) { + return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); + } + }; + + // Create a mock message + MessageExtBrokerInner msg = new MessageExtBrokerInner(); + msg.setBody("test message".getBytes()); + + AppendMessageResult result = mappedFile.appendMessage(msg, errorCallback, new PutMessageContext("test-topic")); + + Assert.assertEquals("Should return error status", + AppendMessageStatus.UNKNOWN_ERROR, result.getStatus()); + + } finally { + mappedFile.destroy(0); + } + } + + @Test + public void testCompactionAppendMsgCallbackErrorHandling() throws IOException { + DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, true); + + try { + // Test with a callback that returns an error + CompactionAppendMsgCallback errorCallback = new CompactionAppendMsgCallback() { + @Override + public AppendMessageResult doAppend(ByteBuffer bbDest, long fileFromOffset, + int maxBlank, ByteBuffer bbSrc) { + return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); + } + }; + + ByteBuffer testBuffer = ByteBuffer.wrap("test data".getBytes()); + AppendMessageResult result = mappedFile.appendMessage(testBuffer, errorCallback); + + Assert.assertEquals("Should return error status", + AppendMessageStatus.UNKNOWN_ERROR, result.getStatus()); + + } finally { + mappedFile.destroy(0); + } + } + + @Test + public void testWriteWithoutMmapWithNullRandomAccessFile() throws IOException { + DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, true); + + try { + // Simulate the case where randomAccessFile is null + // This should fall back to normal behavior + byte[] testData = "test data".getBytes(); + boolean result = mappedFile.appendMessage(testData); + + // Should still work, but using MappedByteBuffer + Assert.assertTrue("Should still work with null RandomAccessFile", result); + + } finally { + mappedFile.destroy(0); + } + } + + @Test + public void testLargeDataWrite() throws IOException { + DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, true); + + try { + // Test writing data that's close to the file size limit + byte[] largeData = new byte[fileSize - 100]; // Leave some space + for (int i = 0; i < largeData.length; i++) { + largeData[i] = (byte) (i % 256); + } + + boolean result = mappedFile.appendMessage(largeData); + Assert.assertTrue("Should successfully write large data", result); + Assert.assertEquals("Wrote position should match data size", + largeData.length, mappedFile.getWrotePosition()); + + } finally { + mappedFile.destroy(0); + } + } + + @Test + public void testWriteBeyondFileSize() throws IOException { + DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, true); + + try { + // Fill the file almost completely + byte[] data = new byte[fileSize - 10]; + boolean result = mappedFile.appendMessage(data); + Assert.assertTrue("Should successfully write data", result); + + // Try to write more data than remaining space + byte[] overflowData = new byte[20]; // More than remaining 10 bytes + result = mappedFile.appendMessage(overflowData); + Assert.assertFalse("Should fail to write beyond file size", result); + + } finally { + mappedFile.destroy(0); + } + } + + @Test + public void testFlushErrorHandling() throws IOException { + DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, true); + + try { + // Write some data + byte[] testData = "test data for flush".getBytes(); + mappedFile.appendMessage(testData); + + // Flush should succeed + int flushedPosition = mappedFile.flush(0); + Assert.assertTrue("Flush should succeed", flushedPosition > 0); + + } finally { + mappedFile.destroy(0); + } + } + + @Test + public void testAppendMessageWithOffset() throws IOException { + DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, true); + + try { + byte[] testData = "Hello, RocketMQ!".getBytes(); + + // Test with valid offset + boolean result = mappedFile.appendMessage(testData, 0, testData.length); + Assert.assertTrue("Should successfully append with valid offset", result); + + // Test with invalid offset (beyond array length) + result = mappedFile.appendMessage(testData, testData.length + 1, 1); + Assert.assertFalse("Should fail with invalid offset", result); + + } finally { + mappedFile.destroy(0); + } + } +} diff --git a/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFilePerformanceTest.java b/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFilePerformanceTest.java new file mode 100644 index 00000000000..b958487add4 --- /dev/null +++ b/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFilePerformanceTest.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store.logfile; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.rocketmq.common.UtilAll; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class DefaultMappedFilePerformanceTest { + + private String storePath; + private String fileName; + private int fileSize = 10 * 1024 * 1024; // 10MB + private static final int WRITE_COUNT = 10000; + private static final int DATA_SIZE = 1024; // 1KB per write + + @Before + public void setUp() throws Exception { + storePath = System.getProperty("user.home") + File.separator + "unitteststore" + System.currentTimeMillis(); + fileName = storePath + File.separator + "00000000000000000000"; + UtilAll.ensureDirOK(storePath); + } + + @After + public void tearDown() throws Exception { + UtilAll.deleteFile(new File(storePath)); + } + + @Test + public void testWriteWithoutMmapPerformance() throws IOException { + DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, true); + + try { + byte[] testData = new byte[DATA_SIZE]; + for (int i = 0; i < testData.length; i++) { + testData[i] = (byte) (i % 256); + } + + long startTime = System.currentTimeMillis(); + + for (int i = 0; i < WRITE_COUNT; i++) { + boolean result = mappedFile.appendMessage(testData); + Assert.assertTrue("Write should succeed", result); + } + + long endTime = System.currentTimeMillis(); + long duration = endTime - startTime; + + // WriteWithoutMmap Performance: + // Writes: WRITE_COUNT + // Data size per write: DATA_SIZE bytes + // Total data: (WRITE_COUNT * DATA_SIZE / 1024) KB + // Duration: duration ms + // Throughput: (WRITE_COUNT * DATA_SIZE / 1024.0 / duration * 1000) KB/s + + Assert.assertEquals("Wrote position should match expected", + WRITE_COUNT * DATA_SIZE, mappedFile.getWrotePosition()); + + } finally { + mappedFile.destroy(0); + } + } + + @Test + public void testWriteWithMmapPerformance() throws IOException { + DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, false); + + try { + byte[] testData = new byte[DATA_SIZE]; + for (int i = 0; i < testData.length; i++) { + testData[i] = (byte) (i % 256); + } + + long startTime = System.currentTimeMillis(); + + for (int i = 0; i < WRITE_COUNT; i++) { + boolean result = mappedFile.appendMessage(testData); + Assert.assertTrue("Write should succeed", result); + } + + long endTime = System.currentTimeMillis(); + long duration = endTime - startTime; + + // WriteWithMmap Performance: + // Writes: WRITE_COUNT + // Data size per write: DATA_SIZE bytes + // Total data: (WRITE_COUNT * DATA_SIZE / 1024) KB + // Duration: duration ms + // Throughput: (WRITE_COUNT * DATA_SIZE / 1024.0 / duration * 1000) KB/s + + Assert.assertEquals("Wrote position should match expected", + WRITE_COUNT * DATA_SIZE, mappedFile.getWrotePosition()); + + } finally { + mappedFile.destroy(0); + } + } + + @Test + public void testFlushPerformance() throws IOException { + DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, true); + + try { + // Write some data first + byte[] testData = new byte[DATA_SIZE]; + for (int i = 0; i < testData.length; i++) { + testData[i] = (byte) (i % 256); + } + + for (int i = 0; i < 1000; i++) { + mappedFile.appendMessage(testData); + } + + long startTime = System.currentTimeMillis(); + + int flushedPosition = mappedFile.flush(0); + + long endTime = System.currentTimeMillis(); + long duration = endTime - startTime; + + // Flush Performance: + // Flushed position: flushedPosition + // Duration: duration ms + + Assert.assertTrue("Flush should succeed", flushedPosition > 0); + + } finally { + mappedFile.destroy(0); + } + } + + @Test + public void testByteBufferWritePerformance() throws IOException { + DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, true); + + try { + ByteBuffer testBuffer = ByteBuffer.allocate(DATA_SIZE); + for (int i = 0; i < DATA_SIZE; i++) { + testBuffer.put((byte) (i % 256)); + } + + long startTime = System.currentTimeMillis(); + + for (int i = 0; i < WRITE_COUNT; i++) { + testBuffer.rewind(); + boolean result = mappedFile.appendMessage(testBuffer); + Assert.assertTrue("Write should succeed", result); + } + + long endTime = System.currentTimeMillis(); + long duration = endTime - startTime; + + // ByteBuffer Write Performance: + // Writes: WRITE_COUNT + // Data size per write: DATA_SIZE bytes + // Total data: (WRITE_COUNT * DATA_SIZE / 1024) KB + // Duration: duration ms + // Throughput: (WRITE_COUNT * DATA_SIZE / 1024.0 / duration * 1000) KB/s + + Assert.assertEquals("Wrote position should match expected", + WRITE_COUNT * DATA_SIZE, mappedFile.getWrotePosition()); + + } finally { + mappedFile.destroy(0); + } + } + + @Test + public void testMixedWriteOperations() throws IOException { + DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, true); + + try { + byte[] testData = new byte[DATA_SIZE]; + for (int i = 0; i < testData.length; i++) { + testData[i] = (byte) (i % 256); + } + + long startTime = System.currentTimeMillis(); + + // Mix of different write operations + for (int i = 0; i < WRITE_COUNT / 4; i++) { + // appendMessage(byte[]) + boolean result1 = mappedFile.appendMessage(testData); + Assert.assertTrue("Write should succeed", result1); + + // appendMessage(byte[], offset, length) + boolean result2 = mappedFile.appendMessage(testData, 0, testData.length); + Assert.assertTrue("Write should succeed", result2); + + // appendMessage(ByteBuffer) + ByteBuffer buffer = ByteBuffer.wrap(testData); + boolean result3 = mappedFile.appendMessage(buffer); + Assert.assertTrue("Write should succeed", result3); + + // appendMessageUsingFileChannel(byte[]) + boolean result4 = mappedFile.appendMessageUsingFileChannel(testData); + Assert.assertTrue("Write should succeed", result4); + } + + long endTime = System.currentTimeMillis(); + long duration = endTime - startTime; + + // Mixed Write Operations Performance: + // Total operations: WRITE_COUNT + // Data size per operation: DATA_SIZE bytes + // Total data: (WRITE_COUNT * DATA_SIZE / 1024) KB + // Duration: duration ms + // Throughput: (WRITE_COUNT * DATA_SIZE / 1024.0 / duration * 1000) KB/s + + Assert.assertEquals("Wrote position should match expected", + WRITE_COUNT * DATA_SIZE, mappedFile.getWrotePosition()); + + } finally { + mappedFile.destroy(0); + } + } +} diff --git a/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileWriteWithoutMmapTest.java b/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileWriteWithoutMmapTest.java new file mode 100644 index 00000000000..79bca016e4a --- /dev/null +++ b/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileWriteWithoutMmapTest.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store.logfile; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.store.TransientStorePool; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class DefaultMappedFileWriteWithoutMmapTest { + + private String storePath; + private String fileName; + private int fileSize = 1024 * 1024; // 1MB + + @Before + public void setUp() throws Exception { + storePath = System.getProperty("user.home") + File.separator + "unitteststore" + System.currentTimeMillis(); + fileName = storePath + File.separator + "00000000000000000000"; + UtilAll.ensureDirOK(storePath); + } + + @After + public void tearDown() throws Exception { + UtilAll.deleteFile(new File(storePath)); + } + + @Test + public void testWriteWithoutMmapEnabled() throws IOException { + // Test with writeWithoutMmap = true + DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, true); + + try { + // Test appendMessage with byte array + byte[] testData = "Hello, RocketMQ!".getBytes(); + boolean result = mappedFile.appendMessage(testData); + Assert.assertTrue("Should successfully append message", result); + Assert.assertEquals("Wrote position should be updated", testData.length, mappedFile.getWrotePosition()); + + // Test appendMessage with ByteBuffer + ByteBuffer buffer = ByteBuffer.wrap("Test ByteBuffer".getBytes()); + result = mappedFile.appendMessage(buffer); + Assert.assertTrue("Should successfully append ByteBuffer", result); + Assert.assertEquals("Wrote position should be updated", testData.length + "Test ByteBuffer".length(), mappedFile.getWrotePosition()); + + // Test flush + int flushedPosition = mappedFile.flush(0); + Assert.assertTrue("Flush should succeed", flushedPosition > 0); + + } finally { + mappedFile.destroy(0); + } + } + + @Test + public void testWriteWithoutMmapDisabled() throws IOException { + // Test with writeWithoutMmap = false (default behavior) + DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, false); + + try { + // Test appendMessage with byte array + byte[] testData = "Hello, RocketMQ!".getBytes(); + boolean result = mappedFile.appendMessage(testData); + Assert.assertTrue("Should successfully append message", result); + Assert.assertEquals("Wrote position should be updated", testData.length, mappedFile.getWrotePosition()); + + // Test appendMessage with ByteBuffer + ByteBuffer buffer = ByteBuffer.wrap("Test ByteBuffer".getBytes()); + result = mappedFile.appendMessage(buffer); + Assert.assertTrue("Should successfully append ByteBuffer", result); + Assert.assertEquals("Wrote position should be updated", testData.length + "Test ByteBuffer".length(), mappedFile.getWrotePosition()); + + // Test flush + int flushedPosition = mappedFile.flush(0); + Assert.assertTrue("Flush should succeed", flushedPosition > 0); + + } finally { + mappedFile.destroy(0); + } + } + + @Test + public void testWriteWithoutMmapWithTransientStorePool() throws IOException { + // Test with writeWithoutMmap = true and TransientStorePool + TransientStorePool transientStorePool = new TransientStorePool(5, fileSize); + DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, transientStorePool, true); + + try { + // Test appendMessage with byte array + byte[] testData = "Hello, RocketMQ with TransientStorePool!".getBytes(); + boolean result = mappedFile.appendMessage(testData); + Assert.assertTrue("Should successfully append message", result); + Assert.assertEquals("Wrote position should be updated", testData.length, mappedFile.getWrotePosition()); + + } finally { + mappedFile.destroy(0); + } + } + + @Test + public void testAppendMessageWithOffset() throws IOException { + DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, true); + + try { + byte[] testData = "Hello, RocketMQ with offset!".getBytes(); + boolean result = mappedFile.appendMessage(testData, 0, testData.length); + Assert.assertTrue("Should successfully append message with offset", result); + Assert.assertEquals("Wrote position should be updated", testData.length, mappedFile.getWrotePosition()); + + } finally { + mappedFile.destroy(0); + } + } + + @Test + public void testAppendMessageUsingFileChannel() throws IOException { + DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, true); + + try { + byte[] testData = "Hello, RocketMQ using FileChannel!".getBytes(); + boolean result = mappedFile.appendMessageUsingFileChannel(testData); + Assert.assertTrue("Should successfully append message using FileChannel", result); + Assert.assertEquals("Wrote position should be updated", testData.length, mappedFile.getWrotePosition()); + + } finally { + mappedFile.destroy(0); + } + } +}