Skip to content

Commit ad71e58

Browse files
committed
Add memory limit test for record batch stream to ensure compliance with configured limits
1 parent d63d08b commit ad71e58

1 file changed

Lines changed: 21 additions & 0 deletions

File tree

python/tests/test_record_batch_stream.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,11 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18+
import ctypes
19+
20+
import pyarrow as pa
1821
import pytest
22+
from datafusion import RuntimeEnvBuilder, SessionContext
1923

2024

2125
def test_record_batch_stream_next(ctx):
@@ -65,3 +69,20 @@ def test_arrow_c_stream_partitioned(tmp_path, ctx):
6569
gc.collect()
6670

6771
assert peak - baseline < rows_per_part * 8 * 2
72+
73+
74+
def test_record_batch_stream_memory_limit():
75+
"""Ensure streaming respects configured memory limits."""
76+
runtime = RuntimeEnvBuilder().with_memory_limit(1024)
77+
ctx = SessionContext(runtime=runtime)
78+
df = ctx.sql("SELECT * FROM range(0, 1000000)")
79+
capsule = df.__arrow_c_stream__()
80+
ctypes.pythonapi.PyCapsule_GetPointer.restype = ctypes.c_void_p
81+
ctypes.pythonapi.PyCapsule_GetPointer.argtypes = [ctypes.py_object, ctypes.c_char_p]
82+
ptr = ctypes.pythonapi.PyCapsule_GetPointer(capsule, b"arrow_array_stream")
83+
reader = pa.RecordBatchReader._import_from_c(ptr)
84+
85+
with pytest.raises(RuntimeError, match="(?i)memory"):
86+
list(reader)
87+
88+
assert ctx.sql("SELECT 1").collect()[0].num_rows == 1

0 commit comments

Comments
 (0)