Skip to content

Commit 2017630

Browse files
RongtongJinRongtongJin
andauthored
[ISSUE #9834] Support writeWithoutMmap in IndexStoreFile (#9835)
* Use FileChannel for writing when writeWithoutMmap is enabled in IndexStoreFile Change-Id: I23ac2a4dfb8286cd8c3e51aeeb2d54d91136bc03 Co-developed-by: Cursor <noreply@cursor.com> * Use FileChannel for writing when writeWithoutMmap is enabled in IndexStoreFile Change-Id: I6a541d13c81a16f39b88ac91ce770717f60d64ff Co-developed-by: Cursor <noreply@cursor.com> * Remove unnecessary rewind() call in IndexStoreFile when using FileChannel Change-Id: Id6deca99d34736761fd1d937f5eb5e75506ba1cc Co-developed-by: Cursor <noreply@cursor.com> * Add parameterized test for writeWithoutMmap in IndexStoreFileTest Change-Id: I1ce92e4f728a018ff7bd3160995f395b480345ea Co-developed-by: Cursor <noreply@cursor.com> * Fix compactToNewFile to read data from file when using FileChannel Change-Id: I9058b08f1bbcf646b5e7913260a5eefe853b9b16 Co-developed-by: Cursor <noreply@cursor.com> * Fix newBuffer position and limit in compactToNewFile Change-Id: I1a0100b6159d636a8db1124e3de416b3a2e62bc5 Co-developed-by: Cursor <noreply@cursor.com> * Update test parameter order in IndexStoreFileTest Change-Id: Ie01656574c1866badf726ead58a8ad2ca3d01cf7 Co-developed-by: Cursor <noreply@cursor.com> --------- Co-authored-by: RongtongJin <user@example.com>
1 parent e60b67a commit 2017630

4 files changed

Lines changed: 116 additions & 19 deletions

File tree

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreConfig.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ public boolean check(TieredStorageLevel targetLevel) {
129129
private String objectStoreBucket = "";
130130
private String objectStoreAccessKey = "";
131131
private String objectStoreSecretKey = "";
132+
private boolean writeWithoutMmap = false;
132133

133134
public static String localHostName() {
134135
try {
@@ -418,4 +419,12 @@ public void setObjectStoreSecretKey(String objectStoreSecretKey) {
418419
public String getObjectStoreEndpoint() {
419420
return objectStoreEndpoint;
420421
}
422+
423+
public boolean isWriteWithoutMmap() {
424+
return writeWithoutMmap;
425+
}
426+
427+
public void setWriteWithoutMmap(boolean writeWithoutMmap) {
428+
this.writeWithoutMmap = writeWithoutMmap;
429+
}
421430
}

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ public TieredMessageStore(MessageStorePluginContext context, MessageStore next)
8383
this.storeConfig = new MessageStoreConfig();
8484
this.context = context;
8585
this.context.registerConfiguration(this.storeConfig);
86+
this.storeConfig.setWriteWithoutMmap(context.getMessageStoreConfig().isWriteWithoutMmap());
8687
this.brokerName = this.storeConfig.getBrokerName();
8788
this.defaultStore = next;
8889

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java

Lines changed: 88 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.io.IOException;
2121
import java.nio.ByteBuffer;
2222
import java.nio.MappedByteBuffer;
23+
import java.nio.channels.FileChannel;
2324
import java.nio.file.Paths;
2425
import java.util.ArrayList;
2526
import java.util.Collections;
@@ -90,20 +91,26 @@ public class IndexStoreFile implements IndexFile {
9091
private final AtomicInteger hashSlotCount = new AtomicInteger(0);
9192
private final AtomicInteger indexItemCount = new AtomicInteger(0);
9293

94+
private final boolean writeWithoutMmap;
9395
private MappedFile mappedFile;
9496
private ByteBuffer byteBuffer;
9597
private MappedFile compactMappedFile;
9698
private FileSegment fileSegment;
9799

100+
private FileChannel fileChannel;
101+
98102
public IndexStoreFile(MessageStoreConfig storeConfig, long timestamp) throws IOException {
103+
this.writeWithoutMmap = storeConfig.isWriteWithoutMmap();
99104
this.hashSlotMaxCount = storeConfig.getTieredStoreIndexFileMaxHashSlotNum();
100105
this.indexItemMaxCount = storeConfig.getTieredStoreIndexFileMaxIndexNum();
101106
this.fileStatus = new AtomicReference<>(UNSEALED);
102107
this.fileReadWriteLock = new ReentrantReadWriteLock();
103108
this.mappedFile = new DefaultMappedFile(
104109
Paths.get(storeConfig.getStorePathRootDir(), FILE_DIRECTORY_NAME, String.valueOf(timestamp)).toString(),
105-
this.getItemPosition(indexItemMaxCount));
110+
this.getItemPosition(indexItemMaxCount),
111+
this.writeWithoutMmap);
106112
this.byteBuffer = this.mappedFile.getMappedByteBuffer();
113+
this.fileChannel = this.mappedFile.getFileChannel();
107114

108115
this.beginTimestamp.set(timestamp);
109116
this.endTimestamp.set(byteBuffer.getLong(INDEX_BEGIN_TIME_STAMP));
@@ -113,6 +120,7 @@ public IndexStoreFile(MessageStoreConfig storeConfig, long timestamp) throws IOE
113120
}
114121

115122
public IndexStoreFile(MessageStoreConfig storeConfig, FileSegment fileSegment) {
123+
this.writeWithoutMmap = storeConfig.isWriteWithoutMmap();
116124
this.fileSegment = fileSegment;
117125
this.fileStatus = new AtomicReference<>(UPLOAD);
118126
this.fileReadWriteLock = new ReentrantReadWriteLock();
@@ -157,12 +165,31 @@ protected int hashCode(String keyStr) {
157165
return (keyHash < 0) ? -keyHash : keyHash;
158166
}
159167

160-
protected void flushNewMetadata(ByteBuffer byteBuffer, boolean end) {
161-
byteBuffer.putInt(INDEX_MAGIC_CODE, !end ? BEGIN_MAGIC_CODE : END_MAGIC_CODE);
162-
byteBuffer.putLong(INDEX_BEGIN_TIME_STAMP, this.beginTimestamp.get());
163-
byteBuffer.putLong(INDEX_END_TIME_STAMP, this.endTimestamp.get());
164-
byteBuffer.putInt(INDEX_SLOT_COUNT, this.hashSlotCount.get());
165-
byteBuffer.putInt(INDEX_ITEM_INDEX, this.indexItemCount.get());
168+
protected void flushNewMetadata(ByteBuffer byteBuffer, boolean end) throws IOException {
169+
flushNewMetadata(byteBuffer, end, null);
170+
}
171+
172+
protected void flushNewMetadata(ByteBuffer byteBuffer, boolean end, FileChannel channel) throws IOException {
173+
FileChannel targetChannel = channel != null ? channel : fileChannel;
174+
if (writeWithoutMmap && targetChannel != null) {
175+
// Use FileChannel for writing
176+
ByteBuffer writeBuffer = ByteBuffer.allocate(INDEX_HEADER_SIZE);
177+
writeBuffer.putInt(!end ? BEGIN_MAGIC_CODE : END_MAGIC_CODE);
178+
writeBuffer.putLong(this.beginTimestamp.get());
179+
writeBuffer.putLong(this.endTimestamp.get());
180+
writeBuffer.putInt(this.hashSlotCount.get());
181+
writeBuffer.putInt(this.indexItemCount.get());
182+
writeBuffer.flip();
183+
targetChannel.position(INDEX_MAGIC_CODE);
184+
targetChannel.write(writeBuffer);
185+
} else {
186+
// Use ByteBuffer for writing
187+
byteBuffer.putInt(INDEX_MAGIC_CODE, !end ? BEGIN_MAGIC_CODE : END_MAGIC_CODE);
188+
byteBuffer.putLong(INDEX_BEGIN_TIME_STAMP, this.beginTimestamp.get());
189+
byteBuffer.putLong(INDEX_END_TIME_STAMP, this.endTimestamp.get());
190+
byteBuffer.putInt(INDEX_SLOT_COUNT, this.hashSlotCount.get());
191+
byteBuffer.putInt(INDEX_ITEM_INDEX, this.indexItemCount.get());
192+
}
166193
}
167194

168195
protected int getSlotPosition(int slotIndex) {
@@ -215,9 +242,26 @@ public AppendResult putKey(
215242
IndexItem indexItem = new IndexItem(
216243
topicId, queueId, offset, size, hashCode, timeDiff, slotOldValue);
217244
int itemIndex = this.indexItemCount.incrementAndGet();
218-
this.byteBuffer.position(this.getItemPosition(itemIndex));
219-
this.byteBuffer.put(indexItem.getByteBuffer());
220-
this.byteBuffer.putInt(slotPosition, itemIndex);
245+
int itemPosition = this.getItemPosition(itemIndex);
246+
247+
if (writeWithoutMmap && fileChannel != null) {
248+
// Use FileChannel for writing
249+
ByteBuffer itemBuffer = indexItem.getByteBuffer();
250+
fileChannel.position(itemPosition);
251+
fileChannel.write(itemBuffer);
252+
253+
ByteBuffer slotBuffer = ByteBuffer.allocate(Integer.BYTES);
254+
slotBuffer.putInt(0, itemIndex);
255+
slotBuffer.position(0);
256+
slotBuffer.limit(Integer.BYTES);
257+
fileChannel.position(slotPosition);
258+
fileChannel.write(slotBuffer);
259+
} else {
260+
// Use ByteBuffer for writing
261+
this.byteBuffer.position(itemPosition);
262+
this.byteBuffer.put(indexItem.getByteBuffer());
263+
this.byteBuffer.putInt(slotPosition, itemIndex);
264+
}
221265

222266
if (slotOldValue <= INVALID_INDEX) {
223267
this.hashSlotCount.incrementAndGet();
@@ -231,7 +275,7 @@ public AppendResult putKey(
231275
this.getTimestamp(), topic, key, hashCode % this.hashSlotMaxCount, itemIndex, slotOldValue, indexItem);
232276
}
233277
return AppendResult.SUCCESS;
234-
} catch (Exception e) {
278+
} catch (Throwable e) {
235279
log.error("IndexStoreFile put key error, topic: {}, topicId: {}, queueId: {}, keySet: {}, offset: {}, " +
236280
"size: {}, timestamp: {}", topic, topicId, queueId, keySet, offset, size, timestamp, e);
237281
} finally {
@@ -392,7 +436,7 @@ public ByteBuffer doCompaction() {
392436
buffer = compactToNewFile();
393437
log.debug("IndexStoreFile do compaction, timestamp: {}, file size: {}, cost: {}ms",
394438
this.getTimestamp(), buffer.capacity(), stopwatch.elapsed(TimeUnit.MICROSECONDS));
395-
} catch (Exception e) {
439+
} catch (Throwable e) {
396440
log.error("IndexStoreFile do compaction, timestamp: {}, cost: {}ms",
397441
this.getTimestamp(), stopwatch.elapsed(TimeUnit.MICROSECONDS), e);
398442
return null;
@@ -423,8 +467,9 @@ protected ByteBuffer compactToNewFile() throws IOException {
423467
int writePosition = INDEX_HEADER_SIZE + (hashSlotMaxCount * HASH_SLOT_SIZE);
424468
int fileMaxLength = writePosition + COMPACT_INDEX_ITEM_SIZE * indexItemCount.get();
425469

426-
compactMappedFile = new DefaultMappedFile(this.getCompactedFilePath(), fileMaxLength);
470+
compactMappedFile = new DefaultMappedFile(this.getCompactedFilePath(), fileMaxLength, writeWithoutMmap);
427471
MappedByteBuffer newBuffer = compactMappedFile.getMappedByteBuffer();
472+
FileChannel compactFileChannel = compactMappedFile.getFileChannel();
428473

429474
for (int i = 0; i < hashSlotMaxCount; i++) {
430475
int slotPosition = this.getSlotPosition(i);
@@ -437,24 +482,48 @@ protected ByteBuffer compactToNewFile() throws IOException {
437482
buffer.get(payload);
438483
int newSlotValue = payloadBuffer.getInt(COMPACT_INDEX_ITEM_SIZE);
439484
buffer.limit(COMPACT_INDEX_ITEM_SIZE);
440-
newBuffer.position(writePosition);
441-
newBuffer.put(payload, 0, COMPACT_INDEX_ITEM_SIZE);
485+
486+
if (writeWithoutMmap && compactFileChannel != null) {
487+
// Use FileChannel for writing
488+
ByteBuffer writeBuffer = ByteBuffer.wrap(payload, 0, COMPACT_INDEX_ITEM_SIZE);
489+
compactFileChannel.position(writePosition);
490+
compactFileChannel.write(writeBuffer);
491+
} else {
492+
// Use ByteBuffer for writing
493+
newBuffer.position(writePosition);
494+
newBuffer.put(payload, 0, COMPACT_INDEX_ITEM_SIZE);
495+
}
442496
log.trace("IndexStoreFile do compaction, write item, slot: {}, current: {}, next: {}", i, slotValue, newSlotValue);
443497
slotValue = newSlotValue;
444498
writePosition += COMPACT_INDEX_ITEM_SIZE;
445499
}
446500

447501
int length = writePosition - writeBeginPosition;
448-
newBuffer.putInt(slotPosition, writeBeginPosition);
449-
newBuffer.putInt(slotPosition + Integer.BYTES, length);
502+
if (writeWithoutMmap && compactFileChannel != null) {
503+
// Use FileChannel for writing
504+
ByteBuffer slotWriteBuffer = ByteBuffer.allocate(Integer.BYTES * 2);
505+
slotWriteBuffer.putInt(0, writeBeginPosition);
506+
slotWriteBuffer.putInt(Integer.BYTES, length);
507+
slotWriteBuffer.position(0);
508+
slotWriteBuffer.limit(Integer.BYTES * 2);
509+
compactFileChannel.position(slotPosition);
510+
compactFileChannel.write(slotWriteBuffer);
511+
} else {
512+
// Use ByteBuffer for writing
513+
newBuffer.putInt(slotPosition, writeBeginPosition);
514+
newBuffer.putInt(slotPosition + Integer.BYTES, length);
515+
}
450516

451517
if (length > 0) {
452518
log.trace("IndexStoreFile do compaction, write slot, slot: {}, begin: {}, length: {}", i, writeBeginPosition, length);
453519
}
454520
}
455521

456-
this.flushNewMetadata(newBuffer, true);
457-
newBuffer.flip();
522+
this.flushNewMetadata(newBuffer, true, compactFileChannel);
523+
524+
// Set position and limit for reading
525+
newBuffer.position(0);
526+
newBuffer.limit(fileMaxLength);
458527
return newBuffer;
459528
}
460529

tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreFileTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,13 @@
3939
import org.junit.Assert;
4040
import org.junit.Before;
4141
import org.junit.Test;
42+
import org.junit.runner.RunWith;
43+
import org.junit.runners.Parameterized;
4244

45+
import java.util.Arrays;
46+
import java.util.Collection;
47+
48+
@RunWith(Parameterized.class)
4349
public class IndexStoreFileTest {
4450

4551
private static final String TOPIC_NAME = "TopicTest";
@@ -50,6 +56,17 @@ public class IndexStoreFileTest {
5056
private static final String KEY = "MessageKey";
5157
private static final Set<String> KEY_SET = Collections.singleton(KEY);
5258

59+
@Parameterized.Parameter
60+
public boolean writeWithoutMmap;
61+
62+
@Parameterized.Parameters(name = "writeWithoutMmap={0}")
63+
public static Collection<Object[]> data() {
64+
return Arrays.asList(new Object[][] {
65+
{ true },
66+
{ false }
67+
});
68+
}
69+
5370
private String filePath;
5471
private MessageStoreConfig storeConfig;
5572
private IndexStoreFile indexStoreFile;
@@ -64,6 +81,7 @@ public void init() throws IOException {
6481
storeConfig.setTieredStoreIndexFileMaxHashSlotNum(5);
6582
storeConfig.setTieredStoreIndexFileMaxIndexNum(20);
6683
storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.PosixFileSegment");
84+
storeConfig.setWriteWithoutMmap(writeWithoutMmap);
6785
indexStoreFile = new IndexStoreFile(storeConfig, System.currentTimeMillis());
6886
}
6987

0 commit comments

Comments
 (0)