Skip to content

Commit bdbf8c1

Browse files
committed
feat: add Arrow FFI support for Python object conversions and remove pyarrow_util module
1 parent ed65511 commit bdbf8c1

5 files changed

Lines changed: 127 additions & 64 deletions

File tree

src/arrow_ffi.rs

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Conversions between Python objects exposing Arrow's C Data Interface
19+
//! and DataFusion types.
20+
21+
use std::ptr::addr_of;
22+
23+
use arrow::array::{make_array, Array, ArrayData};
24+
use arrow::ffi::{from_ffi, to_ffi, FFI_ArrowArray, FFI_ArrowSchema};
25+
use arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream};
26+
use datafusion::scalar::ScalarValue;
27+
use pyo3::ffi::Py_uintptr_t;
28+
use pyo3::types::{PyAnyMethods, PyCapsule, PyCapsuleMethods, PyTuple};
29+
use pyo3::{Bound, FromPyObject, PyAny, PyObject, PyResult, Python};
30+
31+
use crate::common::data_type::PyScalarValue;
32+
use crate::errors::PyDataFusionError;
33+
use crate::utils::validate_pycapsule;
34+
35+
/// Convert a Python object implementing ``__arrow_c_array__`` into ``ArrayData``.
36+
pub fn array_data_from_py(value: &Bound<'_, PyAny>) -> PyResult<ArrayData> {
37+
if value.hasattr("__arrow_c_array__")? {
38+
let obj = value.getattr("__arrow_c_array__")?.call0()?;
39+
let tuple = obj.downcast::<PyTuple>()?;
40+
41+
let schema_item = tuple.get_item(0)?;
42+
let schema_capsule = schema_item.downcast::<PyCapsule>()?;
43+
let array_item = tuple.get_item(1)?;
44+
let array_capsule = array_item.downcast::<PyCapsule>()?;
45+
46+
validate_pycapsule(schema_capsule, "arrow_schema")?;
47+
validate_pycapsule(array_capsule, "arrow_array")?;
48+
49+
let schema_ptr = unsafe { schema_capsule.reference::<FFI_ArrowSchema>() };
50+
let array = unsafe { FFI_ArrowArray::from_raw(array_capsule.pointer().cast()) };
51+
unsafe { from_ffi(array, schema_ptr) }
52+
.map_err(|e| PyDataFusionError::Common(e.to_string()).into())
53+
} else {
54+
Err(pyo3::exceptions::PyTypeError::new_err(
55+
"Object does not implement __arrow_c_array__",
56+
))
57+
}
58+
}
59+
60+
/// Convert a Python object implementing ``__arrow_c_stream__`` into an
61+
/// ``ArrowArrayStreamReader``.
62+
pub fn stream_reader_from_py(value: &Bound<'_, PyAny>) -> PyResult<ArrowArrayStreamReader> {
63+
if value.hasattr("__arrow_c_stream__")? {
64+
let capsule = value.getattr("__arrow_c_stream__")?.call0()?;
65+
let capsule = capsule.downcast::<PyCapsule>()?;
66+
validate_pycapsule(capsule, "arrow_array_stream")?;
67+
let stream = unsafe { FFI_ArrowArrayStream::from_raw(capsule.pointer() as _) };
68+
ArrowArrayStreamReader::try_new(stream)
69+
.map_err(|err| pyo3::exceptions::PyValueError::new_err(err.to_string()))
70+
} else {
71+
Err(pyo3::exceptions::PyTypeError::new_err(
72+
"Object does not implement __arrow_c_stream__",
73+
))
74+
}
75+
}
76+
77+
impl PyScalarValue {
78+
/// Construct a ``PyScalarValue`` from any Python object implementing
79+
/// ``__arrow_c_array__``.
80+
pub fn from_arrow_ffi(value: &Bound<'_, PyAny>) -> PyResult<Self> {
81+
let array_data = array_data_from_py(value)?;
82+
let array = make_array(array_data);
83+
let scalar = ScalarValue::try_from_array(&array, 0).map_err(PyDataFusionError::from)?;
84+
Ok(PyScalarValue(scalar))
85+
}
86+
}
87+
88+
impl<'source> FromPyObject<'source> for PyScalarValue {
89+
fn extract_bound(value: &Bound<'source, PyAny>) -> PyResult<Self> {
90+
Self::from_arrow_ffi(value)
91+
}
92+
}
93+
94+
/// Convert a ``ScalarValue`` into a Python ``pyarrow.Scalar`` using the
/// Arrow FFI.
///
/// The scalar is materialized as a one-element Arrow array, handed to
/// pyarrow through the C Data Interface, and indexed at 0 to obtain the
/// Python-side scalar.
///
/// # Errors
/// Returns an error if the scalar cannot be converted to an array, if the
/// FFI export fails, or if the `pyarrow` import / call raises.
pub fn scalar_to_pyarrow(scalar: &ScalarValue, py: Python) -> PyResult<PyObject> {
    // Materialize the scalar as a (single-element) Arrow array.
    let array = scalar.to_array().map_err(PyDataFusionError::from)?;
    let data = array.to_data();
    // Export the array into C Data Interface structs owned by these locals.
    // They must stay alive (and not move) until pyarrow has imported them
    // below — their addresses are passed across the FFI boundary.
    let (ffi_array, ffi_schema) =
        to_ffi(&data).map_err(|e| PyDataFusionError::Common(e.to_string()))?;

    let pa = py.import("pyarrow")?;
    let class = pa.getattr("Array")?;
    // NOTE(review): `_import_from_c` is a private pyarrow API that takes the
    // struct addresses as plain integers; the PyCapsule-based interchange is
    // the public path — consider migrating when feasible.
    let pyarray = class.call_method1(
        "_import_from_c",
        (
            addr_of!(ffi_array) as Py_uintptr_t,
            addr_of!(ffi_schema) as Py_uintptr_t,
        ),
    )?;
    // Index element 0 of the imported array to get a pyarrow.Scalar.
    let pyscalar = pyarray.call_method1("__getitem__", (0,))?;
    Ok(pyscalar.into())
}
114+
115+
/// Helper newtype mirroring Arrow's ``PyArrowType`` for convenience when
/// building Python objects from Rust values.
///
/// The wrapped value is public, so callers can unwrap it with `.0`.
#[derive(Debug)]
pub struct PyArrowType<T>(pub T);
119+
120+
impl<T> From<T> for PyArrowType<T> {
121+
fn from(v: T) -> Self {
122+
Self(v)
123+
}
124+
}

