Skip to content

Commit 1aa4823

Browse files
committed
rebase_main
1 parent 287ef29 commit 1aa4823

2 files changed

Lines changed: 93 additions & 8 deletions

File tree

datafusion/ffi/src/udtf.rs

Lines changed: 93 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,23 +19,24 @@ use std::any::Any;
1919
use std::ffi::c_void;
2020
use std::sync::Arc;
2121

22-
use datafusion_catalog::{TableFunctionImpl, TableProvider};
22+
use datafusion_catalog::{TableFunctionArgs, TableFunctionImpl, TableProvider};
23+
use datafusion_common::DataFusionError;
2324
use datafusion_common::error::Result;
2425
use datafusion_execution::TaskContext;
25-
use datafusion_expr::Expr;
2626
use datafusion_proto::logical_plan::from_proto::parse_exprs;
2727
use datafusion_proto::logical_plan::to_proto::serialize_exprs;
2828
use datafusion_proto::logical_plan::{
2929
DefaultLogicalExtensionCodec, LogicalExtensionCodec,
3030
};
3131
use datafusion_proto::protobuf::LogicalExprList;
32+
use datafusion_session::Session;
3233
use prost::Message;
33-
3434
use stabby::vec::Vec as SVec;
3535
use tokio::runtime::Handle;
3636

3737
use crate::execution::FFI_TaskContextProvider;
3838
use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
39+
use crate::session::{FFI_SessionRef, ForeignSession};
3940
use crate::table_provider::FFI_TableProvider;
4041
use crate::util::FFIResult;
4142
use crate::{df_result, sresult_return};
@@ -44,11 +45,22 @@ use crate::{df_result, sresult_return};
4445
#[repr(C)]
4546
#[derive(Debug)]
4647
pub struct FFI_TableFunction {
47-
/// Equivalent to the `call` function of the TableFunctionImpl.
48+
/// Equivalent to the [`TableFunctionImpl::call`].
4849
/// The arguments are Expr passed as protobuf encoded bytes.
50+
#[deprecated(
51+
since = "53.0.0",
52+
note = "See TableFunctionImpl::call deprecation note"
53+
)]
4954
pub call:
5055
unsafe extern "C" fn(udtf: &Self, args: SVec<u8>) -> FFIResult<FFI_TableProvider>,
5156

57+
/// Equivalent to the [`TableFunctionImpl::call_with_args`].
58+
call_with_args: unsafe extern "C" fn(
59+
udtf: &Self,
60+
args: SVec<u8>,
61+
session: FFI_SessionRef,
62+
) -> FFIResult<FFI_TableProvider>,
63+
5264
pub logical_codec: FFI_LogicalExtensionCodec,
5365

5466
/// Used to create a clone on the provider of the udtf. This should
@@ -117,6 +129,47 @@ unsafe extern "C" fn call_fn_wrapper(
117129
))
118130
}
119131

132+
unsafe extern "C" fn call_with_args_wrapper(
133+
udtf: &FFI_TableFunction,
134+
args: SVec<u8>,
135+
session: FFI_SessionRef,
136+
) -> FFIResult<FFI_TableProvider> {
137+
let runtime = udtf.runtime();
138+
let udtf_inner = udtf.inner();
139+
140+
let ctx: Arc<TaskContext> =
141+
sresult_return!((&udtf.logical_codec.task_ctx_provider).try_into());
142+
let codec: Arc<dyn LogicalExtensionCodec> = (&udtf.logical_codec).into();
143+
144+
let proto_filters = sresult_return!(LogicalExprList::decode(args.as_ref()));
145+
146+
let args = sresult_return!(parse_exprs(
147+
proto_filters.expr.iter(),
148+
ctx.as_ref(),
149+
codec.as_ref()
150+
));
151+
152+
let mut foreign_session = None;
153+
let session = sresult_return!(
154+
session
155+
.as_local()
156+
.map(Ok::<&(dyn Session + Send + Sync), DataFusionError>)
157+
.unwrap_or_else(|| {
158+
foreign_session = Some(ForeignSession::try_from(&session)?);
159+
Ok(foreign_session.as_ref().unwrap())
160+
})
161+
);
162+
let table_provider = sresult_return!(
163+
udtf_inner.call_with_args(TableFunctionArgs::new(&args, session))
164+
);
165+
FFIResult::Ok(FFI_TableProvider::new_with_ffi_codec(
166+
table_provider,
167+
false,
168+
runtime,
169+
udtf.logical_codec.clone(),
170+
))
171+
}
172+
120173
unsafe extern "C" fn release_fn_wrapper(udtf: &mut FFI_TableFunction) {
121174
unsafe {
122175
debug_assert!(!udtf.private_data.is_null());
@@ -177,7 +230,9 @@ impl FFI_TableFunction {
177230
let private_data = Box::new(TableFunctionPrivateData { udtf, runtime });
178231

179232
Self {
233+
#[expect(deprecated)]
180234
call: call_fn_wrapper,
235+
call_with_args: call_with_args_wrapper,
181236
logical_codec,
182237
clone: clone_fn_wrapper,
183238
release: release_fn_wrapper,
@@ -216,13 +271,35 @@ impl From<FFI_TableFunction> for Arc<dyn TableFunctionImpl> {
216271
}
217272

218273
impl TableFunctionImpl for ForeignTableFunction {
219-
fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
274+
fn call_with_args(&self, args: TableFunctionArgs) -> Result<Arc<dyn TableProvider>> {
275+
let session = FFI_SessionRef::new(
276+
args.session(),
277+
self.0.runtime(),
278+
self.0.logical_codec.clone(),
279+
);
280+
let codec: Arc<dyn LogicalExtensionCodec> = (&self.0.logical_codec).into();
281+
let expr_list = LogicalExprList {
282+
expr: serialize_exprs(args.exprs(), codec.as_ref())?,
283+
};
284+
let filters_serialized = expr_list.encode_to_vec().into_iter().collect();
285+
286+
let table_provider =
287+
unsafe { (self.0.call_with_args)(&self.0, filters_serialized, session) };
288+
289+
let table_provider = df_result!(table_provider)?;
290+
let table_provider: Arc<dyn TableProvider> = (&table_provider).into();
291+
292+
Ok(table_provider)
293+
}
294+
295+
fn call(&self, args: &[datafusion_expr::Expr]) -> Result<Arc<dyn TableProvider>> {
220296
let codec: Arc<dyn LogicalExtensionCodec> = (&self.0.logical_codec).into();
221297
let expr_list = LogicalExprList {
222298
expr: serialize_exprs(args, codec.as_ref())?,
223299
};
224300
let filters_serialized = expr_list.encode_to_vec().into_iter().collect();
225301

302+
#[expect(deprecated)]
226303
let table_provider = unsafe { (self.0.call)(&self.0, filters_serialized) };
227304

228305
let table_provider = df_result!(table_provider)?;
@@ -243,16 +320,22 @@ mod tests {
243320
use datafusion::logical_expr::ptr_eq::arc_ptr_eq;
244321
use datafusion::prelude::{SessionContext, lit};
245322
use datafusion::scalar::ScalarValue;
323+
use datafusion_catalog::TableFunctionArgs;
246324
use datafusion_execution::TaskContextProvider;
325+
use datafusion_expr::Expr;
247326

248327
use super::*;
249328

250329
#[derive(Debug)]
251330
struct TestUDTF {}
252331

253332
impl TableFunctionImpl for TestUDTF {
254-
fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
333+
fn call_with_args(
334+
&self,
335+
args: TableFunctionArgs,
336+
) -> Result<Arc<dyn TableProvider>> {
255337
let args = args
338+
.exprs()
256339
.iter()
257340
.map(|arg| {
258341
if let Expr::Literal(scalar, _) = arg {
@@ -342,7 +425,10 @@ mod tests {
342425

343426
let foreign_udf: Arc<dyn TableFunctionImpl> = local_udtf.into();
344427

345-
let table = foreign_udf.call(&[lit(6_u64), lit("one"), lit(2.0), lit(3_u64)])?;
428+
let table = foreign_udf.call_with_args(TableFunctionArgs::new(
429+
&[lit(6_u64), lit("one"), lit(2.0), lit(3_u64)],
430+
&ctx.state(),
431+
))?;
346432

347433
let _ = ctx.register_table("test-table", table)?;
348434

datafusion/ffi/tests/ffi_catalog.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ mod tests {
2424
use std::sync::Arc;
2525

2626
use datafusion::catalog::{CatalogProvider, CatalogProviderList};
27-
use datafusion_common::DataFusionError;
2827
use datafusion_ffi::tests::utils::get_module;
2928

3029
#[tokio::test]

0 commit comments

Comments
 (0)