Skip to content

Commit 3b22836

Browse files
committed
rebase_main
1 parent ddb7193 commit 3b22836

13 files changed

Lines changed: 353 additions & 341 deletions

Cargo.lock

Lines changed: 199 additions & 210 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/ffi/README.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,31 @@ with your library and then to connect it to DataFusion using this crate.
7373
Alternatively, you could use [bindgen] to interface directly to the [FFI] provided
7474
by this crate, but that is currently not supported.
7575

76+
## Stabby Usage
77+
78+
This crate uses [stabby] for ABI-stable types like `stabby::string::String` and
79+
`stabby::vec::Vec`. We chose stabby because [abi_stable] is no longer actively
80+
maintained.
81+
82+
We intentionally use `#[repr(C)]` for our struct definitions instead of stabby's
83+
`#[stabby::stabby]` macro. The reason is that stabby's `IStable` trait (required
84+
by the macro) demands that all inner types also implement `IStable`. This creates
85+
challenges for our use case:
86+
87+
1. **Arrow types**: Arrow's FFI types like `FFI_ArrowSchema` do not implement
88+
`IStable`, and adding such implementations would be laborious and error-prone.
89+
90+
2. **Self-referential function pointers**: Many of our FFI structs contain
91+
function pointers that take `&Self` as an argument, which complicates `IStable`
92+
implementations.
93+
94+
3. **FFI_Option and FFI_Result**: For similar reasons, we provide our own
95+
`FFI_Option<T>` and `FFI_Result<T, E>` types using `#[repr(C, u8)]` instead
96+
of stabby's `Option` and `Result`, which require inner types to be `IStable`.
97+
98+
This hybrid approach gives us stabby's maintained, ABI-stable collection types
99+
while retaining flexibility for our complex FFI struct layouts.
100+
76101
## FFI Boundary
77102

78103
We expect this crate to be used by both sides of the FFI Boundary. This should
@@ -198,6 +223,7 @@ and it is easy to implement on any struct that implements `Session`.
198223
[rust abi]: https://doc.rust-lang.org/reference/abi.html
199224
[ffi]: https://doc.rust-lang.org/nomicon/ffi.html
200225
[stabby]: https://crates.io/crates/stabby
226+
[abi_stable]: https://crates.io/crates/abi_stable
201227
[async-ffi]: https://crates.io/crates/async-ffi
202228
[bindgen]: https://crates.io/crates/bindgen
203229
[`datafusion-python`]: https://datafusion.apache.org/python/

datafusion/ffi/src/session/mod.rs

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,6 @@ use std::collections::HashMap;
2020
use std::ffi::c_void;
2121
use std::sync::Arc;
2222

23-
use crate::arrow_wrappers::WrappedSchema;
24-
use crate::execution::FFI_TaskContext;
25-
use crate::execution_plan::FFI_ExecutionPlan;
26-
use crate::physical_expr::FFI_PhysicalExpr;
27-
use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
28-
use crate::session::config::FFI_SessionConfig;
29-
use crate::udaf::FFI_AggregateUDF;
30-
use crate::udf::FFI_ScalarUDF;
31-
use crate::udwf::FFI_WindowUDF;
32-
use crate::util::FFIResult;
33-
use crate::{df_result, rresult, rresult_return};
34-
use abi_stable::StableAbi;
35-
use abi_stable::std_types::{RHashMap, RResult, RStr, RString, RVec};
3623
use arrow_schema::SchemaRef;
3724
use arrow_schema::ffi::FFI_ArrowSchema;
3825
use async_ffi::{FfiFuture, FutureExt};
@@ -63,6 +50,18 @@ use stabby::string::String as SString;
6350
use stabby::vec::Vec as SVec;
6451
use tokio::runtime::Handle;
6552

53+
use crate::arrow_wrappers::WrappedSchema;
54+
use crate::execution::FFI_TaskContext;
55+
use crate::execution_plan::FFI_ExecutionPlan;
56+
use crate::physical_expr::FFI_PhysicalExpr;
57+
use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
58+
use crate::session::config::FFI_SessionConfig;
59+
use crate::udaf::FFI_AggregateUDF;
60+
use crate::udf::FFI_ScalarUDF;
61+
use crate::udwf::FFI_WindowUDF;
62+
use crate::util::{FFI_Result, FFIResult};
63+
use crate::{df_result, sresult, sresult_return};
64+
6665
pub mod config;
6766

