Skip to content

Commit e480180

Browse files
committed
Set runtime on all descendents within the same library
1 parent bbfc11f commit e480180

3 files changed

Lines changed: 188 additions & 23 deletions

File tree

datafusion/ffi/src/execution_plan.rs

Lines changed: 85 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -205,13 +205,67 @@ impl Clone for FFI_ExecutionPlan {
205205
}
206206
}
207207

208+
/// Helper function to recursively identify any children that do not
209+
/// have a runtime set but should because they are local to this same
210+
/// library. This does imply a restriction that all execution plans
211+
/// in this chain that are within the same library use the same runtime.
212+
fn pass_runtime_to_children(
213+
plan: &Arc<dyn ExecutionPlan>,
214+
runtime: &Handle,
215+
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
216+
println!("checking plan {:?}", plan.name());
217+
let mut updated_children = false;
218+
let plan_is_foreign = plan.as_any().is::<ForeignExecutionPlan>();
219+
220+
let children = plan
221+
.children()
222+
.into_iter()
223+
.map(|child| {
224+
let child = match pass_runtime_to_children(child, runtime)? {
225+
Some(child) => {
226+
updated_children = true;
227+
child
228+
}
229+
None => Arc::clone(child),
230+
};
231+
232+
// If the parent is foreign and the child is local to this library, then when
233+
// we called `children()` above we will get something other than a
234+
// `ForeignExecutionPlan`. In this case wrap the plan in a `ForeignExecutionPlan`
235+
// because when we call `with_new_children` below it will extract the
236+
// FFI plan that does contain the runtime.
237+
if plan_is_foreign && !child.as_any().is::<ForeignExecutionPlan>() {
238+
updated_children = true;
239+
let ffi_child = FFI_ExecutionPlan::new(child, Some(runtime.clone()));
240+
let foreign_child = ForeignExecutionPlan::try_from(ffi_child);
241+
foreign_child.map(|c| Arc::new(c) as Arc<dyn ExecutionPlan>)
242+
} else {
243+
Ok(child)
244+
}
245+
})
246+
.collect::<Result<Vec<_>>>()?;
247+
if updated_children {
248+
Arc::clone(plan).with_new_children(children).map(Some)
249+
} else {
250+
Ok(None)
251+
}
252+
}
253+
208254
impl FFI_ExecutionPlan {
209255
/// This function is called on the provider's side.
210-
pub fn new(plan: Arc<dyn ExecutionPlan>, runtime: Option<Handle>) -> Self {
256+
pub fn new(mut plan: Arc<dyn ExecutionPlan>, runtime: Option<Handle>) -> Self {
257+
// Note to developers: `pass_runtime_to_children` relies on the logic here to
258+
// get the underlying FFI plan during calls to `new_with_children`.
211259
if let Some(plan) = plan.as_any().downcast_ref::<ForeignExecutionPlan>() {
212260
return plan.plan.clone();
213261
}
214262

263+
if let Some(rt) = &runtime
264+
&& let Ok(Some(p)) = pass_runtime_to_children(&plan, rt)
265+
{
266+
plan = p;
267+
}
268+
215269
let private_data = Box::new(ExecutionPlanPrivateData { plan, runtime });
216270
Self {
217271
properties: properties_fn_wrapper,
@@ -279,28 +333,34 @@ impl TryFrom<&FFI_ExecutionPlan> for Arc<dyn ExecutionPlan> {
279333

280334
fn try_from(plan: &FFI_ExecutionPlan) -> Result<Self, Self::Error> {
281335
if (plan.library_marker_id)() == crate::get_library_marker_id() {
282-
return Ok(Arc::clone(plan.inner()));
336+
Ok(Arc::clone(plan.inner()))
337+
} else {
338+
let plan = ForeignExecutionPlan::try_from(plan.clone())?;
339+
Ok(Arc::new(plan))
283340
}
341+
}
342+
}
284343

344+
impl TryFrom<FFI_ExecutionPlan> for ForeignExecutionPlan {
345+
type Error = DataFusionError;
346+
fn try_from(plan: FFI_ExecutionPlan) -> Result<Self, Self::Error> {
285347
unsafe {
286-
let name = (plan.name)(plan).into();
348+
let name = (plan.name)(&plan).into();
287349

288-
let properties: PlanProperties = (plan.properties)(plan).try_into()?;
350+
let properties: PlanProperties = (plan.properties)(&plan).try_into()?;
289351

290-
let children_rvec = (plan.children)(plan);
352+
let children_rvec = (plan.children)(&plan);
291353
let children = children_rvec
292354
.iter()
293355
.map(<Arc<dyn ExecutionPlan>>::try_from)
294356
.collect::<Result<Vec<_>>>()?;
295357

296-
let plan = ForeignExecutionPlan {
358+
Ok(ForeignExecutionPlan {
297359
name,
298-
plan: plan.clone(),
360+
plan,
299361
properties: Arc::new(properties),
300362
children,
301-
};
302-
303-
Ok(Arc::new(plan))
363+
})
304364
}
305365
}
306366
}
@@ -383,11 +443,10 @@ impl ExecutionPlan for ForeignExecutionPlan {
383443
}
384444
}
385445

386-
#[cfg(test)]
387-
pub(crate) mod tests {
388-
use arrow::datatypes::{DataType, Field, Schema};
389-
use datafusion::physical_plan::Partitioning;
390-
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
446+
#[cfg(any(test, feature = "integration-tests"))]
447+
pub mod tests {
448+
use datafusion_physical_plan::Partitioning;
449+
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
391450

392451
use super::*;
393452

@@ -401,7 +460,7 @@ pub(crate) mod tests {
401460
pub fn new(schema: arrow::datatypes::SchemaRef) -> Self {
402461
Self {
403462
props: Arc::new(PlanProperties::new(
404-
datafusion::physical_expr::EquivalenceProperties::new(schema),
463+
datafusion_physical_expr::EquivalenceProperties::new(schema),
405464
Partitioning::UnknownPartitioning(3),
406465
EmissionType::Incremental,
407466
Boundedness::Bounded,
@@ -475,8 +534,9 @@ pub(crate) mod tests {
475534

476535
#[test]
477536
fn test_round_trip_ffi_execution_plan() -> Result<()> {
478-
let schema =
479-
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
537+
let schema = Arc::new(arrow::datatypes::Schema::new(vec![
538+
arrow::datatypes::Field::new("a", arrow::datatypes::DataType::Float32, false),
539+
]));
480540

481541
let original_plan = Arc::new(EmptyExec::new(schema));
482542
let original_name = original_plan.name().to_string();
@@ -488,7 +548,7 @@ pub(crate) mod tests {
488548

489549
assert_eq!(original_name, foreign_plan.name());
490550

491-
let display = datafusion::physical_plan::display::DisplayableExecutionPlan::new(
551+
let display = datafusion_physical_plan::display::DisplayableExecutionPlan::new(
492552
foreign_plan.as_ref(),
493553
);
494554

@@ -503,8 +563,9 @@ pub(crate) mod tests {
503563

504564
#[test]
505565
fn test_ffi_execution_plan_children() -> Result<()> {
506-
let schema =
507-
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
566+
let schema = Arc::new(arrow::datatypes::Schema::new(vec![
567+
arrow::datatypes::Field::new("a", arrow::datatypes::DataType::Float32, false),
568+
]));
508569

509570
// Version 1: Adding child to the foreign plan
510571
let child_plan = Arc::new(EmptyExec::new(Arc::clone(&schema)));
@@ -542,8 +603,9 @@ pub(crate) mod tests {
542603

543604
#[test]
544605
fn test_ffi_execution_plan_local_bypass() {
545-
let schema =
546-
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
606+
let schema = Arc::new(arrow::datatypes::Schema::new(vec![
607+
arrow::datatypes::Field::new("a", arrow::datatypes::DataType::Float32, false),
608+
]));
547609

548610
let plan = Arc::new(EmptyExec::new(schema));
549611

datafusion/ffi/src/tests/mod.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ use udf_udaf_udwf::{
3737
use crate::catalog_provider::FFI_CatalogProvider;
3838
use crate::catalog_provider_list::FFI_CatalogProviderList;
3939
use crate::config::extension_options::FFI_ExtensionOptions;
40+
use crate::execution_plan::FFI_ExecutionPlan;
41+
use crate::execution_plan::tests::EmptyExec;
4042
use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
4143
use crate::table_provider::FFI_TableProvider;
4244
use crate::table_provider_factory::FFI_TableProviderFactory;
@@ -98,6 +100,8 @@ pub struct ForeignLibraryModule {
98100
/// Create extension options, for either ConfigOptions or TableOptions
99101
pub create_extension_options: extern "C" fn() -> FFI_ExtensionOptions,
100102

103+
pub create_empty_exec: extern "C" fn() -> FFI_ExecutionPlan,
104+
101105
pub version: extern "C" fn() -> u64,
102106
}
103107

@@ -147,6 +151,13 @@ extern "C" fn construct_table_provider_factory(
147151
table_provider_factory::create(codec)
148152
}
149153

154+
pub(crate) extern "C" fn create_empty_exec() -> FFI_ExecutionPlan {
155+
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
156+
157+
let plan = Arc::new(EmptyExec::new(schema));
158+
FFI_ExecutionPlan::new(plan, None)
159+
}
160+
150161
#[export_root_module]
151162
/// This defines the entry point for using the module.
152163
pub fn get_foreign_library_module() -> ForeignLibraryModuleRef {
@@ -162,6 +173,7 @@ pub fn get_foreign_library_module() -> ForeignLibraryModuleRef {
162173
create_stddev_udaf: create_ffi_stddev_func,
163174
create_rank_udwf: create_ffi_rank_func,
164175
create_extension_options: config::create_extension_options,
176+
create_empty_exec,
165177
version: super::version,
166178
}
167179
.leak_into_prefix()
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
#[cfg(feature = "integration-tests")]
2+
mod tests {
3+
use arrow::datatypes::Field;
4+
use arrow::datatypes::Schema;
5+
use arrow_schema::DataType;
6+
use datafusion_common::DataFusionError;
7+
use datafusion_ffi::execution_plan::FFI_ExecutionPlan;
8+
use datafusion_ffi::execution_plan::ForeignExecutionPlan;
9+
use datafusion_ffi::execution_plan::{ExecutionPlanPrivateData, tests::EmptyExec};
10+
use datafusion_ffi::tests::utils::get_module;
11+
use datafusion_physical_plan::ExecutionPlan;
12+
use std::sync::Arc;
13+
14+
#[test]
15+
fn test_ffi_execution_plan_new_sets_runtimes_on_children()
16+
-> Result<(), DataFusionError> {
17+
// We want to test the case where we have two libraries.
18+
// Library A will have a foreign plan from Library B, called child_plan.
19+
// Library A will add a plan called grandchild_plan under child_plan
20+
// Library A will create a plan called parent_plan, that has child_plan
21+
// under it. So we should have:
22+
// parent_plan (local) -> child_plan (foreign) -> grandchild_plan (local)
23+
// Then we want to turn parent_plan into a FFI plan.
24+
// Verify that grandchild_plan also gets the same runtime as parent_plan.
25+
26+
let module = get_module()?;
27+
28+
fn generate_local_plan() -> Arc<dyn ExecutionPlan> {
29+
let schema =
30+
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
31+
32+
Arc::new(EmptyExec::new(schema))
33+
}
34+
35+
let child_plan =
36+
module
37+
.create_empty_exec()
38+
.ok_or(DataFusionError::NotImplemented(
39+
"External module failed to implement create_empty_exec".to_string(),
40+
))?();
41+
let child_plan: Arc<dyn ExecutionPlan> = (&child_plan)
42+
.try_into()
43+
.expect("should be able create plan");
44+
assert!(child_plan.as_any().is::<ForeignExecutionPlan>());
45+
46+
let grandchild_plan = generate_local_plan();
47+
48+
let child_plan = child_plan.with_new_children(vec![grandchild_plan])?;
49+
50+
unsafe {
51+
// Originally the runtime is not set. We go through the unsafe casting
52+
// of data here because the `inner()` function is private and this is
53+
// only an integration test so we do not want to expose it.
54+
let ffi_child = FFI_ExecutionPlan::new(Arc::clone(&child_plan), None);
55+
let ffi_grandchild =
56+
(ffi_child.children)(&ffi_child).into_iter().next().unwrap();
57+
58+
let grandchild_private_data =
59+
ffi_grandchild.private_data as *const ExecutionPlanPrivateData;
60+
assert!((*grandchild_private_data).runtime.is_none());
61+
}
62+
63+
let parent_plan = generate_local_plan().with_new_children(vec![child_plan])?;
64+
65+
// Adding the grandchild beneath this FFI plan should get the runtime passed down.
66+
let runtime = tokio::runtime::Builder::new_current_thread()
67+
.build()
68+
.unwrap();
69+
let ffi_parent =
70+
FFI_ExecutionPlan::new(parent_plan, Some(runtime.handle().clone()));
71+
72+
unsafe {
73+
let ffi_child = (ffi_parent.children)(&ffi_parent)
74+
.into_iter()
75+
.next()
76+
.unwrap();
77+
let ffi_grandchild =
78+
(ffi_child.children)(&ffi_child).into_iter().next().unwrap();
79+
assert_eq!(
80+
(ffi_grandchild.library_marker_id)(),
81+
(ffi_parent.library_marker_id)()
82+
);
83+
84+
let grandchild_private_data =
85+
ffi_grandchild.private_data as *const ExecutionPlanPrivateData;
86+
assert!((*grandchild_private_data).runtime.is_some());
87+
}
88+
89+
Ok(())
90+
}
91+
}

0 commit comments

Comments
 (0)