Skip to content

Commit 263003d

Browse files
committed
Set runtime on all descendents within the same library
1 parent a081124 commit 263003d

3 files changed

Lines changed: 188 additions & 23 deletions

File tree

datafusion/ffi/src/execution_plan.rs

Lines changed: 85 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -204,13 +204,67 @@ impl Clone for FFI_ExecutionPlan {
204204
}
205205
}
206206

207+
/// Helper function to recursively identify any children that do not
208+
/// have a runtime set but should because they are local to this same
209+
/// library. This does imply a restriction that all execution plans
210+
/// in this chain that are within the same library use the same runtime.
211+
fn pass_runtime_to_children(
212+
plan: &Arc<dyn ExecutionPlan>,
213+
runtime: &Handle,
214+
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
215+
println!("checking plan {:?}", plan.name());
216+
let mut updated_children = false;
217+
let plan_is_foreign = plan.as_any().is::<ForeignExecutionPlan>();
218+
219+
let children = plan
220+
.children()
221+
.into_iter()
222+
.map(|child| {
223+
let child = match pass_runtime_to_children(child, runtime)? {
224+
Some(child) => {
225+
updated_children = true;
226+
child
227+
}
228+
None => Arc::clone(child),
229+
};
230+
231+
// If the parent is foreign and the child is local to this library, then when
232+
// we called `children()` above we will get something other than a
233+
// `ForeignExecutionPlan`. In this case wrap the plan in a `ForeignExecutionPlan`
234+
// because when we call `with_new_children` below it will extract the
235+
// FFI plan that does contain the runtime.
236+
if plan_is_foreign && !child.as_any().is::<ForeignExecutionPlan>() {
237+
updated_children = true;
238+
let ffi_child = FFI_ExecutionPlan::new(child, Some(runtime.clone()));
239+
let foreign_child = ForeignExecutionPlan::try_from(ffi_child);
240+
foreign_child.map(|c| Arc::new(c) as Arc<dyn ExecutionPlan>)
241+
} else {
242+
Ok(child)
243+
}
244+
})
245+
.collect::<Result<Vec<_>>>()?;
246+
if updated_children {
247+
Arc::clone(plan).with_new_children(children).map(Some)
248+
} else {
249+
Ok(None)
250+
}
251+
}
252+
207253
impl FFI_ExecutionPlan {
208254
/// This function is called on the provider's side.
209-
pub fn new(plan: Arc<dyn ExecutionPlan>, runtime: Option<Handle>) -> Self {
255+
pub fn new(mut plan: Arc<dyn ExecutionPlan>, runtime: Option<Handle>) -> Self {
256+
// Note to developers: `pass_runtime_to_children` relies on the logic here to
257+
// get the underlying FFI plan during calls to `new_with_children`.
210258
if let Some(plan) = plan.as_any().downcast_ref::<ForeignExecutionPlan>() {
211259
return plan.plan.clone();
212260
}
213261

262+
if let Some(rt) = &runtime
263+
&& let Ok(Some(p)) = pass_runtime_to_children(&plan, rt)
264+
{
265+
plan = p;
266+
}
267+
214268
let private_data = Box::new(ExecutionPlanPrivateData { plan, runtime });
215269
Self {
216270
properties: properties_fn_wrapper,
@@ -278,28 +332,34 @@ impl TryFrom<&FFI_ExecutionPlan> for Arc<dyn ExecutionPlan> {
278332

279333
fn try_from(plan: &FFI_ExecutionPlan) -> Result<Self, Self::Error> {
280334
if (plan.library_marker_id)() == crate::get_library_marker_id() {
281-
return Ok(Arc::clone(plan.inner()));
335+
Ok(Arc::clone(plan.inner()))
336+
} else {
337+
let plan = ForeignExecutionPlan::try_from(plan.clone())?;
338+
Ok(Arc::new(plan))
282339
}
340+
}
341+
}
283342

343+
impl TryFrom<FFI_ExecutionPlan> for ForeignExecutionPlan {
344+
type Error = DataFusionError;
345+
fn try_from(plan: FFI_ExecutionPlan) -> Result<Self, Self::Error> {
284346
unsafe {
285-
let name = (plan.name)(plan).into();
347+
let name = (plan.name)(&plan).into();
286348

287-
let properties: PlanProperties = (plan.properties)(plan).try_into()?;
349+
let properties: PlanProperties = (plan.properties)(&plan).try_into()?;
288350

289-
let children_rvec = (plan.children)(plan);
351+
let children_rvec = (plan.children)(&plan);
290352
let children = children_rvec
291353
.iter()
292354
.map(<Arc<dyn ExecutionPlan>>::try_from)
293355
.collect::<Result<Vec<_>>>()?;
294356

295-
let plan = ForeignExecutionPlan {
357+
Ok(ForeignExecutionPlan {
296358
name,
297-
plan: plan.clone(),
359+
plan,
298360
properties,
299361
children,
300-
};
301-
302-
Ok(Arc::new(plan))
362+
})
303363
}
304364
}
305365
}
@@ -366,11 +426,10 @@ impl ExecutionPlan for ForeignExecutionPlan {
366426
}
367427
}
368428

369-
#[cfg(test)]
370-
pub(crate) mod tests {
371-
use arrow::datatypes::{DataType, Field, Schema};
372-
use datafusion::physical_plan::Partitioning;
373-
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
429+
#[cfg(any(test, feature = "integration-tests"))]
430+
pub mod tests {
431+
use datafusion_physical_plan::Partitioning;
432+
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
374433

375434
use super::*;
376435

@@ -384,7 +443,7 @@ pub(crate) mod tests {
384443
pub fn new(schema: arrow::datatypes::SchemaRef) -> Self {
385444
Self {
386445
props: PlanProperties::new(
387-
datafusion::physical_expr::EquivalenceProperties::new(schema),
446+
datafusion_physical_expr::EquivalenceProperties::new(schema),
388447
Partitioning::UnknownPartitioning(3),
389448
EmissionType::Incremental,
390449
Boundedness::Bounded,
@@ -442,8 +501,9 @@ pub(crate) mod tests {
442501

443502
#[test]
444503
fn test_round_trip_ffi_execution_plan() -> Result<()> {
445-
let schema =
446-
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
504+
let schema = Arc::new(arrow::datatypes::Schema::new(vec![
505+
arrow::datatypes::Field::new("a", arrow::datatypes::DataType::Float32, false),
506+
]));
447507

448508
let original_plan = Arc::new(EmptyExec::new(schema));
449509
let original_name = original_plan.name().to_string();
@@ -455,7 +515,7 @@ pub(crate) mod tests {
455515

456516
assert_eq!(original_name, foreign_plan.name());
457517

458-
let display = datafusion::physical_plan::display::DisplayableExecutionPlan::new(
518+
let display = datafusion_physical_plan::display::DisplayableExecutionPlan::new(
459519
foreign_plan.as_ref(),
460520
);
461521

@@ -470,8 +530,9 @@ pub(crate) mod tests {
470530

471531
#[test]
472532
fn test_ffi_execution_plan_children() -> Result<()> {
473-
let schema =
474-
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
533+
let schema = Arc::new(arrow::datatypes::Schema::new(vec![
534+
arrow::datatypes::Field::new("a", arrow::datatypes::DataType::Float32, false),
535+
]));
475536

476537
// Version 1: Adding child to the foreign plan
477538
let child_plan = Arc::new(EmptyExec::new(Arc::clone(&schema)));
@@ -509,8 +570,9 @@ pub(crate) mod tests {
509570

510571
#[test]
511572
fn test_ffi_execution_plan_local_bypass() {
512-
let schema =
513-
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
573+
let schema = Arc::new(arrow::datatypes::Schema::new(vec![
574+
arrow::datatypes::Field::new("a", arrow::datatypes::DataType::Float32, false),
575+
]));
514576

515577
let plan = Arc::new(EmptyExec::new(schema));
516578

datafusion/ffi/src/tests/mod.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ use super::udf::FFI_ScalarUDF;
3939
use crate::catalog_provider::FFI_CatalogProvider;
4040
use crate::catalog_provider_list::FFI_CatalogProviderList;
4141
use crate::config::extension_options::FFI_ExtensionOptions;
42+
use crate::execution_plan::FFI_ExecutionPlan;
43+
use crate::execution_plan::tests::EmptyExec;
4244
use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
4345
use crate::tests::catalog::create_catalog_provider_list;
4446
use crate::udaf::FFI_AggregateUDF;
@@ -92,6 +94,8 @@ pub struct ForeignLibraryModule {
9294
/// Create extension options, for either ConfigOptions or TableOptions
9395
pub create_extension_options: extern "C" fn() -> FFI_ExtensionOptions,
9496

97+
pub create_empty_exec: extern "C" fn() -> FFI_ExecutionPlan,
98+
9599
pub version: extern "C" fn() -> u64,
96100
}
97101

@@ -133,6 +137,13 @@ extern "C" fn construct_table_provider(
133137
}
134138
}
135139

140+
pub(crate) extern "C" fn create_empty_exec() -> FFI_ExecutionPlan {
141+
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
142+
143+
let plan = Arc::new(EmptyExec::new(schema));
144+
FFI_ExecutionPlan::new(plan, None)
145+
}
146+
136147
#[export_root_module]
137148
/// This defines the entry point for using the module.
138149
pub fn get_foreign_library_module() -> ForeignLibraryModuleRef {
@@ -147,6 +158,7 @@ pub fn get_foreign_library_module() -> ForeignLibraryModuleRef {
147158
create_stddev_udaf: create_ffi_stddev_func,
148159
create_rank_udwf: create_ffi_rank_func,
149160
create_extension_options: config::create_extension_options,
161+
create_empty_exec,
150162
version: super::version,
151163
}
152164
.leak_into_prefix()
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
#[cfg(feature = "integration-tests")]
2+
mod tests {
3+
use arrow::datatypes::Field;
4+
use arrow::datatypes::Schema;
5+
use arrow_schema::DataType;
6+
use datafusion_common::DataFusionError;
7+
use datafusion_ffi::execution_plan::FFI_ExecutionPlan;
8+
use datafusion_ffi::execution_plan::ForeignExecutionPlan;
9+
use datafusion_ffi::execution_plan::{ExecutionPlanPrivateData, tests::EmptyExec};
10+
use datafusion_ffi::tests::utils::get_module;
11+
use datafusion_physical_plan::ExecutionPlan;
12+
use std::sync::Arc;
13+
14+
#[test]
15+
fn test_ffi_execution_plan_new_sets_runtimes_on_children()
16+
-> Result<(), DataFusionError> {
17+
// We want to test the case where we have two libraries.
18+
// Library A will have a foreign plan from Library B, called child_plan.
19+
// Library A will add a plan called grandchild_plan under child_plan
20+
// Library A will create a plan called parent_plan, that has child_plan
21+
// under it. So we should have:
22+
// parent_plan (local) -> child_plan (foreign) -> grandchild_plan (local)
23+
// Then we want to turn parent_plan into a FFI plan.
24+
// Verify that grandchild_plan also gets the same runtime as parent_plan.
25+
26+
let module = get_module()?;
27+
28+
fn generate_local_plan() -> Arc<dyn ExecutionPlan> {
29+
let schema =
30+
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
31+
32+
Arc::new(EmptyExec::new(schema))
33+
}
34+
35+
let child_plan =
36+
module
37+
.create_empty_exec()
38+
.ok_or(DataFusionError::NotImplemented(
39+
"External module failed to implement create_empty_exec".to_string(),
40+
))?();
41+
let child_plan: Arc<dyn ExecutionPlan> = (&child_plan)
42+
.try_into()
43+
.expect("should be able create plan");
44+
assert!(child_plan.as_any().is::<ForeignExecutionPlan>());
45+
46+
let grandchild_plan = generate_local_plan();
47+
48+
let child_plan = child_plan.with_new_children(vec![grandchild_plan])?;
49+
50+
unsafe {
51+
// Originally the runtime is not set. We go through the unsafe casting
52+
// of data here because the `inner()` function is private and this is
53+
// only an integration test so we do not want to expose it.
54+
let ffi_child = FFI_ExecutionPlan::new(Arc::clone(&child_plan), None);
55+
let ffi_grandchild =
56+
(ffi_child.children)(&ffi_child).into_iter().next().unwrap();
57+
58+
let grandchild_private_data =
59+
ffi_grandchild.private_data as *const ExecutionPlanPrivateData;
60+
assert!((*grandchild_private_data).runtime.is_none());
61+
}
62+
63+
let parent_plan = generate_local_plan().with_new_children(vec![child_plan])?;
64+
65+
// Adding the grandchild beneath this FFI plan should get the runtime passed down.
66+
let runtime = tokio::runtime::Builder::new_current_thread()
67+
.build()
68+
.unwrap();
69+
let ffi_parent =
70+
FFI_ExecutionPlan::new(parent_plan, Some(runtime.handle().clone()));
71+
72+
unsafe {
73+
let ffi_child = (ffi_parent.children)(&ffi_parent)
74+
.into_iter()
75+
.next()
76+
.unwrap();
77+
let ffi_grandchild =
78+
(ffi_child.children)(&ffi_child).into_iter().next().unwrap();
79+
assert_eq!(
80+
(ffi_grandchild.library_marker_id)(),
81+
(ffi_parent.library_marker_id)()
82+
);
83+
84+
let grandchild_private_data =
85+
ffi_grandchild.private_data as *const ExecutionPlanPrivateData;
86+
assert!((*grandchild_private_data).runtime.is_some());
87+
}
88+
89+
Ok(())
90+
}
91+
}

0 commit comments

Comments
 (0)