6867
/// A stable struct for sharing [`Session`] across FFI boundaries.
@@ -639,9 +638,11 @@ impl Session for ForeignSession {
639638

640639
#[cfg(test)]
641640
mod tests {
641+
use std::sync::Arc;
642642

643643
use arrow_schema::{DataType, Field, Schema};
644644
use datafusion::execution::SessionStateBuilder;
645+
use datafusion_common::DataFusionError;
645646
use datafusion_expr::col;
646647
use datafusion_expr::registry::FunctionRegistry;
647648
use datafusion_proto::logical_plan::DefaultLogicalExtensionCodec;

datafusion/ffi/src/udtf.rs

Lines changed: 6 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -19,27 +19,23 @@ use std::any::Any;
1919
use std::ffi::c_void;
2020
use std::sync::Arc;
2121

22-
use abi_stable::StableAbi;
23-
use abi_stable::std_types::{RResult, RVec};
24-
use datafusion_catalog::{TableFunctionArgs, TableFunctionImpl, TableProvider};
25-
use datafusion_common::DataFusionError;
22+
use datafusion_catalog::{TableFunctionImpl, TableProvider};
2623
use datafusion_common::error::Result;
2724
use datafusion_execution::TaskContext;
25+
use datafusion_expr::Expr;
2826
use datafusion_proto::logical_plan::from_proto::parse_exprs;
2927
use datafusion_proto::logical_plan::to_proto::serialize_exprs;
3028
use datafusion_proto::logical_plan::{
3129
DefaultLogicalExtensionCodec, LogicalExtensionCodec,
3230
};
3331
use datafusion_proto::protobuf::LogicalExprList;
34-
use datafusion_session::Session;
3532
use prost::Message;
3633

3734
use stabby::vec::Vec as SVec;
3835
use tokio::runtime::Handle;
3936

4037
use crate::execution::FFI_TaskContextProvider;
4138
use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
42-
use crate::session::{FFI_SessionRef, ForeignSession};
4339
use crate::table_provider::FFI_TableProvider;
4440
use crate::util::{FFI_Result, FFIResult};
4541
use crate::{df_result, sresult_return};
@@ -48,22 +44,11 @@ use crate::{df_result, sresult_return};
4844
#[repr(C)]
4945
#[derive(Debug)]
5046
pub struct FFI_TableFunction {
51-
/// Equivalent to the [`TableFunctionImpl::call`].
47+
/// Equivalent to the `call` function of the TableFunctionImpl.
5248
/// The arguments are Expr passed as protobuf encoded bytes.
53-
#[deprecated(
54-
since = "53.0.0",
55-
note = "See TableFunctionImpl::call deprecation note"
56-
)]
5749
pub call:
5850
unsafe extern "C" fn(udtf: &Self, args: SVec<u8>) -> FFIResult<FFI_TableProvider>,
5951

60-
/// Equivalent to the [`TableFunctionImpl::call_with_args`].
61-
call_with_args: unsafe extern "C" fn(
62-
udtf: &Self,
63-
args: RVec<u8>,
64-
session: FFI_SessionRef,
65-
) -> FFIResult<FFI_TableProvider>,
66-
6752
pub logical_codec: FFI_LogicalExtensionCodec,
6853

6954
/// Used to create a clone on the provider of the udtf. This should
@@ -131,47 +116,6 @@ unsafe extern "C" fn call_fn_wrapper(
131116
))
132117
}
133118

