@@ -19,14 +19,14 @@ use std::collections::HashMap;
 use std::ffi::CString;
 use std::sync::Arc;
 
-use arrow::array::{new_null_array, RecordBatch, RecordBatchReader};
+use arrow::array::{new_null_array, RecordBatch, RecordBatchIterator, RecordBatchReader};
 use arrow::compute::can_cast_types;
 use arrow::error::ArrowError;
 use arrow::ffi::FFI_ArrowSchema;
 use arrow::ffi_stream::FFI_ArrowArrayStream;
 use arrow::pyarrow::FromPyArrow;
 use datafusion::arrow::datatypes::{Schema, SchemaRef};
-use datafusion::arrow::pyarrow::{PyArrowType, ToPyArrow};
+use datafusion::arrow::pyarrow::{IntoPyArrow, PyArrowType, ToPyArrow};
 use datafusion::arrow::util::pretty;
 use datafusion::common::UnnestOptions;
 use datafusion::config::{CsvOptions, ParquetColumnOptions, ParquetOptions, TableParquetOptions};
@@ -918,15 +918,31 @@ impl PyDataFrame {
     }
 
     /// Convert to Arrow Table
-    /// Collect the batches and pass to Arrow Table
+    ///
+    /// Collect [`RecordBatch`]es in Rust and stream them to PyArrow
+    /// to construct a `pyarrow.Table` without creating intermediate
+    /// Python `RecordBatch` objects. The `RecordBatchIterator` owns the
+    /// `RecordBatch`es, so the underlying buffers live long enough for
+    /// PyArrow to consume them, avoiding use-after-free or double-free
+    /// issues.
     fn to_arrow_table(&self, py: Python<'_>) -> PyResult<PyObject> {
-        let batches = self.collect(py)?.into_pyobject(py)?;
-        let schema = self.schema().into_pyobject(py)?;
+        // Collect the batches on the Rust side
+        let df = self.df.as_ref().clone();
+        let batches = wait_for_future(py, df.collect())?.map_err(PyDataFusionError::from)?;
+
+        // Build a RecordBatchReader owning the batches. PyArrow will
+        // read from this stream and build the Table in C++.
+        let schema = batches
+            .first()
+            .map(|b| b.schema())
+            .unwrap_or_else(|| Arc::new(Schema::empty()));
+        let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema);
+        let reader: Box<dyn RecordBatchReader + Send> = Box::new(reader);
+        let py_reader = reader.into_pyarrow(py)?;
 
-        // Instantiate pyarrow Table object and use its from_batches method
-        let table_class = py.import("pyarrow")?.getattr("Table")?;
-        let args = PyTuple::new(py, &[batches, schema])?;
-        let table: PyObject = table_class.call_method1("from_batches", args)?.into();
+        // `read_all` constructs the pyarrow.Table directly from the
+        // stream without materializing Python RecordBatch objects.
+        let table = py_reader.call_method0(py, "read_all")?;
         Ok(table)
     }
 
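The core pattern in the second hunk, a `RecordBatchIterator` that takes ownership of the collected batches and is boxed as a `RecordBatchReader`, can be sketched in isolation with plain arrow-rs, outside of pyo3. This is an illustrative sketch, not code from the commit; `make_owned_reader` is a hypothetical helper, and the empty-schema fallback mirrors the diff above:

```rust
use std::sync::Arc;

use arrow::array::{Int64Array, RecordBatch, RecordBatchIterator, RecordBatchReader};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;

// Hypothetical helper: wrap collected batches in a reader that owns them.
// The schema fallback for an empty result mirrors the diff above.
fn make_owned_reader(batches: Vec<RecordBatch>) -> Box<dyn RecordBatchReader + Send> {
    let schema = batches
        .first()
        .map(|b| b.schema())
        .unwrap_or_else(|| Arc::new(Schema::empty()));
    Box::new(RecordBatchIterator::new(batches.into_iter().map(Ok), schema))
}

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![1_i64, 2, 3]))])?;

    // The reader yields the batches it owns; in the commit this boxed
    // reader is handed to PyArrow instead of being consumed in Rust.
    let reader = make_owned_reader(vec![batch]);
    for batch in reader {
        println!("{} rows", batch?.num_rows());
    }
    Ok(())
}
```

In the commit itself, the boxed reader is passed to PyArrow via `into_pyarrow`, which (as the `FFI_ArrowArrayStream` import suggests) exposes it over the Arrow C stream interface, so `read_all` can assemble the Table in C++ without round-tripping each batch through Python.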