Skip to content

Commit 3b19e8d

Browse files
committed
feat: add to_stream method for lazy processing of DataFrame results
1 parent 87dd275 commit 3b19e8d

3 files changed

Lines changed: 30 additions & 36 deletions

File tree

docs/source/user-guide/dataframe/index.rst

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,8 +168,18 @@ out-of-memory errors.
     for batch in reader:
         ... # process each batch as it is produced
 
-DataFrames are also iterable, yielding :class:`pyarrow.RecordBatch` objects
-lazily so you can loop over results directly:
+DataFrames expose :py:meth:`~datafusion.DataFrame.to_stream`, which returns a
+``RecordBatchStream`` for lazily processing results without materializing them
+all at once:
+
+.. code-block:: python
+
+    stream = df.to_stream()
+    for batch in stream:
+        ... # process each batch as it is produced
+
+DataFrames themselves are also iterable and delegate to ``to_stream()`` under
+the hood:
 
 .. code-block:: python
 

python/datafusion/dataframe.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1022,6 +1022,14 @@ def to_arrow_table(self) -> pa.Table:
         """
         return self.df.to_arrow_table()
 
+    def to_stream(self) -> RecordBatchStream:
+        """Execute this :py:class:`DataFrame` and return a record batch stream.
+
+        This is a convenience wrapper around :py:meth:`execute_stream` and can be
+        used to iterate over results without materializing them.
+        """
+        return self.execute_stream()
+
     def execute_stream(self) -> RecordBatchStream:
         """Executes this DataFrame and returns a stream over a single partition.
 

python/tests/test_dataframe_iter_stream.py

Lines changed: 10 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -15,41 +15,17 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import pyarrow as pa
-
-
-def test_iter_releases_reader(monkeypatch, ctx):
-    batches = [
-        pa.RecordBatch.from_pydict({"a": [1]}),
-        pa.RecordBatch.from_pydict({"a": [2]}),
-    ]
-
-    class DummyReader:
-        def __init__(self, batches):
-            self._iter = iter(batches)
-            self.closed = False
-
-        def __iter__(self):
-            return self
-
-        def __next__(self):
-            return next(self._iter)
-
-        def close(self):
-            self.closed = True
-
-    dummy_reader = DummyReader(batches)
-
-    class FakeRecordBatchReader:
-        @staticmethod
-        def _import_from_c_capsule(*_args, **_kwargs):
-            return dummy_reader
-
-    monkeypatch.setattr(pa, "RecordBatchReader", FakeRecordBatchReader)
 
+def test_to_stream(ctx):
     df = ctx.from_pydict({"a": [1, 2]})
+    stream = df.to_stream()
+    batches = [rb.to_pyarrow() for rb in stream]
+    assert len(batches) == 1
+    assert batches[0].to_pydict() == {"a": [1, 2]}
 
-    for _ in df:
-        break
 
-    assert dummy_reader.closed
+def test_dataframe_iter(ctx):
+    df = ctx.from_pydict({"a": [1, 2]})
+    batches = list(df)
+    assert len(batches) == 1
+    assert batches[0].to_pydict() == {"a": [1, 2]}

0 commit comments

Comments
 (0)