
Commit ab6a810

revert branch UNPICK
1 parent f02114a · commit ab6a810

5 files changed

Lines changed: 28 additions & 122 deletions


docs/source/user-guide/dataframe/index.rst

Lines changed: 1 addition & 22 deletions
@@ -145,31 +145,10 @@ To materialize the results of your DataFrame operations:

     # Display results
     df.show() # Print tabular format to console
-
+
     # Count rows
     count = df.count()

-PyArrow Streaming
------------------
-
-DataFusion DataFrames implement the ``__arrow_c_stream__`` protocol, enabling
-zero-copy streaming into libraries like `PyArrow <https://arrow.apache.org/>`_.
-Earlier versions eagerly converted the entire DataFrame when exporting to
-PyArrow, which could exhaust memory on large datasets. With streaming, batches
-are produced lazily so you can process arbitrarily large results without
-out-of-memory errors.
-
-.. code-block:: python
-
-    import pyarrow as pa
-
-    # Create a PyArrow RecordBatchReader without materializing all batches
-    reader = pa.RecordBatchReader._import_from_c(df.__arrow_c_stream__())
-    for batch in reader:
-        ...  # process each batch as it is produced
-
-See :doc:`../io/arrow` for additional details on the Arrow interface.
-
 HTML Rendering
 --------------
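
Note that this commit only removes the documentation of the lazy streaming behaviour; the `__arrow_c_stream__` method itself remains (see src/dataframe.rs below) and now materializes the result eagerly. A minimal usage sketch of the export path that is still available, assuming PyArrow 14.0+ where `pa.table()` accepts any object implementing the Arrow PyCapsule stream interface:

    import pyarrow as pa
    from datafusion import SessionContext

    ctx = SessionContext()
    df = ctx.sql("SELECT 1 AS a")

    # Still works after the revert, but the batches behind __arrow_c_stream__
    # are now collected up front rather than produced lazily.
    table = pa.table(df)
    print(table)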

python/tests/test_dataframe.py

Lines changed: 0 additions & 23 deletions
@@ -1582,29 +1582,6 @@ def test_empty_to_arrow_table(df):
     assert set(pyarrow_table.column_names) == {"a", "b", "c"}


-def test_arrow_c_stream_to_table(monkeypatch):
-    ctx = SessionContext()
-
-    # Create a DataFrame with two separate record batches
-    batch1 = pa.record_batch([pa.array([1])], names=["a"])
-    batch2 = pa.record_batch([pa.array([2])], names=["a"])
-    df = ctx.create_dataframe([[batch1], [batch2]])
-
-    # Fail if the DataFrame is pre-collected
-    def fail_collect(self):  # pragma: no cover - failure path
-        msg = "collect should not be called"
-        raise AssertionError(msg)
-
-    monkeypatch.setattr(DataFrame, "collect", fail_collect)
-
-    table = pa.Table.from_batches(df)
-    expected = pa.Table.from_batches([batch1, batch2])
-
-    assert table.equals(expected)
-    assert table.schema == df.schema()
-    assert table.column("a").num_chunks == 2
-
-
 def test_to_pylist(df):
     # Convert datafusion dataframe to Python list
     pylist = df.to_pylist()
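
The deleted test pinned down that the Arrow C stream export never calls `DataFrame.collect`; with this revert that guarantee is intentionally dropped. A rough sketch of the same round trip without the monkeypatch guard, for illustration only (not a test added by this commit; it assumes a PyArrow version whose `pa.table()` understands `__arrow_c_stream__` objects):

    import pyarrow as pa
    from datafusion import SessionContext

    ctx = SessionContext()
    batch1 = pa.record_batch([pa.array([1])], names=["a"])
    batch2 = pa.record_batch([pa.array([2])], names=["a"])
    df = ctx.create_dataframe([[batch1], [batch2]])

    # The data still round-trips, but it is now collected eagerly inside
    # __arrow_c_stream__ instead of being streamed batch by batch.
    table = pa.table(df)
    assert table == pa.Table.from_batches([batch1, batch2])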

src/context.rs

Lines changed: 7 additions & 2 deletions
@@ -45,7 +45,7 @@ use crate::udaf::PyAggregateUDF;
 use crate::udf::PyScalarUDF;
 use crate::udtf::PyTableFunction;
 use crate::udwf::PyWindowUDF;
-use crate::utils::{get_global_ctx, spawn_stream, validate_pycapsule, wait_for_future};
+use crate::utils::{get_global_ctx, get_tokio_runtime, validate_pycapsule, wait_for_future};
 use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef};
 use datafusion::arrow::pyarrow::PyArrowType;
 use datafusion::arrow::record_batch::RecordBatch;
@@ -74,6 +74,7 @@ use datafusion_ffi::catalog_provider::{FFI_CatalogProvider, ForeignCatalogProvider};
 use datafusion_ffi::table_provider::{FFI_TableProvider, ForeignTableProvider};
 use pyo3::types::{PyCapsule, PyDict, PyList, PyTuple, PyType};
 use pyo3::IntoPyObjectExt;
+use tokio::task::JoinHandle;

 /// Configuration options for a SessionContext
 #[pyclass(name = "SessionConfig", module = "datafusion", subclass)]
@@ -1131,8 +1132,12 @@ impl PySessionContext {
         py: Python,
     ) -> PyDataFusionResult<PyRecordBatchStream> {
         let ctx: TaskContext = TaskContext::from(&self.ctx.state());
+        // create a Tokio runtime to run the async code
+        let rt = &get_tokio_runtime().0;
         let plan = plan.plan.clone();
-        let stream = spawn_stream(py, async move { plan.execute(part, Arc::new(ctx)) })?;
+        let fut: JoinHandle<datafusion::common::Result<SendableRecordBatchStream>> =
+            rt.spawn(async move { plan.execute(part, Arc::new(ctx)) });
+        let stream = wait_for_future(py, async { fut.await.map_err(to_datafusion_err) })???;
         Ok(PyRecordBatchStream::new(stream))
     }
 }

src/dataframe.rs

Lines changed: 17 additions & 60 deletions
@@ -19,13 +19,13 @@ use std::collections::HashMap;
 use std::ffi::CString;
 use std::sync::Arc;

-use arrow::array::{new_null_array, RecordBatch, RecordBatchReader};
+use arrow::array::{new_null_array, RecordBatch, RecordBatchIterator, RecordBatchReader};
 use arrow::compute::can_cast_types;
 use arrow::error::ArrowError;
 use arrow::ffi::FFI_ArrowSchema;
 use arrow::ffi_stream::FFI_ArrowArrayStream;
 use arrow::pyarrow::FromPyArrow;
-use datafusion::arrow::datatypes::{Schema, SchemaRef};
+use datafusion::arrow::datatypes::Schema;
 use datafusion::arrow::pyarrow::{PyArrowType, ToPyArrow};
 use datafusion::arrow::util::pretty;
 use datafusion::common::UnnestOptions;
@@ -42,7 +42,7 @@ use pyo3::exceptions::PyValueError;
 use pyo3::prelude::*;
 use pyo3::pybacked::PyBackedStr;
 use pyo3::types::{PyCapsule, PyList, PyTuple, PyTupleMethods};
-use tokio::{runtime::Handle, task::JoinHandle};
+use tokio::task::JoinHandle;

 use crate::catalog::PyTable;
 use crate::errors::{py_datafusion_err, to_datafusion_err, PyDataFusionError};
@@ -51,8 +51,7 @@ use crate::physical_plan::PyExecutionPlan;
 use crate::record_batch::PyRecordBatchStream;
 use crate::sql::logical::PyLogicalPlan;
 use crate::utils::{
-    get_tokio_runtime, is_ipython_env, py_obj_to_scalar_value, spawn_stream, validate_pycapsule,
-    wait_for_future,
+    get_tokio_runtime, is_ipython_env, py_obj_to_scalar_value, validate_pycapsule, wait_for_future,
 };
 use crate::{
     errors::PyDataFusionResult,
@@ -354,47 +353,6 @@ impl PyDataFrame {
         Ok(html_str)
     }
 }
-/// Synchronous wrapper around a [`SendableRecordBatchStream`] used for
-/// the `__arrow_c_stream__` implementation.
-///
-/// It uses `runtime.block_on` to consume the underlying async stream,
-/// providing synchronous iteration. When a `projection` is set, each
-/// batch is converted via `record_batch_into_schema` to apply schema
-/// changes per batch.
-struct DataFrameStreamReader {
-    stream: SendableRecordBatchStream,
-    runtime: Handle,
-    schema: SchemaRef,
-    projection: Option<SchemaRef>,
-}
-
-impl Iterator for DataFrameStreamReader {
-    type Item = Result<RecordBatch, ArrowError>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        match self.runtime.block_on(self.stream.next()) {
-            Some(Ok(batch)) => {
-                let batch = if let Some(ref schema) = self.projection {
-                    match record_batch_into_schema(batch, schema.as_ref()) {
-                        Ok(b) => b,
-                        Err(e) => return Some(Err(e)),
-                    }
-                } else {
-                    batch
-                };
-                Some(Ok(batch))
-            }
-            Some(Err(e)) => Some(Err(ArrowError::ExternalError(Box::new(e)))),
-            None => None,
-        }
-    }
-}
-
-impl RecordBatchReader for DataFrameStreamReader {
-    fn schema(&self) -> SchemaRef {
-        self.schema.clone()
-    }
-}

 #[pymethods]
 impl PyDataFrame {
@@ -921,12 +879,8 @@ impl PyDataFrame {
         py: Python<'py>,
         requested_schema: Option<Bound<'py, PyCapsule>>,
     ) -> PyDataFusionResult<Bound<'py, PyCapsule>> {
-        let rt = &get_tokio_runtime().0;
-        let df = self.df.as_ref().clone();
-        let stream = spawn_stream(py, async move { df.execute_stream().await })?;
-
+        let mut batches = wait_for_future(py, self.df.as_ref().clone().collect())??;
         let mut schema: Schema = self.df.schema().to_owned().into();
-        let mut projection: Option<SchemaRef> = None;

         if let Some(schema_capsule) = requested_schema {
             validate_pycapsule(&schema_capsule, "arrow_schema")?;
@@ -935,17 +889,16 @@
             let desired_schema = Schema::try_from(schema_ptr)?;

             schema = project_schema(schema, desired_schema)?;
-            projection = Some(Arc::new(schema.clone()));
+
+            batches = batches
+                .into_iter()
+                .map(|record_batch| record_batch_into_schema(record_batch, &schema))
+                .collect::<Result<Vec<RecordBatch>, ArrowError>>()?;
         }

-        let schema_ref = projection.clone().unwrap_or_else(|| Arc::new(schema));
+        let batches_wrapped = batches.into_iter().map(Ok);

-        let reader = DataFrameStreamReader {
-            stream,
-            runtime: rt.handle().clone(),
-            schema: schema_ref,
-            projection,
-        };
+        let reader = RecordBatchIterator::new(batches_wrapped, Arc::new(schema));
         let reader: Box<dyn RecordBatchReader + Send> = Box::new(reader);

         let ffi_stream = FFI_ArrowArrayStream::new(reader);
@@ -954,8 +907,12 @@
     }

     fn execute_stream(&self, py: Python) -> PyDataFusionResult<PyRecordBatchStream> {
+        // create a Tokio runtime to run the async code
+        let rt = &get_tokio_runtime().0;
         let df = self.df.as_ref().clone();
-        let stream = spawn_stream(py, async move { df.execute_stream().await })?;
+        let fut: JoinHandle<datafusion::common::Result<SendableRecordBatchStream>> =
+            rt.spawn(async move { df.execute_stream().await });
+        let stream = wait_for_future(py, async { fut.await.map_err(to_datafusion_err) })???;
         Ok(PyRecordBatchStream::new(stream))
     }
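
The hunks above make `__arrow_c_stream__` eager again while keeping `execute_stream` as the incremental path returning a `PyRecordBatchStream`. A minimal sketch of that streaming alternative from Python, assuming the bindings' `DataFrame.execute_stream()` and a `RecordBatchStream` that can be iterated to yield batches convertible via `to_pyarrow()` (neither is touched on the Python side by this commit):

    from datafusion import SessionContext

    ctx = SessionContext()
    df = ctx.sql("SELECT 1 AS a")

    # Batches are produced as the plan executes rather than collected up front.
    for batch in df.execute_stream():
        print(batch.to_pyarrow().num_rows)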

src/utils.rs

Lines changed: 3 additions & 15 deletions
@@ -17,17 +17,16 @@

 use crate::{
     common::data_type::PyScalarValue,
-    errors::{to_datafusion_err, PyDataFusionError, PyDataFusionResult},
+    errors::{PyDataFusionError, PyDataFusionResult},
     TokioRuntime,
 };
 use datafusion::{
-    common::ScalarValue, execution::context::SessionContext, execution::SendableRecordBatchStream,
-    logical_expr::Volatility,
+    common::ScalarValue, execution::context::SessionContext, logical_expr::Volatility,
 };
 use pyo3::prelude::*;
 use pyo3::{exceptions::PyValueError, types::PyCapsule};
 use std::{future::Future, sync::OnceLock, time::Duration};
-use tokio::{runtime::Runtime, task::JoinHandle, time::sleep};
+use tokio::{runtime::Runtime, time::sleep};
 /// Utility to get the Tokio Runtime from Python
 #[inline]
 pub(crate) fn get_tokio_runtime() -> &'static TokioRuntime {
@@ -85,17 +84,6 @@ where
     })
 }

-/// Spawn a [`SendableRecordBatchStream`] on the Tokio runtime and wait for completion
-/// while respecting Python signal handling.
-pub(crate) fn spawn_stream<F>(py: Python, fut: F) -> PyDataFusionResult<SendableRecordBatchStream>
-where
-    F: Future<Output = datafusion::common::Result<SendableRecordBatchStream>> + Send + 'static,
-{
-    let rt = &get_tokio_runtime().0;
-    let handle: JoinHandle<datafusion::common::Result<SendableRecordBatchStream>> = rt.spawn(fut);
-    wait_for_future(py, async { handle.await.map_err(to_datafusion_err) })???
-}
-
 pub(crate) fn parse_volatility(value: &str) -> PyDataFusionResult<Volatility> {
     Ok(match value {
         "immutable" => Volatility::Immutable,

0 commit comments
