Commit 7d217b1

adriangb and claude authored
Reduce ExtractLeafExpressions optimizer overhead with fast pre-scan (#20341)
## Summary

Follow-up to #20117 which added the `ExtractLeafExpressions` and `PushDownLeafProjections` optimizer rules for get_field pushdown. Benchmarking revealed that these rules added 5-31% overhead on *all* queries (including those with no struct/get_field expressions) because they unconditionally allocated column HashSets, extractors, and walked every expression tree for every Filter/Sort/Limit/Aggregate/Join node.

This PR adds:

- **`has_extractable_expr()` pre-scan**: A lightweight check using `Expr::exists()` that short-circuits before any expensive allocations when no `MoveTowardsLeafNodes` expressions are present
- **Config option** `datafusion.optimizer.enable_leaf_expression_pushdown` to disable the rules entirely

### Benchmark Results (vs no-rules baseline)

| Benchmark | Before Fix | After Fix |
|---|---|---|
| physical_select_aggregates_from_200 | +31.1% | +3.7% |
| physical_many_self_joins | +12.9% | +2.2% |
| physical_join_consider_sort | +12.9% | +1.0% |
| physical_unnest_to_join | +12.5% | +1.4% |
| physical_select_one_from_700 | +12.2% | +2.6% |
| physical_theta_join_consider_sort | +8.7% | +0.2% |
| physical_plan_tpch_q18 | +9.3% | +1.4% |
| physical_plan_tpch_all | +4.8% | +2.1% |
| physical_plan_tpcds_all | +5.6% | +2.2% |

## Test plan

- [x] All 47 `extract_leaf_expressions` unit tests pass
- [x] Benchmarked with `cargo bench -p datafusion --bench sql_planner`

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent bdad206 commit 7d217b1

4 files changed

Lines changed: 43 additions & 0 deletions

datafusion/common/src/config.rs

Lines changed: 6 additions & 0 deletions
@@ -1142,6 +1142,12 @@ config_namespace! {
         ///
         /// Default: true
         pub enable_sort_pushdown: bool, default = true
+
+        /// When set to true, the optimizer will extract leaf expressions
+        /// (such as `get_field`) from filter/sort/join nodes into projections
+        /// closer to the leaf table scans, and push those projections down
+        /// towards the leaf nodes.
+        pub enable_leaf_expression_pushdown: bool, default = true
     }
 }

datafusion/optimizer/src/extract_leaf_expressions.rs

Lines changed: 34 additions & 0 deletions
@@ -43,6 +43,20 @@ use crate::{OptimizerConfig, OptimizerRule};
 /// in user queries.
 const EXTRACTED_EXPR_PREFIX: &str = "__datafusion_extracted";
 
+/// Returns `true` if any sub-expression in `exprs` has
+/// [`ExpressionPlacement::MoveTowardsLeafNodes`] placement.
+///
+/// This is a lightweight pre-check that short-circuits as soon as one
+/// extractable expression is found, avoiding the expensive allocations
+/// (column HashSets, extractors, expression rewrites) that the full
+/// extraction pipeline requires.
+fn has_extractable_expr(exprs: &[Expr]) -> bool {
+    exprs.iter().any(|expr| {
+        expr.exists(|e| Ok(e.placement() == ExpressionPlacement::MoveTowardsLeafNodes))
+            .unwrap_or(false)
+    })
+}
+
 /// Extracts `MoveTowardsLeafNodes` sub-expressions from non-projection nodes
 /// into **extraction projections** (pass 1 of 2).
 ///
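The short-circuiting pre-scan in the hunk above can be illustrated outside DataFusion. The following is a minimal sketch under stated assumptions: `Placement`, the four-variant `Expr` enum, and this simplified `exists` are hypothetical stand-ins, not the real DataFusion types (the real `Expr::exists` takes a fallible closure and returns a `Result`). It only shows how the scan stops at the first matching node and allocates nothing along the way.

```rust
/// Toy stand-in for DataFusion's `ExpressionPlacement` (hypothetical, simplified).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Placement {
    KeepInPlace,
    MoveTowardsLeafNodes,
}

/// Toy expression tree; the real `Expr` has many more variants.
#[derive(Debug, Clone)]
enum Expr {
    Column(String),
    Literal(i64),
    /// Models `get_field(base, field_name)`.
    GetField(Box<Expr>, String),
    Binary(Box<Expr>, Box<Expr>),
}

impl Expr {
    /// Placement of this node alone, ignoring its children.
    fn placement(&self) -> Placement {
        match self {
            Expr::GetField(..) => Placement::MoveTowardsLeafNodes,
            _ => Placement::KeepInPlace,
        }
    }

    /// Returns `true` as soon as `pred` holds for this node or any descendant.
    /// The early `return` and the `||` operator stop the walk at the first match.
    fn exists(&self, pred: &dyn Fn(&Expr) -> bool) -> bool {
        if pred(self) {
            return true;
        }
        match self {
            Expr::Column(_) | Expr::Literal(_) => false,
            Expr::GetField(base, _) => base.exists(pred),
            Expr::Binary(left, right) => left.exists(pred) || right.exists(pred),
        }
    }
}

/// Pre-scan: is there at least one extractable sub-expression anywhere in `exprs`?
/// `Iterator::any` stops at the first expression for which the closure returns `true`.
fn has_extractable_expr(exprs: &[Expr]) -> bool {
    exprs
        .iter()
        .any(|expr| expr.exists(&|e: &Expr| e.placement() == Placement::MoveTowardsLeafNodes))
}
```

In this toy model, a list such as `[a + 1]` yields `false`, so a caller can return early, while `[get_field(s, "x") + 1]` yields `true` and would fall through to the full extraction path.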
@@ -109,6 +123,9 @@ impl OptimizerRule for ExtractLeafExpressions {
         plan: LogicalPlan,
         config: &dyn OptimizerConfig,
     ) -> Result<Transformed<LogicalPlan>> {
+        if !config.options().optimizer.enable_leaf_expression_pushdown {
+            return Ok(Transformed::no(plan));
+        }
         let alias_generator = config.alias_generator();
         extract_from_plan(plan, alias_generator)
     }
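The config gate added above follows a common pattern: when the flag is off, the rule hands the plan back untouched and reports that nothing changed. Below is a minimal, self-contained sketch of that pattern. Everything in it is hypothetical: the `OptimizerOptions` struct models only the one flag from this commit, `Transformed` is a toy version of the wrapper type, and the "plan" is just a `String`.

```rust
/// Hypothetical stand-in for the optimizer options; only the new flag is modeled.
struct OptimizerOptions {
    enable_leaf_expression_pushdown: bool,
}

/// Toy version of a wrapper that records whether a rule changed the plan.
#[derive(Debug, PartialEq)]
struct Transformed<T> {
    data: T,
    transformed: bool,
}

impl<T> Transformed<T> {
    /// The rule rewrote the plan.
    fn yes(data: T) -> Self {
        Transformed { data, transformed: true }
    }

    /// The rule left the plan as it was.
    fn no(data: T) -> Self {
        Transformed { data, transformed: false }
    }
}

/// Toy rule: the plan is a string, and "rewriting" wraps it in a projection.
fn rewrite(plan: String, options: &OptimizerOptions) -> Transformed<String> {
    // Gate first: a disabled rule does no work beyond reading one bool.
    if !options.enable_leaf_expression_pushdown {
        return Transformed::no(plan);
    }
    Transformed::yes(format!("Projection({})", plan))
}
```

Because the plan is passed by value and returned unchanged on the disabled path, the gate itself needs no clone or allocation in this sketch.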
@@ -144,6 +161,11 @@ fn extract_from_plan(
         return Ok(Transformed::no(plan));
     }
 
+    // Fast pre-check: skip all allocations if no extractable expressions exist
+    if !has_extractable_expr(&plan.expressions()) {
+        return Ok(Transformed::no(plan));
+    }
+
     // Save original output schema before any transformation
     let original_schema = Arc::clone(plan.schema());

@@ -693,6 +715,9 @@ impl OptimizerRule for PushDownLeafProjections {
         plan: LogicalPlan,
         config: &dyn OptimizerConfig,
     ) -> Result<Transformed<LogicalPlan>> {
+        if !config.options().optimizer.enable_leaf_expression_pushdown {
+            return Ok(Transformed::no(plan));
+        }
         let alias_generator = config.alias_generator();
         match try_push_input(&plan, alias_generator)? {
             Some(new_plan) => Ok(Transformed::yes(new_plan)),
@@ -750,6 +775,15 @@ fn split_and_push_projection(
     proj: &Projection,
     alias_generator: &Arc<AliasGenerator>,
 ) -> Result<Option<LogicalPlan>> {
+    // Fast pre-check: skip if there are no pre-existing extracted aliases
+    // and no new extractable expressions.
+    let has_existing_extracted = proj.expr.iter().any(|e| {
+        matches!(e, Expr::Alias(alias) if alias.name.starts_with(EXTRACTED_EXPR_PREFIX))
+    });
+    if !has_existing_extracted && !has_extractable_expr(&proj.expr) {
+        return Ok(None);
+    }
+
     let input = &proj.input;
     let input_schema = input.schema();
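The two-part skip condition in `split_and_push_projection` can be sketched in isolation. This is an illustration only, assuming a hypothetical three-variant `Expr` enum and a hypothetical alias suffix (`_1`); only the `__datafusion_extracted` prefix comes from the diff. The projection is skipped when it neither carries an alias produced by an earlier extraction pass nor contains a new extractable expression.

```rust
const EXTRACTED_EXPR_PREFIX: &str = "__datafusion_extracted";

/// Toy expression type (hypothetical); `GetField` is the only extractable variant here.
#[derive(Debug)]
enum Expr {
    Column(String),
    Alias { expr: Box<Expr>, name: String },
    GetField(Box<Expr>, String),
}

/// True if any top-level expression is an alias created by an earlier extraction pass.
fn has_existing_extracted(exprs: &[Expr]) -> bool {
    exprs.iter().any(|e| {
        matches!(e, Expr::Alias { name, .. } if name.starts_with(EXTRACTED_EXPR_PREFIX))
    })
}

/// True if `e` or any of its descendants is a `GetField`.
fn contains_get_field(e: &Expr) -> bool {
    match e {
        Expr::GetField(..) => true,
        Expr::Alias { expr, .. } => contains_get_field(expr),
        Expr::Column(_) => false,
    }
}

/// Simplified stand-in for the pre-scan helper from the earlier hunk.
fn has_extractable_expr(exprs: &[Expr]) -> bool {
    exprs.iter().any(contains_get_field)
}

/// Mirrors the early-return condition: nothing old to push and nothing new to extract.
fn should_skip(exprs: &[Expr]) -> bool {
    !has_existing_extracted(exprs) && !has_extractable_expr(exprs)
}
```

Both checks are needed: a projection whose expressions are only prefixed aliases over plain columns has nothing left to extract, yet it still has to be pushed further down, so it must not be skipped.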

datafusion/sqllogictest/test_files/information_schema.slt

Lines changed: 2 additions & 0 deletions
@@ -297,6 +297,7 @@ datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown true
 datafusion.optimizer.enable_distinct_aggregation_soft_limit true
 datafusion.optimizer.enable_dynamic_filter_pushdown true
 datafusion.optimizer.enable_join_dynamic_filter_pushdown true
+datafusion.optimizer.enable_leaf_expression_pushdown true
 datafusion.optimizer.enable_piecewise_merge_join false
 datafusion.optimizer.enable_round_robin_repartition true
 datafusion.optimizer.enable_sort_pushdown true
@@ -434,6 +435,7 @@ datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown true When set to t
 datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read.
 datafusion.optimizer.enable_dynamic_filter_pushdown true When set to true attempts to push down dynamic filters generated by operators (TopK, Join & Aggregate) into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. The config will suppress `enable_join_dynamic_filter_pushdown`, `enable_topk_dynamic_filter_pushdown` & `enable_aggregate_dynamic_filter_pushdown` So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden.
 datafusion.optimizer.enable_join_dynamic_filter_pushdown true When set to true, the optimizer will attempt to push down Join dynamic filters into the file scan phase.
+datafusion.optimizer.enable_leaf_expression_pushdown true When set to true, the optimizer will extract leaf expressions (such as `get_field`) from filter/sort/join nodes into projections closer to the leaf table scans, and push those projections down towards the leaf nodes.
 datafusion.optimizer.enable_piecewise_merge_join false When set to true, piecewise merge join is enabled. PiecewiseMergeJoin is currently experimental. Physical planner will opt for PiecewiseMergeJoin when there is only one range filter.
 datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores
 datafusion.optimizer.enable_sort_pushdown true Enable sort pushdown optimization. When enabled, attempts to push sort requirements down to data sources that can natively handle them (e.g., by reversing file/row group read order). Returns **inexact ordering**: Sort operator is kept for correctness, but optimized input enables early termination for TopK queries (ORDER BY ... LIMIT N), providing significant speedup. Memory: No additional overhead (only changes read order). Future: Will add option to detect perfectly sorted data and eliminate Sort completely. Default: true

docs/source/user-guide/configs.md

Lines changed: 1 addition & 0 deletions
@@ -165,6 +165,7 @@ The following configuration settings are available:
 | datafusion.optimizer.prefer_existing_union | false | When set to true, the optimizer will not attempt to convert Union to Interleave |
 | datafusion.optimizer.expand_views_at_output | false | When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. |
 | datafusion.optimizer.enable_sort_pushdown | true | Enable sort pushdown optimization. When enabled, attempts to push sort requirements down to data sources that can natively handle them (e.g., by reversing file/row group read order). Returns **inexact ordering**: Sort operator is kept for correctness, but optimized input enables early termination for TopK queries (ORDER BY ... LIMIT N), providing significant speedup. Memory: No additional overhead (only changes read order). Future: Will add option to detect perfectly sorted data and eliminate Sort completely. Default: true |
+| datafusion.optimizer.enable_leaf_expression_pushdown | true | When set to true, the optimizer will extract leaf expressions (such as `get_field`) from filter/sort/join nodes into projections closer to the leaf table scans, and push those projections down towards the leaf nodes. |
 | datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans |
 | datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans |
 | datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans |
