Skip to content

Commit c3c6832

Browse files
authored
[ISSUE #9680] Improve RocksDB compaction filter factory resource management (#9681)
* feat: improve rocksdb compaction filter factory resource management - Refactor ConsumeQueueCompactionFilterFactory to use LongSupplier instead of MessageStore - Add proper resource cleanup in ConsumeQueueRocksDBStorage.preShutdown() - Update RocksDBOptionsFactory to accept external compaction filter factory - Optimize write buffer size for CQ rocksdb performance This change improves resource management and reduces memory leaks by: 1. Decoupling compaction filter from MessageStore reference 2. Ensuring proper cleanup of native resources 3. Making compaction filter factory lifecycle manageable * Polish the code * Fix consumeQueueCompactionFilterFactory not use
1 parent 809bfe6 commit c3c6832

3 files changed

Lines changed: 23 additions & 10 deletions

File tree

store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueCompactionFilterFactory.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,20 @@
1616
*/
1717
package org.apache.rocketmq.store.rocksdb;
1818

19+
import java.util.function.LongSupplier;
1920
import org.apache.rocketmq.common.constant.LoggerName;
2021
import org.apache.rocketmq.logging.org.slf4j.Logger;
2122
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
22-
import org.apache.rocketmq.store.MessageStore;
2323
import org.rocksdb.AbstractCompactionFilter;
2424
import org.rocksdb.AbstractCompactionFilterFactory;
2525
import org.rocksdb.RemoveConsumeQueueCompactionFilter;
2626

2727
public class ConsumeQueueCompactionFilterFactory extends AbstractCompactionFilterFactory<RemoveConsumeQueueCompactionFilter> {
2828
private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.ROCKSDB_LOGGER_NAME);
29-
private final MessageStore messageStore;
29+
private final LongSupplier minPhyOffsetSupplier;
3030

31-
public ConsumeQueueCompactionFilterFactory(final MessageStore messageStore) {
32-
this.messageStore = messageStore;
31+
public ConsumeQueueCompactionFilterFactory(final LongSupplier minPhyOffsetSupplier) {
32+
this.minPhyOffsetSupplier = minPhyOffsetSupplier;
3333
}
3434

3535
@Override
@@ -39,7 +39,7 @@ public String name() {
3939

4040
@Override
4141
public RemoveConsumeQueueCompactionFilter createCompactionFilter(final AbstractCompactionFilter.Context context) {
42-
long minPhyOffset = this.messageStore.getMinPhyOffset();
42+
long minPhyOffset = this.minPhyOffsetSupplier.getAsLong();
4343
LOGGER.info("manualCompaction minPhyOffset: {}, isFull: {}, isManual: {}",
4444
minPhyOffset, context.isFullCompaction(), context.isManualCompaction());
4545
return new RemoveConsumeQueueCompactionFilter(minPhyOffset);

store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ public class ConsumeQueueRocksDBStorage extends AbstractRocksDBStorage {
3838
private final MessageStore messageStore;
3939
private volatile ColumnFamilyHandle offsetCFHandle;
4040

41+
private ConsumeQueueCompactionFilterFactory compactionFilterFactory;
42+
4143
public ConsumeQueueRocksDBStorage(final MessageStore messageStore, final String dbPath) {
4244
super(dbPath);
4345
this.messageStore = messageStore;
@@ -65,7 +67,9 @@ protected boolean postLoad() {
6567

6668
final List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList<>();
6769

68-
ColumnFamilyOptions cqCfOptions = RocksDBOptionsFactory.createCQCFOptions(this.messageStore);
70+
this.compactionFilterFactory = new ConsumeQueueCompactionFilterFactory(messageStore::getMinPhyOffset);
71+
72+
ColumnFamilyOptions cqCfOptions = RocksDBOptionsFactory.createCQCFOptions(this.messageStore, this.compactionFilterFactory);
6973
this.cfOptions.add(cqCfOptions);
7074
cfDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cqCfOptions));
7175

@@ -84,7 +88,14 @@ protected boolean postLoad() {
8488

8589
@Override
8690
protected void preShutdown() {
87-
this.offsetCFHandle.close();
91+
if (this.offsetCFHandle != null) {
92+
this.offsetCFHandle.close();
93+
}
94+
95+
if (this.compactionFilterFactory != null) {
96+
this.compactionFilterFactory.close();
97+
}
98+
8899
}
89100

90101
public byte[] getCQ(final byte[] keyBytes) throws RocksDBException {
@@ -95,7 +106,8 @@ public byte[] getOffset(final byte[] keyBytes) throws RocksDBException {
95106
return get(this.offsetCFHandle, this.totalOrderReadOptions, keyBytes);
96107
}
97108

98-
public List<byte[]> multiGet(final List<ColumnFamilyHandle> cfhList, final List<byte[]> keys) throws RocksDBException {
109+
public List<byte[]> multiGet(final List<ColumnFamilyHandle> cfhList,
110+
final List<byte[]> keys) throws RocksDBException {
99111
return multiGet(this.totalOrderReadOptions, cfhList, keys);
100112
}
101113

store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@
4141

4242
public class RocksDBOptionsFactory {
4343

44-
public static ColumnFamilyOptions createCQCFOptions(final MessageStore messageStore) {
44+
public static ColumnFamilyOptions createCQCFOptions(final MessageStore messageStore,
45+
ConsumeQueueCompactionFilterFactory consumeQueueCompactionFilterFactory) {
4546
BlockBasedTableConfig blockBasedTableConfig = new BlockBasedTableConfig().
4647
setFormatVersion(5).
4748
setIndexType(IndexType.kBinarySearch).
@@ -92,7 +93,7 @@ public static ColumnFamilyOptions createCQCFOptions(final MessageStore messageSt
9293
setTargetFileSizeBase(256 * SizeUnit.MB).
9394
setTargetFileSizeMultiplier(2).
9495
setMergeOperator(new StringAppendOperator()).
95-
setCompactionFilterFactory(new ConsumeQueueCompactionFilterFactory(messageStore)).
96+
setCompactionFilterFactory(consumeQueueCompactionFilterFactory).
9697
setReportBgIoStats(true).
9798
setOptimizeFiltersForHits(true);
9899
}

0 commit comments

Comments
 (0)