|
16 | 16 | // under the License. |
17 | 17 |
|
18 | 18 | use std::collections::HashMap; |
19 | | -use std::ffi::{c_void, CString}; |
| 19 | +use std::ffi::CString; |
20 | 20 | use std::sync::Arc; |
21 | 21 |
|
22 | 22 | use arrow::array::{new_null_array, RecordBatch, RecordBatchReader}; |
@@ -58,6 +58,8 @@ use crate::{ |
58 | 58 | expr::{sort_expr::PySortExpr, PyExpr}, |
59 | 59 | }; |
60 | 60 |
|
| 61 | +const ARROW_STREAM_NAME: &str = "arrow_array_stream"; // Name given to the PyCapsule that wraps the exported FFI_ArrowArrayStream. NOTE(review): presumably must match the Arrow PyCapsule Interface capsule name — confirm against the spec.
| 62 | + |
61 | 63 | // https://github.com/apache/datafusion-python/pull/1016#discussion_r1983239116 |
62 | 64 | // - we have not decided on the table_provider approach yet |
63 | 65 | // this is an interim implementation |
@@ -952,26 +954,10 @@ impl PyDataFrame { |
952 | 954 | }; |
953 | 955 | let reader: Box<dyn RecordBatchReader + Send> = Box::new(reader); |
954 | 956 |
|
955 | | - let stream = Box::new(FFI_ArrowArrayStream::new(reader)); |
956 | | - let stream_ptr = Box::into_raw(stream); |
957 | | - assert!( |
958 | | - !stream_ptr.is_null(), |
959 | | - "ArrowArrayStream pointer should never be null" |
960 | | - ); |
961 | | - let stream_capsule_name = CString::new("arrow_array_stream").unwrap(); |
962 | | - unsafe { |
963 | | - PyCapsule::new_bound_with_destructor( |
964 | | - py, |
965 | | - stream_ptr, |
966 | | - Some(stream_capsule_name), |
967 | | - |ptr: *mut FFI_ArrowArrayStream, _| { |
968 | | - if !ptr.is_null() { |
969 | | - unsafe { Box::from_raw(ptr) }; |
970 | | - } |
971 | | - }, |
972 | | - ) |
973 | | - } |
974 | | - .map_err(PyDataFusionError::from) |
| 957 | + let stream = FFI_ArrowArrayStream::new(reader); |
| 958 | + let name = CString::new(ARROW_STREAM_NAME).unwrap(); |
| 959 | + let capsule = PyCapsule::new(py, stream, Some(name)).map_err(PyDataFusionError::from)?; |
| 960 | + Ok(capsule) |
975 | 961 | } |
976 | 962 |
|
977 | 963 | fn execute_stream(&self, py: Python) -> PyDataFusionResult<PyRecordBatchStream> { |
|
0 commit comments