Commit 9c6a35f

adriangb and claude authored
feat: add ExtractLeafExpressions optimizer rule for get_field pushdown (#20117)
## Summary

Adds a two-pass optimizer pipeline (`ExtractLeafExpressions` + `PushDownLeafProjections`) that pushes cheap `MoveTowardsLeafNodes` expressions (like `get_field` for struct field access) closer to data sources, enabling source-level optimizations such as Parquet column pruning for nested struct fields.

### Motivation

Previously, `get_field(s, 'label')` stayed in the top-level `ProjectionExec`, forcing the scan to read the entire struct column `s`. With this change, `get_field` is pushed all the way down to `DataSourceExec`, allowing Parquet to read only the referenced sub-columns.

### Example

```sql
SELECT id, s['label'] FROM t WHERE s['value'] > 150
```

**Before:** `get_field(s, 'label')` stayed in ProjectionExec, reading full struct `s`

**After:** Both `get_field` expressions pushed to DataSourceExec:

```
DataSourceExec: projection=[get_field(s, value) as __datafusion_extracted_1, get_field(s, label) as __datafusion_extracted_2, id]
```

### How It Works

**Pass 1: `ExtractLeafExpressions`** (top-down): For non-projection nodes (Filter, Sort, Limit, Aggregate, Join), extracts `MoveTowardsLeafNodes` sub-expressions into **extraction projections** below the node with `__datafusion_extracted_N` aliases, and adds **recovery projections** above to restore the original output schema.

```text
-- Before:
Filter: user['status'] = 'active'
  TableScan: t [id, user]

-- After:
Projection: id, user                ← recovery projection
  Filter: __datafusion_extracted_1 = 'active'
    Projection: user['status'] AS __datafusion_extracted_1, id, user   ← extraction projection
      TableScan: t [id, user]
```

**Pass 2: `PushDownLeafProjections`** (top-down): Pushes extraction projections down through schema-preserving nodes (Filter, Sort, Limit) and merges them into existing projections.

Also handles:

- **Mixed projections** containing `MoveTowardsLeafNodes` sub-expressions: splits them into recovery + extraction, then pushes the extraction down
- **Multi-input nodes** (Join, SubqueryAlias): routes each extracted expression to the correct input based on column references
- **SubqueryAlias**: remaps qualifiers from alias-space to input-space before routing

After both passes, `OptimizeProjections` (which runs next) merges consecutive projections and pushes `get_field` to the scan.

### Changes by file

- **`extract_leaf_expressions.rs`** (new, ~2800 lines): Both optimizer rules plus extensive unit tests
- **`push_down_filter.rs`**: Teaches `PushDownFilter` to not push filters through extraction projections (would undo the extraction by rewriting `__datafusion_extracted_1 > 150` back to `get_field(s,'value') > 150`). Adds 2 unit tests for this behavior.
- **`utils.rs`**: Adds `EXTRACTED_EXPR_PREFIX` constant and `is_extracted_expr_projection()` detection helper
- **`optimizer.rs`**: Registers both new rules after `CommonSubexprEliminate` and before `OptimizeProjections`
- **`expr.rs`**: Makes `Expr::Alias` delegate to the inner expression for `placement()`, so aliases around `get_field` are correctly classified as `MoveTowardsLeafNodes`
- **`test/mod.rs`**: Adds `test_table_scan_with_struct()` test helper
- **SLT files**: Updated explain plans reflecting extraction aliases in logical plans and pushed-down `get_field` in physical plans

### Interaction with other optimizer rules

- **`PushDownFilter`**: Extraction projections are detected via `is_extracted_expr_projection()` and filters are NOT pushed through them
- **`CommonSubexprEliminate`**: Runs before extraction; CSE aliases (`__common_expr_N`) are preserved and correctly handled during merge
- **`OptimizeProjections`**: Runs after extraction; merges the recovery + extraction projections and pushes `get_field` to the scan

## Test plan

- [x] ~1400 lines of unit tests in `extract_leaf_expressions.rs` covering: Filter, Sort, Limit, Aggregate, Join, SubqueryAlias, Union, nested projections, deduplication, idempotency, mixed projections, and multi-input routing
- [x] 2 new unit tests in `push_down_filter.rs` for filter-through-extraction blocking
- [x] Updated sqllogictest expectations in `projection_pushdown.slt`, `push_down_filter.slt`, `explain.slt`, `projection.slt`, `struct.slt`, `unnest.slt`
- [x] All optimizer tests pass (`cargo test -p datafusion-optimizer`)

🤖 Generated with [Claude Code](https://claude.ai/code)

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
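The extraction step described under "How It Works" can be sketched on a toy expression tree. This is a minimal illustration, not the code in `extract_leaf_expressions.rs`: the `Expr` enum, the `moves_towards_leaves` predicate, and the `Extractor` type are all hypothetical stand-ins, and only the `__datafusion_extracted_N` alias scheme and the deduplication behavior are taken from the description above.

```rust
use std::collections::HashMap;

/// Toy expression tree (an assumption for illustration; not DataFusion's `Expr`).
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
enum Expr {
    Column(String),
    Literal(i64),
    /// Struct field access, standing in for `get_field(base, field)`.
    GetField(Box<Expr>, String),
    /// Comparison `left > right`.
    Gt(Box<Expr>, Box<Expr>),
}

/// Hypothetical stand-in for the `MoveTowardsLeafNodes` classification:
/// here only (possibly nested) field access rooted at a column qualifies.
fn moves_towards_leaves(e: &Expr) -> bool {
    match e {
        Expr::GetField(base, _) => match base.as_ref() {
            Expr::Column(_) => true,
            inner => moves_towards_leaves(inner),
        },
        _ => false,
    }
}

/// Collects extracted sub-expressions and assigns one alias per distinct expression.
#[derive(Default)]
struct Extractor {
    aliases: HashMap<Expr, String>,
    /// (expression, alias) pairs in first-seen order; these would populate
    /// the extraction projection placed below the rewritten node.
    extracted: Vec<(Expr, String)>,
}

impl Extractor {
    /// Walks the expression top-down and replaces each qualifying
    /// sub-expression with a column reference to its alias.
    fn rewrite(&mut self, e: &Expr) -> Expr {
        if moves_towards_leaves(e) {
            // Reuse the alias if this exact expression was seen before.
            if let Some(alias) = self.aliases.get(e) {
                return Expr::Column(alias.clone());
            }
            let alias = format!("__datafusion_extracted_{}", self.extracted.len() + 1);
            self.aliases.insert(e.clone(), alias.clone());
            self.extracted.push((e.clone(), alias.clone()));
            return Expr::Column(alias);
        }
        match e {
            Expr::Gt(l, r) => Expr::Gt(Box::new(self.rewrite(l)), Box::new(self.rewrite(r))),
            Expr::GetField(base, f) => Expr::GetField(Box::new(self.rewrite(base)), f.clone()),
            other => other.clone(),
        }
    }
}
```

In this sketch, rewriting the predicate `s['value'] > 150` leaves a comparison against the column `__datafusion_extracted_1`, and `extracted` holds the single `get_field` expression that the extraction projection would compute. Rewriting the same predicate a second time reuses the alias instead of allocating a new one.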
1 parent f48dc72 commit 9c6a35f

9 files changed

Lines changed: 1951 additions & 286 deletions

File tree

datafusion/optimizer/src/extract_leaf_expressions.rs

Lines changed: 1571 additions & 87 deletions
Large diffs are not rendered by default.

datafusion/optimizer/src/optimize_projections/mod.rs

Lines changed: 13 additions & 1 deletion
@@ -559,7 +559,19 @@ fn merge_consecutive_projections(proj: Projection) -> Result<Transformed<Project
                 metadata,
             }) => rewrite_expr(*expr, &prev_projection).map(|result| {
                 result.update_data(|expr| {
-                    Expr::Alias(Alias::new(expr, relation, name).with_metadata(metadata))
+                    // After substitution, the inner expression may now have the
+                    // same schema_name as the alias (e.g. when an extraction
+                    // alias like `__extracted_1 AS f(x)` is resolved back to
+                    // `f(x)`). Wrapping in a redundant self-alias causes a
+                    // cosmetic `f(x) AS f(x)` due to Display vs schema_name
+                    // formatting differences. Drop the alias when it matches.
+                    if metadata.is_none() && expr.schema_name().to_string() == name {
+                        expr
+                    } else {
+                        Expr::Alias(
+                            Alias::new(expr, relation, name).with_metadata(metadata),
+                        )
+                    }
                 })
             }),
             e => rewrite_expr(e, &prev_projection),
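The alias-dropping condition in this hunk can be exercised in isolation with a small model. The sketch below is an assumption-laden simplification: the `Expr` enum, its `schema_name` method, and the `realias` helper are hypothetical, and a `has_metadata` flag stands in for the `Option` that the real code tests with `metadata.is_none()`.

```rust
/// Toy expression (hypothetical; not DataFusion's `Expr`).
#[derive(Clone, Debug, PartialEq)]
enum Expr {
    Column(String),
    Call { func: String, arg: Box<Expr> },
    Alias { expr: Box<Expr>, name: String, has_metadata: bool },
}

impl Expr {
    /// Simplified stand-in for a schema name: the output column name
    /// this expression would produce.
    fn schema_name(&self) -> String {
        match self {
            Expr::Column(c) => c.clone(),
            Expr::Call { func, arg } => format!("{}({})", func, arg.schema_name()),
            Expr::Alias { name, .. } => name.clone(),
        }
    }
}

/// Re-applies an alias after substitution, unless the alias is redundant:
/// it carries no metadata and already equals the expression's own name.
fn realias(expr: Expr, name: String, has_metadata: bool) -> Expr {
    if !has_metadata && expr.schema_name() == name {
        expr
    } else {
        Expr::Alias { expr: Box::new(expr), name, has_metadata }
    }
}
```

With this model, re-aliasing `f(x)` as `f(x)` returns the bare call, while a different name or the presence of metadata keeps the alias wrapper. Keeping the alias when metadata is present matters because dropping it would also discard that metadata.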

datafusion/optimizer/src/optimizer.rs

Lines changed: 3 additions & 0 deletions
@@ -43,6 +43,7 @@ use crate::eliminate_join::EliminateJoin;
 use crate::eliminate_limit::EliminateLimit;
 use crate::eliminate_outer_join::EliminateOuterJoin;
 use crate::extract_equijoin_predicate::ExtractEquijoinPredicate;
+use crate::extract_leaf_expressions::{ExtractLeafExpressions, PushDownLeafProjections};
 use crate::filter_null_join_keys::FilterNullJoinKeys;
 use crate::optimize_projections::OptimizeProjections;
 use crate::optimize_unions::OptimizeUnions;
@@ -260,6 +261,8 @@ impl Optimizer {
             // that might benefit from the following rules
             Arc::new(EliminateGroupByConstant::new()),
             Arc::new(CommonSubexprEliminate::new()),
+            Arc::new(ExtractLeafExpressions::new()),
+            Arc::new(PushDownLeafProjections::new()),
             Arc::new(OptimizeProjections::new()),
         ];
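Because rules run in registration order, each registered rule also contributes one line to the verbose explain trace, even when it leaves the plan unchanged. The following sketch models that behavior with a hypothetical `Rule` trait and string plans; it is far simpler than DataFusion's `OptimizerRule` interface, and the `NoOp`, `Replace`, `run`, and `tail_rules` names are illustrative assumptions. Only the rule names and the `SAME TEXT AS ABOVE` trace format are taken from `explain.slt`.

```rust
/// Hypothetical, simplified rule interface (an assumption for illustration).
trait Rule {
    fn name(&self) -> &'static str;
    /// Returns `Some(new_plan)` when the rule rewrites the plan, `None` otherwise.
    fn rewrite(&self, plan: &str) -> Option<String>;
}

/// A rule that never changes the plan.
struct NoOp(&'static str);

impl Rule for NoOp {
    fn name(&self) -> &'static str {
        self.0
    }
    fn rewrite(&self, _plan: &str) -> Option<String> {
        None
    }
}

/// A rule that performs a textual replacement when its pattern is present.
struct Replace {
    name: &'static str,
    from: &'static str,
    to: &'static str,
}

impl Rule for Replace {
    fn name(&self) -> &'static str {
        self.name
    }
    fn rewrite(&self, plan: &str) -> Option<String> {
        plan.contains(self.from).then(|| plan.replace(self.from, self.to))
    }
}

/// Applies the rules in order and records one trace line per rule.
fn run(rules: &[Box<dyn Rule>], mut plan: String) -> (String, Vec<String>) {
    let mut trace = Vec::new();
    for rule in rules {
        match rule.rewrite(&plan) {
            Some(new_plan) if new_plan != plan => {
                trace.push(format!("logical_plan after {} {}", rule.name(), new_plan));
                plan = new_plan;
            }
            _ => trace.push(format!("logical_plan after {} SAME TEXT AS ABOVE", rule.name())),
        }
    }
    (plan, trace)
}

/// The tail of the rule list, in the order registered by this commit.
fn tail_rules() -> Vec<Box<dyn Rule>> {
    vec![
        Box::new(NoOp("common_sub_expression_eliminate")),
        Box::new(NoOp("extract_leaf_expressions")),
        Box::new(NoOp("push_down_leaf_projections")),
        Box::new(Replace {
            name: "optimize_projections",
            from: "TableScan: simple_explain_test",
            to: "TableScan: simple_explain_test projection=[a, b, c]",
        }),
    ]
}
```

Running `run(&tail_rules(), ...)` on a plan with no struct access yields two unchanged-plan trace lines for the new rules, placed between the common subexpression elimination line and the `optimize_projections` line.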

datafusion/sqllogictest/test_files/explain.slt

Lines changed: 8 additions & 0 deletions
@@ -197,6 +197,8 @@ logical_plan after push_down_filter SAME TEXT AS ABOVE
 logical_plan after single_distinct_aggregation_to_group_by SAME TEXT AS ABOVE
 logical_plan after eliminate_group_by_constant SAME TEXT AS ABOVE
 logical_plan after common_sub_expression_eliminate SAME TEXT AS ABOVE
+logical_plan after extract_leaf_expressions SAME TEXT AS ABOVE
+logical_plan after push_down_leaf_projections SAME TEXT AS ABOVE
 logical_plan after optimize_projections TableScan: simple_explain_test projection=[a, b, c]
 logical_plan after rewrite_set_comparison SAME TEXT AS ABOVE
 logical_plan after optimize_unions SAME TEXT AS ABOVE
@@ -219,6 +221,8 @@ logical_plan after push_down_filter SAME TEXT AS ABOVE
 logical_plan after single_distinct_aggregation_to_group_by SAME TEXT AS ABOVE
 logical_plan after eliminate_group_by_constant SAME TEXT AS ABOVE
 logical_plan after common_sub_expression_eliminate SAME TEXT AS ABOVE
+logical_plan after extract_leaf_expressions SAME TEXT AS ABOVE
+logical_plan after push_down_leaf_projections SAME TEXT AS ABOVE
 logical_plan after optimize_projections SAME TEXT AS ABOVE
 logical_plan TableScan: simple_explain_test projection=[a, b, c]
 initial_physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true
@@ -558,6 +562,8 @@ logical_plan after push_down_filter SAME TEXT AS ABOVE
 logical_plan after single_distinct_aggregation_to_group_by SAME TEXT AS ABOVE
 logical_plan after eliminate_group_by_constant SAME TEXT AS ABOVE
 logical_plan after common_sub_expression_eliminate SAME TEXT AS ABOVE
+logical_plan after extract_leaf_expressions SAME TEXT AS ABOVE
+logical_plan after push_down_leaf_projections SAME TEXT AS ABOVE
 logical_plan after optimize_projections TableScan: simple_explain_test projection=[a, b, c]
 logical_plan after rewrite_set_comparison SAME TEXT AS ABOVE
 logical_plan after optimize_unions SAME TEXT AS ABOVE
@@ -580,6 +586,8 @@ logical_plan after push_down_filter SAME TEXT AS ABOVE
 logical_plan after single_distinct_aggregation_to_group_by SAME TEXT AS ABOVE
 logical_plan after eliminate_group_by_constant SAME TEXT AS ABOVE
 logical_plan after common_sub_expression_eliminate SAME TEXT AS ABOVE
+logical_plan after extract_leaf_expressions SAME TEXT AS ABOVE
+logical_plan after push_down_leaf_projections SAME TEXT AS ABOVE
 logical_plan after optimize_projections SAME TEXT AS ABOVE
 logical_plan TableScan: simple_explain_test projection=[a, b, c]
 initial_physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true

datafusion/sqllogictest/test_files/insert.slt

Lines changed: 1 addition & 1 deletion
@@ -165,7 +165,7 @@ ORDER BY c1
 ----
 logical_plan
 01)Dml: op=[Insert Into] table=[table_without_values]
-02)--Projection: a1 AS a1, a2 AS a2
+02)--Projection: a1, a2
 03)----Sort: aggregate_test_100.c1 ASC NULLS LAST
 04)------Projection: sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS a1, count(Int64(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS count(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS a2, aggregate_test_100.c1
 05)--------WindowAggr: windowExpr=[[sum(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(Int64(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
