
Commit d6f5c86

refactor: enhance DataFrame and RecordBatchStream iteration support
1 parent 80bc96a commit d6f5c86

3 files changed: 54 additions & 19 deletions


docs/source/user-guide/dataframe/index.rst

Lines changed: 21 additions & 2 deletions
@@ -168,14 +168,33 @@ out-of-memory errors.
     for batch in reader:
         ... # process each batch as it is produced
 
-DataFrames are also iterable, yielding :class:`pyarrow.RecordBatch` objects
-lazily so you can loop over results directly:
+DataFrames are also iterable, yielding :class:`datafusion.RecordBatch`
+objects lazily so you can loop over results directly without importing
+PyArrow:
 
 .. code-block:: python
 
     for batch in df:
+        ... # each batch is a ``RecordBatch``
+
+Asynchronous iteration is supported as well, allowing integration with
+``asyncio`` event loops:
+
+.. code-block:: python
+
+    async for batch in df:
         ... # process each batch as it is produced
 
+To work with the stream directly, use
+``to_record_batch_stream()``, which returns a
+:class:`~datafusion.RecordBatchStream`:
+
+.. code-block:: python
+
+    stream = df.to_record_batch_stream()
+    for batch in stream:
+        ...
+
 See :doc:`../io/arrow` for additional details on the Arrow interface.
 
 HTML Rendering
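
Taken together, the documented patterns look like this in practice. The following is a minimal sketch, not part of the commit: it assumes a SessionContext and an illustrative query, and mirrors the three iteration styles the updated docs describe.

    import asyncio

    from datafusion import SessionContext

    ctx = SessionContext()

    # Synchronous iteration: batches stream lazily, nothing is collected up front.
    for batch in ctx.sql("SELECT 1 AS a"):
        ...  # each batch is a datafusion.RecordBatch

    # Asynchronous iteration inside an asyncio event loop, as the docs above show.
    async def consume() -> None:
        async for batch in ctx.sql("SELECT 1 AS a"):
            ...

    asyncio.run(consume())

    # Working with the stream object directly.
    stream = ctx.sql("SELECT 1 AS a").to_record_batch_stream()
    for batch in stream:
        ...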

python/datafusion/dataframe.py

Lines changed: 11 additions & 16 deletions
@@ -43,7 +43,7 @@
 from datafusion._internal import ParquetWriterOptions as ParquetWriterOptionsInternal
 from datafusion.expr import Expr, SortExpr, sort_or_default
 from datafusion.plan import ExecutionPlan, LogicalPlan
-from datafusion.record_batch import RecordBatchStream
+from datafusion.record_batch import RecordBatch, RecordBatchStream
 
 if TYPE_CHECKING:
     import pathlib
@@ -1030,6 +1030,10 @@ def execute_stream(self) -> RecordBatchStream:
         """
         return RecordBatchStream(self.df.execute_stream())
 
+    def to_record_batch_stream(self) -> RecordBatchStream:
+        """Return a :class:`RecordBatchStream` executing this DataFrame."""
+        return self.execute_stream()
+
     def execute_stream_partitioned(self) -> list[RecordBatchStream]:
         """Executes this DataFrame and returns a stream for each partition.
 
@@ -1121,22 +1125,13 @@ def __arrow_c_stream__(self, requested_schema: object | None = None) -> object:
         # preserving the original partition order.
         return self.df.__arrow_c_stream__(requested_schema)
 
-    def __iter__(self) -> Iterator[pa.RecordBatch]:
-        """Yield record batches from the DataFrame without materializing results.
-
-        This implementation streams record batches via the Arrow C Stream
-        interface, allowing callers such as :func:`pyarrow.Table.from_batches` to
-        consume results lazily. The DataFrame is executed using DataFusion's
-        partitioned streaming APIs so ``collect`` is never invoked and batch
-        order across partitions is preserved.
-        """
-        from contextlib import closing
-
-        import pyarrow as pa
+    def __iter__(self) -> Iterator[RecordBatch]:
+        """Yield :class:`RecordBatch` objects by streaming execution."""
+        yield from self.to_record_batch_stream()
 
-        reader = pa.RecordBatchReader._import_from_c_capsule(self.__arrow_c_stream__())
-        with closing(reader):
-            yield from reader
+    async def __aiter__(self) -> RecordBatchStream:
+        """Return an asynchronous iterator over streamed ``RecordBatch`` objects."""
+        return await self.to_record_batch_stream().__aiter__()
 
     def transform(self, func: Callable[..., DataFrame], *args: Any) -> DataFrame:
         """Apply a function to the current DataFrame which returns another DataFrame.

python/datafusion/record_batch.py

Lines changed: 22 additions & 1 deletion
@@ -46,6 +46,26 @@ def to_pyarrow(self) -> pa.RecordBatch:
         """Convert to :py:class:`pa.RecordBatch`."""
         return self.record_batch.to_pyarrow()
 
+    def __arrow_c_array__(
+        self, requested_schema: object | None = None
+    ) -> tuple[object, object]:
+        """Export the record batch via the Arrow C Data Interface.
+
+        This allows zero-copy interchange with libraries that support the
+        `Arrow PyCapsule interface <https://arrow.apache.org/docs/format/
+        CDataInterface/PyCapsuleInterface.html>`_.
+
+        Args:
+            requested_schema: Attempt to provide the record batch using this
+                schema. Only straightforward projections such as column
+                selection or reordering are applied.
+
+        Returns:
+            Two Arrow PyCapsule objects representing the ``ArrowArray`` and
+            ``ArrowSchema``.
+        """
+        return self.record_batch.__arrow_c_array__(requested_schema)
+
 
 class RecordBatchStream:
     """This class represents a stream of record batches.
@@ -72,8 +92,9 @@ def __next__(self) -> RecordBatch:
         next_batch = next(self.rbs)
         return RecordBatch(next_batch)
 
-    def __aiter__(self) -> typing_extensions.Self:
+    async def __aiter__(self) -> typing_extensions.Self:
         """Async iterator function."""
+        await self.rbs.__aiter__()
         return self
 
     def __iter__(self) -> typing_extensions.Self:
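
With __arrow_c_array__ in place, a datafusion RecordBatch can cross into any PyCapsule-aware library without a copy. A minimal sketch, assuming a recent PyArrow (14.0+, where pa.record_batch() accepts any object implementing the PyCapsule interface); the query is illustrative:

    import pyarrow as pa

    from datafusion import SessionContext

    ctx = SessionContext()
    stream = ctx.sql("SELECT 1 AS a").to_record_batch_stream()

    for batch in stream:
        # pa.record_batch() detects the __arrow_c_array__ capsule pair and
        # imports the batch zero-copy instead of round-tripping through Python.
        arrow_batch = pa.record_batch(batch)
        print(arrow_batch.num_rows)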
