
Commit 949da2a

Enhance documentation for __arrow_c_stream__ method and add tests for incremental streaming of DataFrame

1 parent e37796b

3 files changed: 58 additions & 6 deletions


python/datafusion/dataframe.py

Lines changed: 9 additions & 6 deletions
@@ -1098,19 +1098,22 @@ def unnest_columns(self, *columns: str, preserve_nulls: bool = True) -> DataFrame
         return DataFrame(self.df.unnest_columns(columns, preserve_nulls=preserve_nulls))
 
     def __arrow_c_stream__(self, requested_schema: object | None = None) -> object:
-        """Export an Arrow PyCapsule Stream.
+        """Export the DataFrame as an Arrow C Stream.
 
-        This will execute and collect the DataFrame. We will attempt to respect the
-        requested schema, but only trivial transformations will be applied such as only
-        returning the fields listed in the requested schema if their data types match
-        those in the DataFrame.
+        The DataFrame is executed using DataFusion's streaming APIs and exposed via
+        Arrow's C Stream interface. Record batches are produced incrementally, so the
+        full result set is never materialized in memory. When ``requested_schema`` is
+        provided, only straightforward projections such as column selection or
+        reordering are applied.
 
         Args:
             requested_schema: Attempt to provide the DataFrame using this schema.
 
         Returns:
-            Arrow PyCapsule object.
+            Arrow PyCapsule object representing an ``ArrowArrayStream``.
         """
+        # ``DataFrame.__arrow_c_stream__`` in the Rust extension leverages
+        # ``execute_stream`` under the hood to stream batches one at a time.
         return self.df.__arrow_c_stream__(requested_schema)
 
     def transform(self, func: Callable[..., DataFrame], *args: Any) -> DataFrame:
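As a quick usage sketch (not part of this commit), the stream can be consumed from pyarrow, which added PyCapsule-protocol support in recent releases (15+); ``RecordBatchReader.from_stream`` calls ``__arrow_c_stream__`` under the hood, so batches arrive lazily:

import pyarrow as pa
from datafusion import SessionContext

ctx = SessionContext()
df = ctx.sql("SELECT 1 AS a")  # any DataFrame works here

# from_stream() invokes df.__arrow_c_stream__() and wraps the resulting
# ArrowArrayStream capsule; batches are pulled one at a time, not collected.
reader = pa.RecordBatchReader.from_stream(df)
for batch in reader:
    print(batch.num_rows)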

python/tests/test_dataframe.py

Lines changed: 23 additions & 0 deletions
@@ -1582,6 +1582,29 @@ def test_empty_to_arrow_table(df):
     assert set(pyarrow_table.column_names) == {"a", "b", "c"}
 
 
+def test_arrow_c_stream_to_table(monkeypatch):
+    ctx = SessionContext()
+
+    # Create a DataFrame with two separate record batches
+    batch1 = pa.record_batch([pa.array([1])], names=["a"])
+    batch2 = pa.record_batch([pa.array([2])], names=["a"])
+    df = ctx.create_dataframe([[batch1], [batch2]])
+
+    # Fail if the DataFrame is pre-collected
+    def fail_collect(self):  # pragma: no cover - failure path
+        msg = "collect should not be called"
+        raise AssertionError(msg)
+
+    monkeypatch.setattr(DataFrame, "collect", fail_collect)
+
+    table = pa.Table.from_batches(df)
+    expected = pa.Table.from_batches([batch1, batch2])
+
+    assert table.equals(expected)
+    assert table.schema == df.schema()
+    assert table.column("a").num_chunks == 2
+
+
 def test_to_pylist(df):
     # Convert datafusion dataframe to Python list
     pylist = df.to_pylist()
python/tests/test_io.py

Lines changed: 26 additions & 0 deletions
@@ -14,6 +14,7 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import resource
 from pathlib import Path
 
 import pyarrow as pa
@@ -92,3 +93,28 @@ def test_read_avro():
     path = Path.cwd() / "testing/data/avro/alltypes_plain.avro"
     avro_df = read_avro(path=path)
     assert avro_df is not None
+
+
+def test_arrow_c_stream_large_dataset(ctx):
+    """DataFrame.__arrow_c_stream__ yields batches incrementally.
+
+    This test constructs a DataFrame that would be far larger than available
+    memory if materialized. The ``__arrow_c_stream__`` method should expose a
+    stream of record batches without collecting the full dataset, so reading a
+    handful of batches should not exhaust process memory.
+    """
+    # Create a very large DataFrame using range; this would be terabytes if collected
+    df = ctx.range(0, 1 << 40)
+
+    reader = pa.RecordBatchReader._import_from_c_capsule(df.__arrow_c_stream__())
+
+    # Track maximum RSS before consuming batches
+    start_max_rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
+
+    for _ in range(5):
+        batch = reader.read_next_batch()
+        assert batch is not None
+        assert len(batch) > 0
+    current_max_rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
+    # Ensure memory usage hasn't grown substantially (< 50 MB; ru_maxrss is KiB on Linux)
+    assert current_max_rss - start_max_rss < 50 * 1024
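A portability note on this test (an observation, not a change in the commit): the ``resource`` module is Unix-only, and ``ru_maxrss`` is reported in KiB on Linux but in bytes on macOS, so the ``50 * 1024`` threshold assumes Linux. A hypothetical helper that normalizes the units might look like:

import sys
import resource  # Unix-only; not available on Windows

def max_rss_bytes() -> int:
    # ru_maxrss units differ by platform: KiB on Linux, bytes on macOS.
    rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    return rss if sys.platform == "darwin" else rss * 1024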
