Skip to content

Commit 98e7e00

Browse files
committed
refactor: update DataFrame iteration to yield RecordBatch objects directly
1 parent 416bf8e commit 98e7e00

3 files changed

Lines changed: 18 additions & 13 deletions

File tree

docs/source/user-guide/dataframe/index.rst

Lines changed: 2 additions & 2 deletions
Original file line number · Diff line number · Diff line change
@@ -131,13 +131,13 @@ Terminal Operations
131131
-------------------
132132

133133
To materialize the results of your DataFrame operations, call a terminal method or iterate over the
134-
``DataFrame`` to consume ``pyarrow.RecordBatch`` objects lazily:
134+
``DataFrame`` to consume :py:class:`datafusion.record_batch.RecordBatch` objects lazily:
135135

136136
.. code-block:: python
137137
138138
# Iterate over the DataFrame to stream record batches
139139
for batch in df:
140-
... # process each batch as it is produced
140+
... # batch is a datafusion.record_batch.RecordBatch
141141
142142
# Collect all data as PyArrow RecordBatches
143143
result_batches = df.collect()

python/datafusion/dataframe.py

Lines changed: 11 additions & 10 deletions
Original file line number · Diff line number · Diff line change
@@ -1133,23 +1133,24 @@ def __arrow_c_stream__(self, requested_schema: object | None = None) -> object:
11331133
# preserving the original partition order.
11341134
return self.df.__arrow_c_stream__(requested_schema)
11351135

1136-
def __iter__(self) -> Iterator[pa.RecordBatch]:
1137-
"""Yield record batches from this DataFrame lazily.
1136+
def __iter__(self) -> Iterator[RecordBatch]:
1137+
"""Yield :class:`datafusion.record_batch.RecordBatch` objects lazily.
11381138
1139-
This delegates to :py:meth:`to_stream` and converts each batch to a
1140-
:class:`pyarrow.RecordBatch` without eagerly materializing the entire
1141-
result set.
1139+
This delegates to :py:meth:`to_stream` without converting each batch to a
1140+
:class:`pyarrow.RecordBatch`. Use
1141+
:py:meth:`datafusion.record_batch.RecordBatch.to_pyarrow` when a
1142+
:class:`pyarrow.RecordBatch` is required.
11421143
"""
11431144
for batch in self.to_stream():
1144-
yield batch.to_pyarrow()
1145+
yield batch
11451146

1146-
def __aiter__(self) -> AsyncIterator[pa.RecordBatch]:
1147-
"""Asynchronously yield record batches from this DataFrame lazily."""
1147+
def __aiter__(self) -> AsyncIterator[RecordBatch]:
1148+
"""Asynchronously yield :class:`datafusion.record_batch.RecordBatch` objects lazily."""
11481149
stream = self.to_stream()
11491150

1150-
async def iterator() -> AsyncIterator[pa.RecordBatch]:
1151+
async def iterator() -> AsyncIterator[RecordBatch]:
11511152
async for batch in stream:
1152-
yield batch.to_pyarrow()
1153+
yield batch
11531154

11541155
return iterator()
11551156

python/tests/test_dataframe_iter_stream.py

Lines changed: 5 additions & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -16,6 +16,9 @@
1616
# under the License.
1717

1818

19+
from datafusion.record_batch import RecordBatch
20+
21+
1922
def test_to_stream(ctx):
2023
df = ctx.from_pydict({"a": [1, 2]})
2124
stream = df.to_stream()
@@ -28,4 +31,5 @@ def test_dataframe_iter(ctx):
2831
df = ctx.from_pydict({"a": [1, 2]})
2932
batches = list(df)
3033
assert len(batches) == 1
31-
assert batches[0].to_pydict() == {"a": [1, 2]}
34+
assert isinstance(batches[0], RecordBatch)
35+
assert batches[0].to_pyarrow().to_pydict() == {"a": [1, 2]}

0 commit comments

Comments (0)