Skip to content

Commit 31ccc50

Browse files
committed
Implement new_with_children and repartitioned on execution plan in FFI
1 parent 4dbb449 commit 31ccc50

8 files changed

Lines changed: 174 additions & 50 deletions

File tree

datafusion-examples/examples/custom_data_source/adapter_serialization.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,10 @@ impl PhysicalExtensionCodec for AdapterPreservingCodec {
308308
"try_encode not used - adapter wrapping happens in serialize_physical_plan"
309309
)
310310
}
311+
312+
fn as_any(&self) -> &dyn std::any::Any {
313+
self
314+
}
311315
}
312316

313317
impl PhysicalProtoConverterExtension for AdapterPreservingCodec {

datafusion-examples/examples/proto/composed_extension_codec.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,10 @@ impl PhysicalExtensionCodec for ParentPhysicalExtensionCodec {
168168
internal_err!("Not supported")
169169
}
170170
}
171+
172+
fn as_any(&self) -> &dyn Any {
173+
self
174+
}
171175
}
172176

173177
#[derive(Debug)]
@@ -251,4 +255,8 @@ impl PhysicalExtensionCodec for ChildPhysicalExtensionCodec {
251255
internal_err!("Not supported")
252256
}
253257
}
258+
259+
fn as_any(&self) -> &dyn Any {
260+
self
261+
}
254262
}

datafusion-examples/examples/proto/expression_deduplication.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,10 @@ impl PhysicalExtensionCodec for CachingCodec {
199199
) -> Result<()> {
200200
datafusion::common::not_impl_err!("No custom extension nodes")
201201
}
202+
203+
fn as_any(&self) -> &dyn std::any::Any {
204+
self
205+
}
202206
}
203207

