Skip to content

Commit dd86bbd

Browse files
committed
Implement with_new_children and repartitioned on execution plan in FFI
1 parent b04c475 commit dd86bbd

8 files changed

Lines changed: 174 additions & 50 deletions

File tree

datafusion-examples/examples/custom_data_source/adapter_serialization.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,10 @@ impl PhysicalExtensionCodec for AdapterPreservingCodec {
308308
"try_encode not used - adapter wrapping happens in serialize_physical_plan"
309309
)
310310
}
311+
312+
fn as_any(&self) -> &dyn std::any::Any {
313+
self
314+
}
311315
}
312316

313317
impl PhysicalProtoConverterExtension for AdapterPreservingCodec {

datafusion-examples/examples/proto/composed_extension_codec.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,10 @@ impl PhysicalExtensionCodec for ParentPhysicalExtensionCodec {
158158
internal_err!("Not supported")
159159
}
160160
}
161+
162+
fn as_any(&self) -> &dyn Any {
163+
self
164+
}
161165
}
162166

163167
#[derive(Debug)]
@@ -232,4 +236,8 @@ impl PhysicalExtensionCodec for ChildPhysicalExtensionCodec {
232236
internal_err!("Not supported")
233237
}
234238
}
239+
240+
fn as_any(&self) -> &dyn Any {
241+
self
242+
}
235243
}

datafusion-examples/examples/proto/expression_deduplication.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,10 @@ impl PhysicalExtensionCodec for CachingCodec {
199199
) -> Result<()> {
200200
datafusion::common::not_impl_err!("No custom extension nodes")
201201
}
202+
203+
fn as_any(&self) -> &dyn std::any::Any {
204+
self
205+
}
202206
}
203207

