Skip to content

Commit 5c2f852

Browse files
committed
feat: make execution transfomation application rule generic
Currently, it depends on the particular execution transformation that resolves placeholders. However, it would be more convenient to be able to specify the rule which is required to be applied to the plan.
1 parent 6fb8d7f commit 5c2f852

8 files changed

Lines changed: 97 additions & 112 deletions

File tree

datafusion/core/tests/physical_optimizer/physical_expr_resolver.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use datafusion_physical_expr::{
2929
expressions::{BinaryExpr, col, lit, placeholder},
3030
};
3131
use datafusion_physical_optimizer::{
32-
PhysicalOptimizerRule, physical_expr_resolver::PhysicalExprResolver,
32+
PhysicalOptimizerRule, exec_transform_apply::PhysicalExprResolver,
3333
};
3434
use datafusion_physical_plan::{
3535
ExecutionPlan, filter::FilterExec, get_plan_string,

datafusion/physical-optimizer/src/physical_expr_resolver.rs renamed to datafusion/physical-optimizer/src/exec_transform_apply.rs

Lines changed: 42 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,10 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
//! [`PhysicalExprResolver`] ensures that the physical plan is prepared for placeholder resolution
19-
//! by wrapping it in a [`TransformPlanExec`] with a [`ResolvePlaceholdersRule`] if the plan
20-
//! contains any unresolved placeholders. The actual resolution happens during execution.
18+
//! [`ExecutionTransformationApplier`] ensures that the required execution transformations
19+
//! are applied to the physical plan.
2120
22-
use std::sync::Arc;
21+
use std::{borrow::Cow, sync::Arc};
2322

2423
use datafusion_common::{
2524
Result,
@@ -28,61 +27,67 @@ use datafusion_common::{
2827
};
2928
use datafusion_physical_plan::{
3029
ExecutionPlan,
31-
plan_transformer::{ResolvePlaceholdersRule, TransformPlanExec},
30+
plan_transformer::{ExecutionTransformationRule, TransformPlanExec},
3231
};
3332

3433
use crate::PhysicalOptimizerRule;
3534

36-
/// The phase in which the [`PhysicalExprResolver`] rule is applied.
37-
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
38-
pub enum PhysicalExprResolverPhase {
35+
/// The phase in which the [`ExecutionTransformationApplier`] rule is applied.
36+
#[derive(Debug)]
37+
pub enum ExecutionTransformationApplierPhase {
3938
/// Optimization that happens before most other optimizations.
4039
/// This optimization removes all [`TransformPlanExec`] execution plans from the plan
4140
/// tree.
4241
Pre,
4342
/// Optimization that happens after most other optimizations.
44-
/// This optimization checks for the presence of placeholders in the optimized plan, and if
45-
/// they are present, wraps the plan in a [`TransformPlanExec`] with a [`ResolvePlaceholdersRule`].
46-
Post,
43+
/// This optimization checks if `rule` requires to transform the plan and wraps the plan with
44+
/// [`TransformPlanExec`] if it so, or adds rule to the existing transformation node.
45+
Post {
46+
rule: Arc<dyn ExecutionTransformationRule>,
47+
},
4748
}
4849

49-
/// Physical optimizer rule that prepares the plan for placeholder resolution during execution.
50+
/// Physical optimizer rule that wraps the plan with a certain execution-stage transformation.
5051
#[derive(Debug)]
51-
pub struct PhysicalExprResolver {
52-
phase: PhysicalExprResolverPhase,
52+
pub struct ExecutionTransformationApplier {
53+
phase: ExecutionTransformationApplierPhase,
54+
name: Cow<'static, str>,
5355
}
5456

55-
impl PhysicalExprResolver {
56-
/// Creates a new [`PhysicalExprResolver`] optimizer rule that runs in the pre-optimization
57-
/// phase. In this phase, the rule removes any existing [`TransformPlanExec`] from the
58-
/// plan tree.
57+
impl ExecutionTransformationApplier {
58+
/// Creates a new [`ExecutionTransformationApplier`] optimizer rule that runs in the
59+
/// pre-optimization phase.
5960
pub fn new() -> Self {
6061
Self {
61-
phase: PhysicalExprResolverPhase::Pre,
62+
phase: ExecutionTransformationApplierPhase::Pre,
63+
name: Cow::Borrowed("ExecutionTransformationApplier"),
6264
}
6365
}
6466

65-
/// Creates a new [`PhysicalExprResolver`] optimizer rule that runs in the post-optimization
66-
/// phase. In this phase, the rule wraps the physical plan in a [`TransformPlanExec`] with a
67-
/// [`ResolvePlaceholdersRule`] if the plan contains any unresolved placeholders.
68-
pub fn new_post_optimization() -> Self {
67+
/// Creates a new [`ExecutionTransformationApplier`] optimizer rule that runs in the
68+
/// post-optimization phase.
69+
pub fn new_post_optimization(rule: Arc<dyn ExecutionTransformationRule>) -> Self {
70+
let name = format!("ExecutionTransformationApplier({})", rule.name());
6971
Self {
70-
phase: PhysicalExprResolverPhase::Post,
72+
phase: ExecutionTransformationApplierPhase::Post { rule },
73+
name: name.into(),
7174
}
7275
}
7376
}
7477

75-
impl Default for PhysicalExprResolver {
78+
impl Default for ExecutionTransformationApplier {
7679
fn default() -> Self {
7780
Self::new()
7881
}
7982
}
8083

81-
impl PhysicalOptimizerRule for PhysicalExprResolver {
84+
impl PhysicalOptimizerRule for ExecutionTransformationApplier {
8285
fn name(&self) -> &str {
8386
match self.phase {
84-
PhysicalExprResolverPhase::Pre => "PhysicalExprResolver",
85-
PhysicalExprResolverPhase::Post => "PhysicalExprResolver(Post)",
87+
ExecutionTransformationApplierPhase::Pre => "PhysicalExprResolver",
88+
ExecutionTransformationApplierPhase::Post { .. } => {
89+
"PhysicalExprResolver(Post)"
90+
}
8691
}
8792
}
8893

@@ -91,8 +96,8 @@ impl PhysicalOptimizerRule for PhysicalExprResolver {
9196
plan: Arc<dyn ExecutionPlan>,
9297
_config: &ConfigOptions,
9398
) -> Result<Arc<dyn ExecutionPlan>> {
94-
match self.phase {
95-
PhysicalExprResolverPhase::Pre => plan
99+
match &self.phase {
100+
ExecutionTransformationApplierPhase::Pre => plan
96101
.transform_up(|plan| {
97102
if let Some(plan) = plan.as_any().downcast_ref::<TransformPlanExec>()
98103
{
@@ -102,26 +107,22 @@ impl PhysicalOptimizerRule for PhysicalExprResolver {
102107
}
103108
})
104109
.map(|t| t.data),
105-
PhysicalExprResolverPhase::Post => {
110+
ExecutionTransformationApplierPhase::Post { rule } => {
106111
if let Some(transformer) =
107112
plan.as_any().downcast_ref::<TransformPlanExec>()
108113
{
109-
let resolves_placeholders =
110-
transformer.has_rule::<ResolvePlaceholdersRule>();
111-
112-
if resolves_placeholders {
114+
let has_rule = transformer.has_dyn_rule(rule);
115+
if has_rule {
116+
// Rule is already applied.
113117
Ok(plan)
114118
} else {
115119
transformer
116-
.add_rule(Box::new(ResolvePlaceholdersRule::new()))
120+
.add_rule(Arc::clone(rule))
117121
.map(|r| Arc::new(r) as Arc<_>)
118122
}
119123
} else {
120-
let transformer = TransformPlanExec::try_new(
121-
plan,
122-
vec![Box::new(ResolvePlaceholdersRule::new())],
123-
)?;
124-
124+
let transformer =
125+
TransformPlanExec::try_new(plan, vec![Arc::clone(rule)])?;
125126
if transformer.plans_to_transform() > 0 {
126127
Ok(Arc::new(transformer))
127128
} else {

datafusion/physical-optimizer/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ pub mod limit_pushdown_past_window;
3737
pub mod limited_distinct_aggregation;
3838
pub mod optimizer;
3939
pub mod output_requirements;
40-
pub mod physical_expr_resolver;
40+
pub mod exec_transform_apply;
4141
pub mod projection_pushdown;
4242
pub use datafusion_pruning as pruning;
4343
pub mod pushdown_sort;

datafusion/physical-optimizer/src/optimizer.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use crate::join_selection::JoinSelection;
3030
use crate::limit_pushdown::LimitPushdown;
3131
use crate::limited_distinct_aggregation::LimitedDistinctAggregation;
3232
use crate::output_requirements::OutputRequirements;
33-
use crate::physical_expr_resolver::PhysicalExprResolver;
33+
use crate::exec_transform_apply::ExecutionTransformationApplier;
3434
use crate::projection_pushdown::ProjectionPushdown;
3535
use crate::sanity_checker::SanityCheckPlan;
3636
use crate::topk_aggregation::TopKAggregation;
@@ -41,6 +41,7 @@ use crate::pushdown_sort::PushdownSort;
4141
use datafusion_common::Result;
4242
use datafusion_common::config::ConfigOptions;
4343
use datafusion_physical_plan::ExecutionPlan;
44+
use datafusion_physical_plan::plan_transformer::ResolvePlaceholdersRule;
4445

4546
/// `PhysicalOptimizerRule` transforms one ['ExecutionPlan'] into another which
4647
/// computes the same results, but in a potentially more efficient way.
@@ -88,7 +89,7 @@ impl PhysicalOptimizer {
8889
// this information is not lost across different rules during optimization.
8990
Arc::new(OutputRequirements::new_add_mode()),
9091
// This rule removes all existing `TransformPlanExec` nodes from the plan tree.
91-
Arc::new(PhysicalExprResolver::new()),
92+
Arc::new(ExecutionTransformationApplier::new()),
9293
Arc::new(AggregateStatistics::new()),
9394
// Statistics-based join selection will change the Auto mode to a real join implementation,
9495
// like collect left, or hash join, or future sort merge join, which will influence the
@@ -151,7 +152,9 @@ impl PhysicalOptimizer {
151152
// This rule prepares the physical plan for placeholder resolution by wrapping it in a
152153
// `TransformPlanExec` with a `ResolvePlaceholdersRule` if it contains any unresolved
153154
// placeholders.
154-
Arc::new(PhysicalExprResolver::new_post_optimization()),
155+
Arc::new(ExecutionTransformationApplier::new_post_optimization(
156+
Arc::new(ResolvePlaceholdersRule::new()),
157+
)),
155158
// This FilterPushdown handles dynamic filters that may have references to the source ExecutionPlan.
156159
// Therefore it should be run at the end of the optimization process since any changes to the plan may break the dynamic filter's references.
157160
// See `FilterPushdownPhase` for more details.

datafusion/physical-plan/benches/plan_transformer.rs

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,7 @@ impl ExecutionTransformationRule for ResetAllRule {
4646
self
4747
}
4848

49-
fn clone_box(&self) -> Box<dyn ExecutionTransformationRule> {
50-
Box::new(self.clone())
51-
}
52-
53-
fn matches(&mut self, _node: &Arc<dyn ExecutionPlan>) -> Result<bool> {
49+
fn matches(&self, _node: &Arc<dyn ExecutionPlan>) -> Result<bool> {
5450
Ok(true)
5551
}
5652

@@ -77,11 +73,7 @@ impl ExecutionTransformationRule for ResetByNameRule {
7773
self
7874
}
7975

80-
fn clone_box(&self) -> Box<dyn ExecutionTransformationRule> {
81-
Box::new(self.clone())
82-
}
83-
84-
fn matches(&mut self, node: &Arc<dyn ExecutionPlan>) -> Result<bool> {
76+
fn matches(&self, node: &Arc<dyn ExecutionPlan>) -> Result<bool> {
8577
Ok(node.name() == self.node_name)
8678
}
8779

@@ -166,7 +158,7 @@ fn benchmark_with_transformer_exec(
166158
group: &mut BenchmarkGroup<'_, WallTime>,
167159
batch_label: &str,
168160
plan: &Arc<dyn ExecutionPlan>,
169-
rules: &[Box<dyn ExecutionTransformationRule>],
161+
rules: &[Arc<dyn ExecutionTransformationRule>],
170162
batch_size: BatchSize,
171163
) {
172164
let ctx = Arc::new(TaskContext::default());
@@ -182,7 +174,7 @@ fn benchmark_with_transformer_exec(
182174
|| {
183175
(
184176
Arc::clone(plan),
185-
rules.iter().map(|r| r.clone_box()).collect(),
177+
rules.to_vec(),
186178
)
187179
},
188180
|(plan, rules)| {
@@ -201,8 +193,7 @@ fn benchmark_with_transformer_exec(
201193
),
202194
|b| {
203195
let plan = Arc::clone(plan);
204-
let rules = rules.iter().map(|r| r.clone_box()).collect();
205-
let transformer = Arc::new(TransformPlanExec::try_new(plan, rules).unwrap());
196+
let transformer = Arc::new(TransformPlanExec::try_new(plan, rules.to_vec()).unwrap());
206197

207198
b.iter_batched(
208199
|| Arc::clone(&transformer),
@@ -257,14 +248,14 @@ fn criterion_benchmark(c: &mut Criterion) {
257248

258249
for count in rules_count {
259250
let reset_all_rules = (0..count)
260-
.map(|_| Box::new(ResetAllRule {}) as Box<_>)
251+
.map(|_| Arc::new(ResetAllRule {}) as _)
261252
.collect::<Vec<_>>();
262253

263254
let reset_one_rules = (0..count)
264255
.map(|_| {
265-
Box::new(ResetByNameRule {
256+
Arc::new(ResetByNameRule {
266257
node_name: "EmptyExec".to_string(),
267-
}) as Box<_>
258+
}) as _
268259
})
269260
.collect::<Vec<_>>();
270261

0 commit comments

Comments
 (0)