Skip to content

Commit 358fe30

Browse files
committed
test: add tests for converting Arrow stream to Python list, dict, and pandas DataFrame
1 parent b2ec87b commit 358fe30

2 files changed

Lines changed: 51 additions & 1 deletion

File tree

python/tests/test_dataframe.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1795,6 +1795,40 @@ def test_arrow_c_stream_capsule_manual_destructor_noop(ctx):
17951795
gc.collect()
17961796

17971797

1798+
def test_arrow_stream_to_pylist(df):
1799+
capsule = df.__arrow_c_stream__()
1800+
reader = pa.RecordBatchReader._import_from_c_capsule(capsule)
1801+
reader.read_all()
1802+
1803+
pylist = df.to_pylist()
1804+
assert pylist == [
1805+
{"a": 1, "b": 4, "c": 8},
1806+
{"a": 2, "b": 5, "c": 5},
1807+
{"a": 3, "b": 6, "c": 8},
1808+
]
1809+
1810+
1811+
def test_arrow_stream_to_pydict(df):
1812+
capsule = df.__arrow_c_stream__()
1813+
reader = pa.RecordBatchReader._import_from_c_capsule(capsule)
1814+
reader.read_all()
1815+
1816+
pydict = df.to_pydict()
1817+
assert pydict == {"a": [1, 2, 3], "b": [4, 5, 6], "c": [8, 5, 8]}
1818+
1819+
1820+
def test_arrow_stream_to_pandas(df):
1821+
pd = pytest.importorskip("pandas")
1822+
1823+
capsule = df.__arrow_c_stream__()
1824+
reader = pa.RecordBatchReader._import_from_c_capsule(capsule)
1825+
reader.read_all()
1826+
1827+
pdf = df.to_pandas()
1828+
assert isinstance(pdf, pd.DataFrame)
1829+
assert pdf.shape == (3, 3)
1830+
1831+
17981832
def test_to_pylist(df):
17991833
# Convert datafusion dataframe to Python list
18001834
pylist = df.to_pylist()

src/dataframe.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
use std::collections::HashMap;
1919
use std::ffi::CString;
20+
use std::os::raw::c_void;
2021
use std::sync::Arc;
2122

2223
use arrow::array::{new_null_array, RecordBatch, RecordBatchReader};
@@ -963,8 +964,23 @@ impl PyDataFrame {
963964
let reader: Box<dyn RecordBatchReader + Send> = Box::new(reader);
964965

965966
let stream = FFI_ArrowArrayStream::new(reader);
967+
fn drop_stream(ptr: *mut FFI_ArrowArrayStream, _ctx: *mut c_void) {
968+
if ptr.is_null() {
969+
return;
970+
}
971+
unsafe {
972+
drop(Box::from_raw(ptr));
973+
}
974+
}
975+
let stream_ptr = Box::into_raw(Box::new(stream));
966976
let name = CString::new("arrow_array_stream").unwrap();
967-
let capsule = PyCapsule::new(py, stream, Some(name)).map_err(py_datafusion_err)?;
977+
let capsule = PyCapsule::new_bound_with_destructor(
978+
py,
979+
stream_ptr,
980+
Some(name),
981+
drop_stream as fn(*mut FFI_ArrowArrayStream, *mut c_void),
982+
)
983+
.map_err(py_datafusion_err)?;
968984
Ok(capsule)
969985
}
970986

0 commit comments

Comments
 (0)