src/expr.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,14 @@ use datafusion::logical_expr::{
3636
lit, Between, BinaryExpr, Case, Cast, Expr, Like, Operator, TryCast,
3737
};
3838

39+
use crate::arrow_ffi::scalar_to_pyarrow;
3940
use crate::common::data_type::{DataTypeMap, NullTreatment, PyScalarValue, RexType};
4041
use crate::errors::{py_runtime_err, py_type_err, py_unsupported_variant_err, PyDataFusionResult};
4142
use crate::expr::aggregate_expr::PyAggregateFunction;
4243
use crate::expr::binary_expr::PyBinaryExpr;
4344
use crate::expr::column::PyColumn;
4445
use crate::expr::literal::PyLiteral;
4546
use crate::functions::add_builder_fns_to_window;
46-
use crate::pyarrow_util::scalar_to_pyarrow;
4747
use crate::sql::logical::PyLogicalPlan;
4848

4949
use self::alias::PyAlias;

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ pub use datafusion_substrait;
3333
pub mod catalog;
3434
pub mod common;
3535

36+
pub mod arrow_ffi;
3637
#[allow(clippy::borrow_deref_ref)]
3738
mod config;
3839
#[allow(clippy::borrow_deref_ref)]
@@ -48,7 +49,6 @@ pub mod expr;
4849
mod functions;
4950
pub mod physical_plan;
5051
mod pyarrow_filter_expression;
51-
pub mod pyarrow_util;
5252
mod record_batch;
5353
pub mod sql;
5454
pub mod store;

src/pyarrow_filter_expression.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ use std::result::Result;
2424
use datafusion::common::{Column, ScalarValue};
2525
use datafusion::logical_expr::{expr::InList, Between, BinaryExpr, Expr, Operator};
2626

27+
use crate::arrow_ffi::scalar_to_pyarrow;
2728
use crate::errors::{PyDataFusionError, PyDataFusionResult};
28-
use crate::pyarrow_util::scalar_to_pyarrow;
2929

3030
#[derive(Debug)]
3131
#[repr(transparent)]

src/pyarrow_util.rs

Lines changed: 0 additions & 61 deletions
This file was deleted.

0 commit comments

Comments
 (0)