Skip to content

Commit d63d08b

Browse files
committed
Add memory limit configuration for DataFusion runtime to prevent out-of-memory errors
1 parent c5cd149 commit d63d08b

5 files changed

Lines changed: 60 additions & 1 deletion

File tree

docs/source/user-guide/configuration.rst

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,26 @@ a :py:class:`~datafusion.context.SessionConfig` and :py:class:`~datafusion.conte
4646
ctx = SessionContext(config, runtime)
4747
print(ctx)
4848
49+
.. _memory_limit:
50+
51+
Limiting memory usage
52+
---------------------
53+
54+
For long-running queries or streaming results it can be helpful to cap
55+
DataFusion's memory consumption. Use
56+
``RuntimeEnvBuilder.with_memory_limit`` to set a maximum in bytes. When this
57+
limit is reached DataFusion will spill intermediate data to disk using a fair
58+
spill pool, preventing the process from exhausting system memory.
59+
60+
.. code-block:: python
61+
62+
from datafusion import RuntimeEnvBuilder, SessionContext
63+
64+
runtime = RuntimeEnvBuilder().with_memory_limit(1_000_000)
65+
ctx = SessionContext(runtime=runtime)
66+
67+
# queries now respect the 1 MB cap and may spill to disk
68+
4969
5070
.. _target_partitions:
5171

docs/source/user-guide/io/arrow.rst

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@ If the goal is simply to persist results, prefer engine-level writers such as
8686
``df.write_parquet()``. These writers stream data from Rust directly to the
8787
destination and avoid Python-side memory growth.
8888

89+
See :ref:`memory_limit` for how to configure a runtime memory cap to further
90+
guard against out-of-memory conditions when streaming large datasets.
91+
8992
.. ipython:: python
9093
9194
df = df.select((col("a") * lit(1.5)).alias("c"), lit("df").alias("d"))

python/datafusion/context.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,23 @@ def with_unbounded_memory_pool(self) -> RuntimeEnvBuilder:
339339
self.config_internal = self.config_internal.with_unbounded_memory_pool()
340340
return self
341341

342+
def with_memory_limit(self, size: int) -> RuntimeEnvBuilder:
343+
"""Limit runtime memory using a fair spill pool.
344+
345+
Args:
346+
size: Maximum number of bytes that the runtime may allocate before spilling.
347+
348+
Returns:
349+
A new :py:class:`RuntimeEnvBuilder` object with the updated setting.
350+
351+
Example usage::
352+
353+
RuntimeEnvBuilder().with_memory_limit(1024)
354+
"""
355+
356+
self.config_internal = self.config_internal.with_memory_limit(size)
357+
return self
358+
342359
def with_fair_spill_pool(self, size: int) -> RuntimeEnvBuilder:
343360
"""Use a fair spill pool with the specified size.
344361

python/tests/test_context.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -634,6 +634,21 @@ def test_sql_with_options_no_statements(ctx):
634634
ctx.sql_with_options(sql, options=options)
635635

636636

637+
def test_memory_limit_streaming():
638+
runtime = RuntimeEnvBuilder().with_memory_limit(1024)
639+
ctx = SessionContext(runtime=runtime)
640+
641+
# Create a batch larger than the 1KB limit
642+
array = pa.array(range(10000))
643+
batch = pa.RecordBatch.from_arrays([array], names=["a"])
644+
df = ctx.create_dataframe([[batch]])
645+
stream = df.select(column("a") + literal(1)).execute_stream()
646+
647+
with pytest.raises(Exception):
648+
for _ in stream:
649+
pass
650+
651+
637652
@pytest.fixture
638653
def batch():
639654
return pa.RecordBatch.from_arrays(

src/context.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,10 @@ impl PyRuntimeEnvBuilder {
250250
Self { builder }
251251
}
252252

253+
fn with_memory_limit(&self, size: usize) -> Self {
254+
self.with_fair_spill_pool(size)
255+
}
256+
253257
fn with_temp_file_path(&self, path: &str) -> Self {
254258
let builder = self.builder.clone();
255259
let builder = builder.with_temp_file_path(path);
@@ -322,7 +326,7 @@ impl PySessionContext {
322326
} else {
323327
RuntimeEnvBuilder::default()
324328
};
325-
let runtime = Arc::new(runtime_env_builder.build()?);
329+
let runtime = runtime_env_builder.build_arc()?;
326330
let session_state = SessionStateBuilder::new()
327331
.with_config(config)
328332
.with_runtime_env(runtime)

0 commit comments

Comments
 (0)