204208
impl PhysicalProtoConverterExtension for CachingCodec {

datafusion/ffi/src/execution_plan.rs

Lines changed: 107 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,21 @@ use std::pin::Pin;
2020
use std::sync::Arc;
2121

2222
use abi_stable::StableAbi;
23-
use abi_stable::std_types::{RString, RVec};
23+
use abi_stable::std_types::{ROption, RResult, RString, RVec};
24+
use datafusion_common::config::ConfigOptions;
2425
use datafusion_common::{DataFusionError, Result};
2526
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
2627
use datafusion_physical_plan::{
2728
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
2829
};
2930
use tokio::runtime::Handle;
3031

32+
use crate::config::FFI_ConfigOptions;
3133
use crate::execution::FFI_TaskContext;
3234
use crate::plan_properties::FFI_PlanProperties;
3335
use crate::record_batch_stream::FFI_RecordBatchStream;
3436
use crate::util::FFIResult;
35-
use crate::{df_result, rresult};
37+
use crate::{df_result, rresult, rresult_return};
3638

3739
/// A stable struct for sharing an [`ExecutionPlan`] across FFI boundaries.
3840
#[repr(C)]
@@ -44,6 +46,9 @@ pub struct FFI_ExecutionPlan {
4446
/// Return a vector of children plans
4547
pub children: unsafe extern "C" fn(plan: &Self) -> RVec<FFI_ExecutionPlan>,
4648

49+
pub with_new_children:
50+
unsafe extern "C" fn(plan: &Self, children: RVec<Self>) -> FFIResult<Self>,
51+
4752
/// Return the plan name.
4853
pub name: unsafe extern "C" fn(plan: &Self) -> RString,
4954

@@ -55,6 +60,12 @@ pub struct FFI_ExecutionPlan {
5560
context: FFI_TaskContext,
5661
) -> FFIResult<FFI_RecordBatchStream>,
5762

63+
pub repartitioned: unsafe extern "C" fn(
64+
plan: &Self,
65+
target_partitions: usize,
66+
config: FFI_ConfigOptions,
67+
) -> FFIResult<ROption<FFI_ExecutionPlan>>,
68+
5869
/// Used to create a clone on the provider of the execution plan. This should
5970
/// only need to be called by the receiver of the plan.
6071
pub clone: unsafe extern "C" fn(plan: &Self) -> Self,
@@ -85,6 +96,11 @@ impl FFI_ExecutionPlan {
8596
let private_data = self.private_data as *const ExecutionPlanPrivateData;
8697
unsafe { &(*private_data).plan }
8798
}
99+
100+
fn runtime(&self) -> Option<Handle> {
101+
let private_data = self.private_data as *const ExecutionPlanPrivateData;
102+
unsafe { (*private_data).runtime.clone() }
103+
}
88104
}
89105

90106
unsafe extern "C" fn properties_fn_wrapper(
@@ -96,37 +112,69 @@ unsafe extern "C" fn properties_fn_wrapper(
96112
unsafe extern "C" fn children_fn_wrapper(
97113
plan: &FFI_ExecutionPlan,
98114
) -> RVec<FFI_ExecutionPlan> {
99-
unsafe {
100-
let private_data = plan.private_data as *const ExecutionPlanPrivateData;
101-
let plan = &(*private_data).plan;
102-
let runtime = &(*private_data).runtime;
115+
let runtime = plan.runtime();
116+
let plan = plan.inner();
103117

104-
let children: Vec<_> = plan
105-
.children()
106-
.into_iter()
107-
.map(|child| FFI_ExecutionPlan::new(Arc::clone(child), runtime.clone()))
108-
.collect();
118+
let children: Vec<_> = plan
119+
.children()
120+
.into_iter()
121+
.map(|child| FFI_ExecutionPlan::new(Arc::clone(child), runtime.clone()))
122+
.collect();
109123

110-
children.into()
111-
}
124+
children.into()
125+
}
126+
127+
unsafe extern "C" fn with_new_children_fn_wrapper(
128+
plan: &FFI_ExecutionPlan,
129+
children: RVec<FFI_ExecutionPlan>,
130+
) -> FFIResult<FFI_ExecutionPlan> {
131+
let runtime = plan.runtime();
132+
let plan = Arc::clone(plan.inner());
133+
let children = rresult_return!(
134+
children
135+
.iter()
136+
.map(<Arc<dyn ExecutionPlan>>::try_from)
137+
.collect::<Result<Vec<_>>>()
138+
);
139+
140+
let new_plan = rresult_return!(plan.with_new_children(children));
141+
142+
RResult::ROk(FFI_ExecutionPlan::new(new_plan, runtime))
112143
}
113144

114145
unsafe extern "C" fn execute_fn_wrapper(
115146
plan: &FFI_ExecutionPlan,
116147
partition: usize,
117148
context: FFI_TaskContext,
118149
) -> FFIResult<FFI_RecordBatchStream> {
119-
unsafe {
120-
let ctx = context.into();
121-
let private_data = plan.private_data as *const ExecutionPlanPrivateData;
122-
let plan = &(*private_data).plan;
123-
let runtime = (*private_data).runtime.clone();
124-
125-
rresult!(
126-
plan.execute(partition, ctx)
127-
.map(|rbs| FFI_RecordBatchStream::new(rbs, runtime))
128-
)
129-
}
150+
let ctx = context.into();
151+
let runtime = plan.runtime();
152+
let plan = plan.inner();
153+
154+
let _guard = runtime.as_ref().map(|rt| rt.enter());
155+
156+
rresult!(
157+
plan.execute(partition, ctx)
158+
.map(|rbs| FFI_RecordBatchStream::new(rbs, runtime))
159+
)
160+
}
161+
162+
unsafe extern "C" fn repartitioned_fn_wrapper(
163+
plan: &FFI_ExecutionPlan,
164+
target_partitions: usize,
165+
config: FFI_ConfigOptions,
166+
) -> FFIResult<ROption<FFI_ExecutionPlan>> {
167+
let maybe_config: Result<ConfigOptions, DataFusionError> = config.try_into();
168+
let config = rresult_return!(maybe_config);
169+
let runtime = plan.runtime();
170+
let plan = plan.inner();
171+
172+
rresult!(
173+
plan.repartitioned(target_partitions, &config)
174+
.map(|maybe_plan| maybe_plan
175+
.map(|plan| FFI_ExecutionPlan::new(plan, runtime))
176+
.into())
177+
)
130178
}
131179

132180
unsafe extern "C" fn name_fn_wrapper(plan: &FFI_ExecutionPlan) -> RString {
@@ -144,12 +192,10 @@ unsafe extern "C" fn release_fn_wrapper(plan: &mut FFI_ExecutionPlan) {
144192
}
145193

146194
unsafe extern "C" fn clone_fn_wrapper(plan: &FFI_ExecutionPlan) -> FFI_ExecutionPlan {
147-
unsafe {
148-
let private_data = plan.private_data as *const ExecutionPlanPrivateData;
149-
let plan_data = &(*private_data);
195+
let runtime = plan.runtime();
196+
let plan = plan.inner();
150197

151-
FFI_ExecutionPlan::new(Arc::clone(&plan_data.plan), plan_data.runtime.clone())
152-
}
198+
FFI_ExecutionPlan::new(Arc::clone(plan), runtime)
153199
}
154200

155201
impl Clone for FFI_ExecutionPlan {
@@ -161,13 +207,18 @@ impl Clone for FFI_ExecutionPlan {
161207
impl FFI_ExecutionPlan {
162208
/// This function is called on the provider's side.
163209
pub fn new(plan: Arc<dyn ExecutionPlan>, runtime: Option<Handle>) -> Self {
164-
let private_data = Box::new(ExecutionPlanPrivateData { plan, runtime });
210+
if let Some(plan) = plan.as_any().downcast_ref::<ForeignExecutionPlan>() {
211+
return plan.plan.clone();
212+
}
165213

214+
let private_data = Box::new(ExecutionPlanPrivateData { plan, runtime });
166215
Self {
167216
properties: properties_fn_wrapper,
168217
children: children_fn_wrapper,
218+
with_new_children: with_new_children_fn_wrapper,
169219
name: name_fn_wrapper,
170220
execute: execute_fn_wrapper,
221+
repartitioned: repartitioned_fn_wrapper,
171222
clone: clone_fn_wrapper,
172223
release: release_fn_wrapper,
173224
private_data: Box::into_raw(private_data) as *mut c_void,
@@ -274,12 +325,16 @@ impl ExecutionPlan for ForeignExecutionPlan {
274325
self: Arc<Self>,
275326
children: Vec<Arc<dyn ExecutionPlan>>,
276327
) -> Result<Arc<dyn ExecutionPlan>> {
277-
Ok(Arc::new(ForeignExecutionPlan {
278-
plan: self.plan.clone(),
279-
name: self.name.clone(),
280-
children,
281-
properties: self.properties.clone(),
282-
}))
328+
unsafe {
329+
let children = children
330+
.into_iter()
331+
.map(|child| FFI_ExecutionPlan::new(child, None))
332+
.collect::<RVec<_>>();
333+
let new_plan =
334+
df_result!((self.plan.with_new_children)(&self.plan, children))?;
335+
336+
(&new_plan).try_into()
337+
}
283338
}
284339

285340
fn execute(
@@ -293,6 +348,22 @@ impl ExecutionPlan for ForeignExecutionPlan {
293348
.map(|stream| Pin::new(Box::new(stream)) as SendableRecordBatchStream)
294349
}
295350
}
351+
352+
fn repartitioned(
353+
&self,
354+
target_partitions: usize,
355+
config: &ConfigOptions,
356+
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
357+
let config = config.into();
358+
let maybe_plan: Option<FFI_ExecutionPlan> = df_result!(unsafe {
359+
(self.plan.repartitioned)(&self.plan, target_partitions, config)
360+
})?
361+
.into();
362+
363+
maybe_plan
364+
.map(|plan| <Arc<dyn ExecutionPlan>>::try_from(&plan))
365+
.transpose()
366+
}
296367
}
297368

298369
#[cfg(test)]

datafusion/ffi/src/physical_expr/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -448,6 +448,10 @@ impl Drop for FFI_PhysicalExpr {
448448
impl From<Arc<dyn PhysicalExpr>> for FFI_PhysicalExpr {
449449
/// Creates a new [`FFI_PhysicalExpr`].
450450
fn from(expr: Arc<dyn PhysicalExpr>) -> Self {
451+
if let Some(expr) = expr.as_any().downcast_ref::<ForeignPhysicalExpr>() {
452+
return expr.expr.clone();
453+
}
454+
451455
let private_data = Box::new(PhysicalExprPrivateData { expr });
452456

453457
Self {

datafusion/ffi/src/proto/physical_extension_codec.rs

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -111,14 +111,14 @@ unsafe impl Send for FFI_PhysicalExtensionCodec {}
111111
unsafe impl Sync for FFI_PhysicalExtensionCodec {}
112112

113113
struct PhysicalExtensionCodecPrivateData {
114-
provider: Arc<dyn PhysicalExtensionCodec>,
114+
codec: Arc<dyn PhysicalExtensionCodec>,
115115
runtime: Option<Handle>,
116116
}
117117

118118
impl FFI_PhysicalExtensionCodec {
119119
fn inner(&self) -> &Arc<dyn PhysicalExtensionCodec> {
120120
let private_data = self.private_data as *const PhysicalExtensionCodecPrivateData;
121-
unsafe { &(*private_data).provider }
121+
unsafe { &(*private_data).codec }
122122
}
123123

124124
fn runtime(&self) -> &Option<Handle> {
@@ -132,6 +132,7 @@ unsafe extern "C" fn try_decode_fn_wrapper(
132132
buf: RSlice<u8>,
133133
inputs: RVec<FFI_ExecutionPlan>,
134134
) -> FFIResult<FFI_ExecutionPlan> {
135+
let runtime = codec.runtime().clone();
135136
let task_ctx: Arc<TaskContext> =
136137
rresult_return!((&codec.task_ctx_provider).try_into());
137138
let codec = codec.inner();
@@ -144,7 +145,7 @@ unsafe extern "C" fn try_decode_fn_wrapper(
144145
let plan =
145146
rresult_return!(codec.try_decode(buf.as_ref(), &inputs, task_ctx.as_ref()));
146147

147-
RResult::ROk(FFI_ExecutionPlan::new(plan, None))
148+
RResult::ROk(FFI_ExecutionPlan::new(plan, runtime))
148149
}
149150

150151
unsafe extern "C" fn try_encode_fn_wrapper(
@@ -240,11 +241,10 @@ unsafe extern "C" fn try_encode_udwf_fn_wrapper(
240241
RResult::ROk(bytes.into())
241242
}
242243

243-
unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_PhysicalExtensionCodec) {
244+
unsafe extern "C" fn release_fn_wrapper(codec: &mut FFI_PhysicalExtensionCodec) {
244245
unsafe {
245-
let private_data = Box::from_raw(
246-
provider.private_data as *mut PhysicalExtensionCodecPrivateData,
247-
);
246+
let private_data =
247+
Box::from_raw(codec.private_data as *mut PhysicalExtensionCodecPrivateData);
248248
drop(private_data);
249249
}
250250
}
@@ -267,13 +267,19 @@ impl Drop for FFI_PhysicalExtensionCodec {
267267
impl FFI_PhysicalExtensionCodec {
268268
/// Creates a new [`FFI_PhysicalExtensionCodec`].
269269
pub fn new(
270-
provider: Arc<dyn PhysicalExtensionCodec + Send>,
270+
codec: Arc<dyn PhysicalExtensionCodec + Send>,
271271
runtime: Option<Handle>,
272272
task_ctx_provider: impl Into<FFI_TaskContextProvider>,
273273
) -> Self {
274+
if let Some(codec) = codec
275+
.as_any()
276+
.downcast_ref::<ForeignPhysicalExtensionCodec>()
277+
{
278+
return codec.0.clone();
279+
}
280+
274281
let task_ctx_provider = task_ctx_provider.into();
275-
let private_data =
276-
Box::new(PhysicalExtensionCodecPrivateData { provider, runtime });
282+
let private_data = Box::new(PhysicalExtensionCodecPrivateData { codec, runtime });
277283

278284
Self {
279285
try_decode: try_decode_fn_wrapper,
@@ -306,11 +312,11 @@ unsafe impl Send for ForeignPhysicalExtensionCodec {}
306312
unsafe impl Sync for ForeignPhysicalExtensionCodec {}
307313

308314
impl From<&FFI_PhysicalExtensionCodec> for Arc<dyn PhysicalExtensionCodec> {
309-
fn from(provider: &FFI_PhysicalExtensionCodec) -> Self {
310-
if (provider.library_marker_id)() == crate::get_library_marker_id() {
311-
Arc::clone(provider.inner())
315+
fn from(codec: &FFI_PhysicalExtensionCodec) -> Self {
316+
if (codec.library_marker_id)() == crate::get_library_marker_id() {
317+
Arc::clone(codec.inner())
312318
} else {
313-
Arc::new(ForeignPhysicalExtensionCodec(provider.clone()))
319+
Arc::new(ForeignPhysicalExtensionCodec(codec.clone()))
314320
}
315321
}
316322
}
@@ -403,6 +409,10 @@ impl PhysicalExtensionCodec for ForeignPhysicalExtensionCodec {
403409

404410
Ok(())
405411
}
412+
413+
fn as_any(&self) -> &dyn std::any::Any {
414+
self
415+
}
406416
}
407417

408418
#[cfg(test)]
@@ -558,6 +568,10 @@ pub(crate) mod tests {
558568

559569
Ok(())
560570
}
571+
572+
fn as_any(&self) -> &dyn std::any::Any {
573+
self
574+
}
561575
}
562576

563577
fn create_test_exec() -> Arc<dyn ExecutionPlan> {

0 commit comments

Comments
 (0)