134-
unsafe extern "C" fn call_with_args_wrapper(
135-
udtf: &FFI_TableFunction,
136-
args: RVec<u8>,
137-
session: FFI_SessionRef,
138-
) -> FFIResult<FFI_TableProvider> {
139-
let runtime = udtf.runtime();
140-
let udtf_inner = udtf.inner();
141-
142-
let ctx: Arc<TaskContext> =
143-
rresult_return!((&udtf.logical_codec.task_ctx_provider).try_into());
144-
let codec: Arc<dyn LogicalExtensionCodec> = (&udtf.logical_codec).into();
145-
146-
let proto_filters = rresult_return!(LogicalExprList::decode(args.as_ref()));
147-
148-
let args = rresult_return!(parse_exprs(
149-
proto_filters.expr.iter(),
150-
ctx.as_ref(),
151-
codec.as_ref()
152-
));
153-
154-
let mut foreign_session = None;
155-
let session = rresult_return!(
156-
session
157-
.as_local()
158-
.map(Ok::<&(dyn Session + Send + Sync), DataFusionError>)
159-
.unwrap_or_else(|| {
160-
foreign_session = Some(ForeignSession::try_from(&session)?);
161-
Ok(foreign_session.as_ref().unwrap())
162-
})
163-
);
164-
let table_provider = rresult_return!(
165-
udtf_inner.call_with_args(TableFunctionArgs::new(&args, session))
166-
);
167-
RResult::ROk(FFI_TableProvider::new_with_ffi_codec(
168-
table_provider,
169-
false,
170-
runtime,
171-
udtf.logical_codec.clone(),
172-
))
173-
}
174-
175119
unsafe extern "C" fn release_fn_wrapper(udtf: &mut FFI_TableFunction) {
176120
unsafe {
177121
debug_assert!(!udtf.private_data.is_null());
@@ -232,9 +176,7 @@ impl FFI_TableFunction {
232176
let private_data = Box::new(TableFunctionPrivateData { udtf, runtime });
233177

234178
Self {
235-
#[expect(deprecated)]
236179
call: call_fn_wrapper,
237-
call_with_args: call_with_args_wrapper,
238180
logical_codec,
239181
clone: clone_fn_wrapper,
240182
release: release_fn_wrapper,
@@ -273,35 +215,13 @@ impl From<FFI_TableFunction> for Arc<dyn TableFunctionImpl> {
273215
}
274216

275217
impl TableFunctionImpl for ForeignTableFunction {
276-
fn call_with_args(&self, args: TableFunctionArgs) -> Result<Arc<dyn TableProvider>> {
277-
let session = FFI_SessionRef::new(
278-
args.session(),
279-
self.0.runtime(),
280-
self.0.logical_codec.clone(),
281-
);
282-
let codec: Arc<dyn LogicalExtensionCodec> = (&self.0.logical_codec).into();
283-
let expr_list = LogicalExprList {
284-
expr: serialize_exprs(args.exprs(), codec.as_ref())?,
285-
};
286-
let filters_serialized = expr_list.encode_to_vec().into();
287-
288-
let table_provider =
289-
unsafe { (self.0.call_with_args)(&self.0, filters_serialized, session) };
290-
291-
let table_provider = df_result!(table_provider)?;
292-
let table_provider: Arc<dyn TableProvider> = (&table_provider).into();
293-
294-
Ok(table_provider)
295-
}
296-
297-
fn call(&self, args: &[datafusion_expr::Expr]) -> Result<Arc<dyn TableProvider>> {
218+
fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
298219
let codec: Arc<dyn LogicalExtensionCodec> = (&self.0.logical_codec).into();
299220
let expr_list = LogicalExprList {
300221
expr: serialize_exprs(args, codec.as_ref())?,
301222
};
302223
let filters_serialized = expr_list.encode_to_vec().into_iter().collect();
303224

304-
#[expect(deprecated)]
305225
let table_provider = unsafe { (self.0.call)(&self.0, filters_serialized) };
306226

307227
let table_provider = df_result!(table_provider)?;
@@ -322,22 +242,16 @@ mod tests {
322242
use datafusion::logical_expr::ptr_eq::arc_ptr_eq;
323243
use datafusion::prelude::{SessionContext, lit};
324244
use datafusion::scalar::ScalarValue;
325-
use datafusion_catalog::TableFunctionArgs;
326245
use datafusion_execution::TaskContextProvider;
327-
use datafusion_expr::Expr;
328246

329247
use super::*;
330248

331249
#[derive(Debug)]
332250
struct TestUDTF {}
333251

334252
impl TableFunctionImpl for TestUDTF {
335-
fn call_with_args(
336-
&self,
337-
args: TableFunctionArgs,
338-
) -> Result<Arc<dyn TableProvider>> {
253+
fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
339254
let args = args
340-
.exprs()
341255
.iter()
342256
.map(|arg| {
343257
if let Expr::Literal(scalar, _) = arg {
@@ -427,10 +341,7 @@ mod tests {
427341

428342
let foreign_udf: Arc<dyn TableFunctionImpl> = local_udtf.into();
429343

430-
let table = foreign_udf.call_with_args(TableFunctionArgs::new(
431-
&[lit(6_u64), lit("one"), lit(2.0), lit(3_u64)],
432-
&ctx.state(),
433-
))?;
344+
let table = foreign_udf.call(&[lit(6_u64), lit("one"), lit(2.0), lit(3_u64)])?;
434345

435346
let _ = ctx.register_table("test-table", table)?;
436347

datafusion/ffi/tests/ffi_catalog.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ mod tests {
2424
use std::sync::Arc;
2525

2626
use datafusion::catalog::{CatalogProvider, CatalogProviderList};
27+
use datafusion_common::DataFusionError;
2728
use datafusion_ffi::tests::utils::get_module;
2829

2930
#[tokio::test]

datafusion/ffi/tests/ffi_config.rs

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
/// when the feature integration-tests is built
2020
#[cfg(feature = "integration-tests")]
2121
mod tests {
22-
use datafusion::error::Result;
22+
use datafusion::error::{DataFusionError, Result};
2323
use datafusion_common::ScalarValue;
2424
use datafusion_common::config::{ConfigOptions, TableOptions};
2525
use datafusion_execution::config::SessionConfig;
@@ -31,7 +31,13 @@ mod tests {
3131
fn test_ffi_config_options_extension() -> Result<()> {
3232
let module = get_module()?;
3333

34-
let extension_options = (module.create_extension_options)();
34+
let extension_options =
35+
module
36+
.create_extension_options()
37+
.ok_or(DataFusionError::NotImplemented(
38+
"External test library failed to implement create_extension_options"
39+
.to_string(),
40+
))?();
3541

3642
let mut config = ConfigOptions::new();
3743
config.extensions.insert(extension_options);
@@ -55,7 +61,13 @@ mod tests {
5561
fn test_ffi_table_options_extension() -> Result<()> {
5662
let module = get_module()?;
5763

58-
let extension_options = (module.create_extension_options)();
64+
let extension_options =
65+
module
66+
.create_extension_options()
67+
.ok_or(DataFusionError::NotImplemented(
68+
"External test library failed to implement create_extension_options"
69+
.to_string(),
70+
))?();
5971

6072
let mut table_options = TableOptions::new();
6173
table_options.extensions.insert(extension_options);
@@ -80,7 +92,13 @@ mod tests {
8092
fn test_ffi_session_config_options_extension() -> Result<()> {
8193
let module = get_module()?;
8294

83-
let extension_options = (module.create_extension_options)();
95+
let extension_options =
96+
module
97+
.create_extension_options()
98+
.ok_or(DataFusionError::NotImplemented(
99+
"External test library failed to implement create_extension_options"
100+
.to_string(),
101+
))?();
84102

85103
let mut config = SessionConfig::new().with_option_extension(extension_options);
86104

datafusion/ffi/tests/ffi_execution_plan.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,12 @@ mod tests {
4949
Arc::new(EmptyExec::new(schema))
5050
}
5151

52-
let child_plan = (module.create_empty_exec)();
52+
let child_plan =
53+
module
54+
.create_empty_exec()
55+
.ok_or(DataFusionError::NotImplemented(
56+
"External module failed to implement create_empty_exec".to_string(),
57+
))?();
5358
let child_plan: Arc<dyn ExecutionPlan> = (&child_plan)
5459
.try_into()
5560
.expect("should be able create plan");

datafusion/ffi/tests/ffi_integration.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ mod tests {
2626

2727
use arrow::datatypes::Schema;
2828
use datafusion::catalog::{TableProvider, TableProviderFactory};
29-
use datafusion::error::Result;
29+
use datafusion::error::{DataFusionError, Result};
3030
use datafusion_common::TableReference;
3131
use datafusion_common::ToDFSchema;
3232
use datafusion_expr::CreateExternalTable;
@@ -42,7 +42,11 @@ mod tests {
4242

4343
// By calling the code below, the table provider will be created within
4444
// the module's code.
45-
let ffi_table_provider = (table_provider_module.create_table)(synchronous, codec);
45+
let ffi_table_provider = table_provider_module.create_table().ok_or(
46+
DataFusionError::NotImplemented(
47+
"External table provider failed to implement create_table".to_string(),
48+
),
49+
)?(synchronous, codec);
4650

4751
// In order to access the table provider within this executable, we need to
4852
// turn it into a `TableProvider`.
@@ -76,8 +80,11 @@ mod tests {
7680
let table_provider_module = get_module()?;
7781
let (ctx, codec) = super::utils::ctx_and_codec();
7882

79-
let ffi_table_provider_factory =
80-
(table_provider_module.create_table_factory)(codec);
83+
let ffi_table_provider_factory = table_provider_module
84+
.create_table_factory()
85+
.ok_or(DataFusionError::NotImplemented(
86+
"External table provider factory failed to implement create".to_string(),
87+
))?(codec);
8188

8289
let foreign_table_provider_factory: Arc<dyn TableProviderFactory> =
8390
(&ffi_table_provider_factory).into();

0 commit comments

Comments
 (0)