Skip to content

Commit e2e26cf

Browse files
authored
Fix RocksDB native memory leak in stateful applications (#32)
* Fix RocksDB native memory leak * Fix unit tests
1 parent 08a8afa commit e2e26cf

2 files changed

Lines changed: 13 additions & 3 deletions

File tree

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/rocksdb/RocksDBCacheProvider.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import com.google.common.annotations.VisibleForTesting;
1414
import java.util.Map;
1515
import org.apache.kafka.common.config.ConfigException;
16+
import org.apache.kafka.streams.state.internals.BlockBasedTableConfigWithAccessibleCache;
1617
import org.rocksdb.BlockBasedTableConfig;
1718
import org.rocksdb.Cache;
1819
import org.rocksdb.LRUCache;
@@ -107,7 +108,7 @@ protected synchronized void initCache(Options options, Map<String, Object> confi
107108
cacheTotalCapacity / (1024 * 1024), writeBuffersRatio, highPriorityPoolRatio);
108109
}
109110

110-
final BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
111+
final BlockBasedTableConfigWithAccessibleCache tableConfig = (BlockBasedTableConfigWithAccessibleCache) options.tableFormatConfig();
111112

112113
// ######### Block cache (Read buffers) #########
113114
if (configs.containsKey(BLOCK_SIZE)) {
@@ -133,7 +134,12 @@ protected synchronized void initCache(Options options, Map<String, Object> confi
133134

134135
options.setWriteBufferManager(writeBufferManager);
135136

136-
tableConfig.setBlockCache(cache);
137+
final Cache oldCache = tableConfig.blockCache();
138+
if(oldCache != null) {
139+
LOG.info("Releasing old rocksdb cache before setting shared cache.");
140+
oldCache.close();
141+
}
142+
tableConfig.setBlockCache(this.cache);
137143
options.setTableFormatConfig(tableConfig);
138144
}
139145

kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/rocksdb/BoundedMemoryConfigSetterTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import java.util.stream.Stream;
1818
import org.apache.kafka.common.config.ConfigException;
1919
import org.apache.kafka.streams.state.RocksDBConfigSetter;
20+
import org.apache.kafka.streams.state.internals.BlockBasedTableConfigWithAccessibleCache;
2021
import org.junit.jupiter.api.AfterEach;
2122
import org.junit.jupiter.api.BeforeEach;
2223
import org.junit.jupiter.api.Test;
@@ -26,6 +27,7 @@
2627
import org.rocksdb.CompactionStyle;
2728
import org.rocksdb.CompressionType;
2829
import org.rocksdb.InfoLogLevel;
30+
import org.rocksdb.LRUCache;
2931
import org.rocksdb.Options;
3032

3133
class BoundedMemoryConfigSetterTest {
@@ -41,7 +43,9 @@ public void setUp() {
4143
RocksDBCacheProvider.get().testDestroy();
4244
options = new Options();
4345
configs = new HashMap<>();
44-
tableConfig = new BlockBasedTableConfig();
46+
tableConfig = new BlockBasedTableConfigWithAccessibleCache();
47+
// mimic kafka streams
48+
tableConfig.setBlockCache(new LRUCache(32 * 1024));
4549
options.setTableFormatConfig(tableConfig);
4650
configSetter = new BoundedMemoryConfigSetter();
4751
}

0 commit comments

Comments
 (0)