|
16 | 16 | // under the License. |
17 | 17 |
|
18 | 18 | use std::collections::HashMap; |
19 | | -use std::ffi::CString; |
| 19 | +use std::ffi::{c_void, CString}; |
20 | 20 | use std::sync::Arc; |
21 | 21 |
|
22 | 22 | use arrow::array::{new_null_array, RecordBatch, RecordBatchReader}; |
@@ -58,8 +58,6 @@ use crate::{ |
58 | 58 | expr::{sort_expr::PySortExpr, PyExpr}, |
59 | 59 | }; |
60 | 60 |
|
61 | | -const ARROW_STREAM_NAME: &str = "arrow_array_stream"; |
62 | | - |
63 | 61 | // https://github.com/apache/datafusion-python/pull/1016#discussion_r1983239116 |
64 | 62 | // - we have not decided on the table_provider approach yet |
65 | 63 | // this is an interim implementation |
@@ -954,10 +952,26 @@ impl PyDataFrame { |
954 | 952 | }; |
955 | 953 | let reader: Box<dyn RecordBatchReader + Send> = Box::new(reader); |
956 | 954 |
|
957 | | - let stream = FFI_ArrowArrayStream::new(reader); |
958 | | - let name = CString::new(ARROW_STREAM_NAME).unwrap(); |
959 | | - let capsule = PyCapsule::new(py, stream, Some(name)).map_err(PyDataFusionError::from)?; |
960 | | - Ok(capsule) |
| 955 | + let stream = Box::new(FFI_ArrowArrayStream::new(reader)); |
| 956 | + let stream_ptr = Box::into_raw(stream); |
| 957 | + assert!( |
| 958 | + !stream_ptr.is_null(), |
| 959 | + "ArrowArrayStream pointer should never be null" |
| 960 | + ); |
| 961 | + let stream_capsule_name = CString::new("arrow_array_stream").unwrap(); |
| 962 | + unsafe { |
| 963 | + PyCapsule::new_bound_with_destructor( |
| 964 | + py, |
| 965 | + stream_ptr, |
| 966 | + Some(stream_capsule_name), |
| 967 | + |ptr: *mut FFI_ArrowArrayStream, _| { |
| 968 | + if !ptr.is_null() { |
| 969 | + unsafe { Box::from_raw(ptr) }; |
| 970 | + } |
| 971 | + }, |
| 972 | + ) |
| 973 | + } |
| 974 | + .map_err(PyDataFusionError::from) |
961 | 975 | } |
962 | 976 |
|
963 | 977 | fn execute_stream(&self, py: Python) -> PyDataFusionResult<PyRecordBatchStream> { |
|
0 commit comments