204208
impl PhysicalProtoConverterExtension for CachingCodec {

datafusion/ffi/src/execution_plan.rs

Lines changed: 107 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ use std::pin::Pin;
2020
use std::sync::Arc;
2121

2222
use abi_stable::StableAbi;
23-
use abi_stable::std_types::{RString, RVec};
23+
use abi_stable::std_types::{ROption, RResult, RString, RVec};
24+
use datafusion_common::config::ConfigOptions;
2425
use datafusion_common::tree_node::TreeNodeRecursion;
2526
use datafusion_common::{DataFusionError, Result};
2627
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
@@ -29,11 +30,12 @@ use datafusion_physical_plan::{
2930
};
3031
use tokio::runtime::Handle;
3132

33+
use crate::config::FFI_ConfigOptions;
3234
use crate::execution::FFI_TaskContext;
3335
use crate::plan_properties::FFI_PlanProperties;
3436
use crate::record_batch_stream::FFI_RecordBatchStream;
3537
use crate::util::FFIResult;
36-
use crate::{df_result, rresult};
38+
use crate::{df_result, rresult, rresult_return};
3739

3840
/// A stable struct for sharing a [`ExecutionPlan`] across FFI boundaries.
3941
#[repr(C)]
@@ -45,6 +47,9 @@ pub struct FFI_ExecutionPlan {
4547
/// Return a vector of children plans
4648
pub children: unsafe extern "C" fn(plan: &Self) -> RVec<FFI_ExecutionPlan>,
4749

50+
pub with_new_children:
51+
unsafe extern "C" fn(plan: &Self, children: RVec<Self>) -> FFIResult<Self>,
52+
4853
/// Return the plan name.
4954
pub name: unsafe extern "C" fn(plan: &Self) -> RString,
5055

@@ -56,6 +61,12 @@ pub struct FFI_ExecutionPlan {
5661
context: FFI_TaskContext,
5762
) -> FFIResult<FFI_RecordBatchStream>,
5863

64+
pub repartitioned: unsafe extern "C" fn(
65+
plan: &Self,
66+
target_partitions: usize,
67+
config: FFI_ConfigOptions,
68+
) -> FFIResult<ROption<FFI_ExecutionPlan>>,
69+
5970
/// Used to create a clone on the provider of the execution plan. This should
6071
/// only need to be called by the receiver of the plan.
6172
pub clone: unsafe extern "C" fn(plan: &Self) -> Self,
@@ -86,6 +97,11 @@ impl FFI_ExecutionPlan {
8697
let private_data = self.private_data as *const ExecutionPlanPrivateData;
8798
unsafe { &(*private_data).plan }
8899
}
100+
101+
fn runtime(&self) -> Option<Handle> {
102+
let private_data = self.private_data as *const ExecutionPlanPrivateData;
103+
unsafe { (*private_data).runtime.clone() }
104+
}
89105
}
90106

91107
unsafe extern "C" fn properties_fn_wrapper(
@@ -97,37 +113,69 @@ unsafe extern "C" fn properties_fn_wrapper(
97113
unsafe extern "C" fn children_fn_wrapper(
98114
plan: &FFI_ExecutionPlan,
99115
) -> RVec<FFI_ExecutionPlan> {
100-
unsafe {
101-
let private_data = plan.private_data as *const ExecutionPlanPrivateData;
102-
let plan = &(*private_data).plan;
103-
let runtime = &(*private_data).runtime;
116+
let runtime = plan.runtime();
117+
let plan = plan.inner();
104118

105-
let children: Vec<_> = plan
106-
.children()
107-
.into_iter()
108-
.map(|child| FFI_ExecutionPlan::new(Arc::clone(child), runtime.clone()))
109-
.collect();
119+
let children: Vec<_> = plan
120+
.children()
121+
.into_iter()
122+
.map(|child| FFI_ExecutionPlan::new(Arc::clone(child), runtime.clone()))
123+
.collect();
110124

111-
children.into()
112-
}
125+
children.into()
126+
}
127+
128+
unsafe extern "C" fn with_new_children_fn_wrapper(
129+
plan: &FFI_ExecutionPlan,
130+
children: RVec<FFI_ExecutionPlan>,
131+
) -> FFIResult<FFI_ExecutionPlan> {
132+
let runtime = plan.runtime();
133+
let plan = Arc::clone(plan.inner());
134+
let children = rresult_return!(
135+
children
136+
.iter()
137+
.map(<Arc<dyn ExecutionPlan>>::try_from)
138+
.collect::<Result<Vec<_>>>()
139+
);
140+
141+
let new_plan = rresult_return!(plan.with_new_children(children));
142+
143+
RResult::ROk(FFI_ExecutionPlan::new(new_plan, runtime))
113144
}
114145

115146
unsafe extern "C" fn execute_fn_wrapper(
116147
plan: &FFI_ExecutionPlan,
117148
partition: usize,
118149
context: FFI_TaskContext,
119150
) -> FFIResult<FFI_RecordBatchStream> {
120-
unsafe {
121-
let ctx = context.into();
122-
let private_data = plan.private_data as *const ExecutionPlanPrivateData;
123-
let plan = &(*private_data).plan;
124-
let runtime = (*private_data).runtime.clone();
125-
126-
rresult!(
127-
plan.execute(partition, ctx)
128-
.map(|rbs| FFI_RecordBatchStream::new(rbs, runtime))
129-
)
130-
}
151+
let ctx = context.into();
152+
let runtime = plan.runtime();
153+
let plan = plan.inner();
154+
155+
let _guard = runtime.as_ref().map(|rt| rt.enter());
156+
157+
rresult!(
158+
plan.execute(partition, ctx)
159+
.map(|rbs| FFI_RecordBatchStream::new(rbs, runtime))
160+
)
161+
}
162+
163+
unsafe extern "C" fn repartitioned_fn_wrapper(
164+
plan: &FFI_ExecutionPlan,
165+
target_partitions: usize,
166+
config: FFI_ConfigOptions,
167+
) -> FFIResult<ROption<FFI_ExecutionPlan>> {
168+
let maybe_config: Result<ConfigOptions, DataFusionError> = config.try_into();
169+
let config = rresult_return!(maybe_config);
170+
let runtime = plan.runtime();
171+
let plan = plan.inner();
172+
173+
rresult!(
174+
plan.repartitioned(target_partitions, &config)
175+
.map(|maybe_plan| maybe_plan
176+
.map(|plan| FFI_ExecutionPlan::new(plan, runtime))
177+
.into())
178+
)
131179
}
132180

133181
unsafe extern "C" fn name_fn_wrapper(plan: &FFI_ExecutionPlan) -> RString {
@@ -145,12 +193,10 @@ unsafe extern "C" fn release_fn_wrapper(plan: &mut FFI_ExecutionPlan) {
145193
}
146194

147195
unsafe extern "C" fn clone_fn_wrapper(plan: &FFI_ExecutionPlan) -> FFI_ExecutionPlan {
148-
unsafe {
149-
let private_data = plan.private_data as *const ExecutionPlanPrivateData;
150-
let plan_data = &(*private_data);
196+
let runtime = plan.runtime();
197+
let plan = plan.inner();
151198

152-
FFI_ExecutionPlan::new(Arc::clone(&plan_data.plan), plan_data.runtime.clone())
153-
}
199+
FFI_ExecutionPlan::new(Arc::clone(plan), runtime)
154200
}
155201

156202
impl Clone for FFI_ExecutionPlan {
@@ -162,13 +208,18 @@ impl Clone for FFI_ExecutionPlan {
162208
impl FFI_ExecutionPlan {
163209
/// This function is called on the provider's side.
164210
pub fn new(plan: Arc<dyn ExecutionPlan>, runtime: Option<Handle>) -> Self {
165-
let private_data = Box::new(ExecutionPlanPrivateData { plan, runtime });
211+
if let Some(plan) = plan.as_any().downcast_ref::<ForeignExecutionPlan>() {
212+
return plan.plan.clone();
213+
}
166214

215+
let private_data = Box::new(ExecutionPlanPrivateData { plan, runtime });
167216
Self {
168217
properties: properties_fn_wrapper,
169218
children: children_fn_wrapper,
219+
with_new_children: with_new_children_fn_wrapper,
170220
name: name_fn_wrapper,
171221
execute: execute_fn_wrapper,
222+
repartitioned: repartitioned_fn_wrapper,
172223
clone: clone_fn_wrapper,
173224
release: release_fn_wrapper,
174225
private_data: Box::into_raw(private_data) as *mut c_void,
@@ -275,12 +326,16 @@ impl ExecutionPlan for ForeignExecutionPlan {
275326
self: Arc<Self>,
276327
children: Vec<Arc<dyn ExecutionPlan>>,
277328
) -> Result<Arc<dyn ExecutionPlan>> {
278-
Ok(Arc::new(ForeignExecutionPlan {
279-
plan: self.plan.clone(),
280-
name: self.name.clone(),
281-
children,
282-
properties: Arc::clone(&self.properties),
283-
}))
329+
unsafe {
330+
let children = children
331+
.into_iter()
332+
.map(|child| FFI_ExecutionPlan::new(child, None))
333+
.collect::<RVec<_>>();
334+
let new_plan =
335+
df_result!((self.plan.with_new_children)(&self.plan, children))?;
336+
337+
(&new_plan).try_into()
338+
}
284339
}
285340

286341
fn execute(
@@ -310,6 +365,22 @@ impl ExecutionPlan for ForeignExecutionPlan {
310365
}
311366
Ok(tnr)
312367
}
368+
369+
fn repartitioned(
370+
&self,
371+
target_partitions: usize,
372+
config: &ConfigOptions,
373+
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
374+
let config = config.into();
375+
let maybe_plan: Option<FFI_ExecutionPlan> = df_result!(unsafe {
376+
(self.plan.repartitioned)(&self.plan, target_partitions, config)
377+
})?
378+
.into();
379+
380+
maybe_plan
381+
.map(|plan| <Arc<dyn ExecutionPlan>>::try_from(&plan))
382+
.transpose()
383+
}
313384
}
314385

315386
#[cfg(test)]

datafusion/ffi/src/physical_expr/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -448,6 +448,10 @@ impl Drop for FFI_PhysicalExpr {
448448
impl From<Arc<dyn PhysicalExpr>> for FFI_PhysicalExpr {
449449
/// Creates a new [`FFI_PhysicalExpr`].
450450
fn from(expr: Arc<dyn PhysicalExpr>) -> Self {
451+
if let Some(expr) = expr.as_any().downcast_ref::<ForeignPhysicalExpr>() {
452+
return expr.expr.clone();
453+
}
454+
451455
let private_data = Box::new(PhysicalExprPrivateData { expr });
452456

453457
Self {

datafusion/ffi/src/proto/physical_extension_codec.rs

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -111,14 +111,14 @@ unsafe impl Send for FFI_PhysicalExtensionCodec {}
111111
unsafe impl Sync for FFI_PhysicalExtensionCodec {}
112112

113113
struct PhysicalExtensionCodecPrivateData {
114-
provider: Arc<dyn PhysicalExtensionCodec>,
114+
codec: Arc<dyn PhysicalExtensionCodec>,
115115
runtime: Option<Handle>,
116116
}
117117

118118
impl FFI_PhysicalExtensionCodec {
119119
fn inner(&self) -> &Arc<dyn PhysicalExtensionCodec> {
120120
let private_data = self.private_data as *const PhysicalExtensionCodecPrivateData;
121-
unsafe { &(*private_data).provider }
121+
unsafe { &(*private_data).codec }
122122
}
123123

124124
fn runtime(&self) -> &Option<Handle> {
@@ -132,6 +132,7 @@ unsafe extern "C" fn try_decode_fn_wrapper(
132132
buf: RSlice<u8>,
133133
inputs: RVec<FFI_ExecutionPlan>,
134134
) -> FFIResult<FFI_ExecutionPlan> {
135+
let runtime = codec.runtime().clone();
135136
let task_ctx: Arc<TaskContext> =
136137
rresult_return!((&codec.task_ctx_provider).try_into());
137138
let codec = codec.inner();
@@ -144,7 +145,7 @@ unsafe extern "C" fn try_decode_fn_wrapper(
144145
let plan =
145146
rresult_return!(codec.try_decode(buf.as_ref(), &inputs, task_ctx.as_ref()));
146147

147-
RResult::ROk(FFI_ExecutionPlan::new(plan, None))
148+
RResult::ROk(FFI_ExecutionPlan::new(plan, runtime))
148149
}
149150

150151
unsafe extern "C" fn try_encode_fn_wrapper(
@@ -240,11 +241,10 @@ unsafe extern "C" fn try_encode_udwf_fn_wrapper(
240241
RResult::ROk(bytes.into())
241242
}
242243

243-
unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_PhysicalExtensionCodec) {
244+
unsafe extern "C" fn release_fn_wrapper(codec: &mut FFI_PhysicalExtensionCodec) {
244245
unsafe {
245-
let private_data = Box::from_raw(
246-
provider.private_data as *mut PhysicalExtensionCodecPrivateData,
247-
);
246+
let private_data =
247+
Box::from_raw(codec.private_data as *mut PhysicalExtensionCodecPrivateData);
248248
drop(private_data);
249249
}
250250
}
@@ -267,13 +267,19 @@ impl Drop for FFI_PhysicalExtensionCodec {
267267
impl FFI_PhysicalExtensionCodec {
268268
/// Creates a new [`FFI_PhysicalExtensionCodec`].
269269
pub fn new(
270-
provider: Arc<dyn PhysicalExtensionCodec + Send>,
270+
codec: Arc<dyn PhysicalExtensionCodec + Send>,
271271
runtime: Option<Handle>,
272272
task_ctx_provider: impl Into<FFI_TaskContextProvider>,
273273
) -> Self {
274+
if let Some(codec) = codec
275+
.as_any()
276+
.downcast_ref::<ForeignPhysicalExtensionCodec>()
277+
{
278+
return codec.0.clone();
279+
}
280+
274281
let task_ctx_provider = task_ctx_provider.into();
275-
let private_data =
276-
Box::new(PhysicalExtensionCodecPrivateData { provider, runtime });
282+
let private_data = Box::new(PhysicalExtensionCodecPrivateData { codec, runtime });
277283

278284
Self {
279285
try_decode: try_decode_fn_wrapper,
@@ -306,11 +312,11 @@ unsafe impl Send for ForeignPhysicalExtensionCodec {}
306312
unsafe impl Sync for ForeignPhysicalExtensionCodec {}
307313

308314
impl From<&FFI_PhysicalExtensionCodec> for Arc<dyn PhysicalExtensionCodec> {
309-
fn from(provider: &FFI_PhysicalExtensionCodec) -> Self {
310-
if (provider.library_marker_id)() == crate::get_library_marker_id() {
311-
Arc::clone(provider.inner())
315+
fn from(codec: &FFI_PhysicalExtensionCodec) -> Self {
316+
if (codec.library_marker_id)() == crate::get_library_marker_id() {
317+
Arc::clone(codec.inner())
312318
} else {
313-
Arc::new(ForeignPhysicalExtensionCodec(provider.clone()))
319+
Arc::new(ForeignPhysicalExtensionCodec(codec.clone()))
314320
}
315321
}
316322
}
@@ -403,6 +409,10 @@ impl PhysicalExtensionCodec for ForeignPhysicalExtensionCodec {
403409

404410
Ok(())
405411
}
412+
413+
fn as_any(&self) -> &dyn std::any::Any {
414+
self
415+
}
406416
}
407417

408418
#[cfg(test)]
@@ -558,6 +568,10 @@ pub(crate) mod tests {
558568

559569
Ok(())
560570
}
571+
572+
fn as_any(&self) -> &dyn std::any::Any {
573+
self
574+
}
561575
}
562576

563577
fn create_test_exec() -> Arc<dyn ExecutionPlan> {

0 commit comments

Comments
 (0)