Skip to content

Commit 9962911

Browse files
adriangbclaude
andauthored
feat: add ExpressionPlacement enum for optimizer expression placement decisions (#20065)
## Summary This PR is part of work towards #19387 Extracts the `ExpressionPlacement` enum from #20036 to provide a mechanism for expressions to indicate where they should be placed in the query plan for optimal execution. I've opted to go the route of having expressions declare their behavior via a new API on `enum Expr` and `trait PhysicalExpr`: ```rust enum Expr { pub fn placement(&self) -> ExpressionPlacement { ... } ... } ``` And: ```rust trait PhysicalExpr { fn placement(&self) -> ExpressionPlacement { ... } } ``` Where `ExpressionPlacement`: ```rust enum ExpressionPlacement { /// Argument is a literal constant value or an expression that can be /// evaluated to a constant at planning time. Literal, /// Argument is a simple column reference. Column, /// Argument is a complex expression that can be safely placed at leaf nodes. /// For example, if `get_field(struct_col, 'field_name')` is implemented as a /// leaf-pushable expression, then it would return this variant. /// Then `other_leaf_function(get_field(...), 42)` could also be classified as /// leaf-pushable using the knowledge that `get_field(...)` is leaf-pushable. PlaceAtLeaves, /// Argument is a complex expression that should be placed at root nodes. /// For example, `min(col1 + col2)` is not leaf-pushable because it requires per-row computation. PlaceAtRoot, } ``` We arrived at `ExprPlacement` after iterating through a version that had: ```rust enum ArgTriviality { Literal, Column, Trivial, NonTrivial, } ``` This terminology came from existing concepts in the codebase that were sprinkled around various places in the logical and physical layers. Some examples: https://github.com/apache/datafusion/blob/f819061833d0ee4d7899ed6a0a431c584533b241/datafusion/physical-plan/src/projection.rs#L282-L290 https://github.com/apache/datafusion/blob/f819061833d0ee4d7899ed6a0a431c584533b241/datafusion/physical-plan/src/projection.rs#L1120-L1125 https://github.com/apache/datafusion/blob/f819061833d0ee4d7899ed6a0a431c584533b241/datafusion/optimizer/src/optimize_projections/mod.rs#L589-L592 The new API adds the nuance / distinction of the case of `get_field(col, 'a')` where it is neither a column nor a literal but it is trivial. It also gives scalar functions the ability to classify themselves. This part was a bit tricky because `ScalarUDFImpl` (the scalar function trait that users implement) lives in `datafuions-expr` which cannot have references to `datafusion-physical-expr-common` (where `PhysicalExpr` is defined). But once we are in the physical layer scalar functions are represented as `func: ScalarUDFImpl, args: Vec<Arc<dyn PhysicalExpr>>`. And since we can't have a trait method referencing `PhysicalExpr` there would be no way to ask a function to classify itself in the physical layer. Additionally even if we could refer to `PhysicalExpr` from the `ScalarUDFImpl` trait we would then need 2 methods with similar but divergent logic (match on the `Expr` enum in one, downcast to various known types in the physical version) that adds boilerplate for implementers. The `ExprPlacement` enum solves this problem: we can have a single method `ScalarUDFImpl::placement(args: &[ExpressionPlacement])`. The parent of `ScalarUDFImpl` will call either `Expr::placement` or `PhysicalExpr::placement` depending on which one it has. ## Changes - Add `ExpressionPlacement` enum in `datafusion-expr-common` with four variants: - `Literal` - constant values - `Column` - simple column references - `PlaceAtLeaves` - cheap expressions (like `get_field`) that can be pushed to leaf nodes - `PlaceAtRoot` - expensive expressions that should stay at root - Add `placement()` method to: - `Expr` enum - `ScalarUDF` / `ScalarUDFImpl` traits (with default returning `PlaceAtRoot`) - `PhysicalExpr` trait (with default returning `PlaceAtRoot`) - Physical expression implementations for `Column`, `Literal`, and `ScalarFunctionExpr` - Implement `placement()` for `GetFieldFunc` that returns `PlaceAtLeaves` when accessing struct fields with literal keys - Replace `is_expr_trivial()` function checks with `placement()` checks in: - `datafusion/optimizer/src/optimize_projections/mod.rs` - `datafusion/physical-plan/src/projection.rs` ## Test Plan - [x] `cargo check` passes on all affected packages - [x] `cargo test -p datafusion-optimizer` passes - [x] `cargo test -p datafusion-physical-plan` passes (except unrelated zstd feature test) - [x] `cargo test -p datafusion-functions --lib getfield` passes 🤖 Generated with [Claude Code](https://claude.ai/code) --------- Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 66ee0af commit 9962911

16 files changed

Lines changed: 307 additions & 39 deletions

File tree

datafusion/expr-common/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,10 @@ pub mod dyn_eq;
4040
pub mod groups_accumulator;
4141
pub mod interval_arithmetic;
4242
pub mod operator;
43+
pub mod placement;
4344
pub mod signature;
4445
pub mod sort_properties;
4546
pub mod statistics;
4647
pub mod type_coercion;
48+
49+
pub use placement::ExpressionPlacement;
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Expression placement information for optimization decisions.
19+
20+
/// Describes where an expression should be placed in the query plan for
21+
/// optimal execution. This is used by optimizers to make decisions about
22+
/// expression placement, such as whether to push expressions down through
23+
/// projections.
24+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
25+
pub enum ExpressionPlacement {
26+
/// A constant literal value.
27+
Literal,
28+
/// A simple column reference.
29+
Column,
30+
/// A cheap expression that can be pushed to leaf nodes in the plan.
31+
/// Examples include `get_field` for struct field access.
32+
/// Pushing these expressions down in the plan can reduce data early
33+
/// at low compute cost.
34+
/// See [`ExpressionPlacement::should_push_to_leaves`] for details.
35+
MoveTowardsLeafNodes,
36+
/// An expensive expression that should stay where it is in the plan.
37+
/// Examples include complex scalar functions or UDFs.
38+
KeepInPlace,
39+
}
40+
41+
impl ExpressionPlacement {
42+
/// Returns true if the expression can be pushed down to leaf nodes
43+
/// in the query plan.
44+
///
45+
/// This returns true for:
46+
/// - [`ExpressionPlacement::Column`]: Simple column references can be pushed down. They do no compute and do not increase or
47+
/// decrease the amount of data being processed.
48+
/// A projection that reduces the number of columns can eliminate unnecessary data early,
49+
/// but this method only considers one expression at a time, not a projection as a whole.
50+
/// - [`ExpressionPlacement::MoveTowardsLeafNodes`]: Cheap expressions can be pushed down to leaves to take advantage of
51+
/// early computation and potential optimizations at the data source level.
52+
/// For example `struct_col['field']` is cheap to compute (just an Arc clone of the nested array for `'field'`)
53+
/// and thus can reduce data early in the plan at very low compute cost.
54+
/// It may even be possible to eliminate the expression entirely if the data source can project only the needed field
55+
/// (as e.g. Parquet can).
56+
pub fn should_push_to_leaves(&self) -> bool {
57+
matches!(
58+
self,
59+
ExpressionPlacement::Column | ExpressionPlacement::MoveTowardsLeafNodes
60+
)
61+
}
62+
}

datafusion/expr/src/expr.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use datafusion_common::tree_node::{
3838
use datafusion_common::{
3939
Column, DFSchema, HashMap, Result, ScalarValue, Spans, TableReference,
4040
};
41+
use datafusion_expr_common::placement::ExpressionPlacement;
4142
use datafusion_functions_window_common::field::WindowUDFFieldArgs;
4243
#[cfg(feature = "sql")]
4344
use sqlparser::ast::{
@@ -1536,6 +1537,23 @@ impl Expr {
15361537
}
15371538
}
15381539

1540+
/// Returns placement information for this expression.
1541+
///
1542+
/// This is used by optimizers to make decisions about expression placement,
1543+
/// such as whether to push expressions down through projections.
1544+
pub fn placement(&self) -> ExpressionPlacement {
1545+
match self {
1546+
Expr::Column(_) => ExpressionPlacement::Column,
1547+
Expr::Literal(_, _) => ExpressionPlacement::Literal,
1548+
Expr::ScalarFunction(func) => {
1549+
let arg_placements: Vec<_> =
1550+
func.args.iter().map(|arg| arg.placement()).collect();
1551+
func.func.placement(&arg_placements)
1552+
}
1553+
_ => ExpressionPlacement::KeepInPlace,
1554+
}
1555+
}
1556+
15391557
/// Return String representation of the variant represented by `self`
15401558
/// Useful for non-rust based bindings
15411559
pub fn variant_name(&self) -> &str {

datafusion/expr/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ pub use datafusion_expr_common::accumulator::Accumulator;
9595
pub use datafusion_expr_common::columnar_value::ColumnarValue;
9696
pub use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator};
9797
pub use datafusion_expr_common::operator::Operator;
98+
pub use datafusion_expr_common::placement::ExpressionPlacement;
9899
pub use datafusion_expr_common::signature::{
99100
ArrayFunctionArgument, ArrayFunctionSignature, Coercion, Signature,
100101
TIMEZONE_WILDCARD, TypeSignature, TypeSignatureClass, Volatility,

datafusion/expr/src/udf.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use datafusion_common::config::ConfigOptions;
3131
use datafusion_common::{ExprSchema, Result, ScalarValue, not_impl_err};
3232
use datafusion_expr_common::dyn_eq::{DynEq, DynHash};
3333
use datafusion_expr_common::interval_arithmetic::Interval;
34+
use datafusion_expr_common::placement::ExpressionPlacement;
3435
use std::any::Any;
3536
use std::cmp::Ordering;
3637
use std::fmt::Debug;
@@ -361,6 +362,13 @@ impl ScalarUDF {
361362
pub fn as_async(&self) -> Option<&AsyncScalarUDF> {
362363
self.inner().as_any().downcast_ref::<AsyncScalarUDF>()
363364
}
365+
366+
/// Returns placement information for this function.
367+
///
368+
/// See [`ScalarUDFImpl::placement`] for more details.
369+
pub fn placement(&self, args: &[ExpressionPlacement]) -> ExpressionPlacement {
370+
self.inner.placement(args)
371+
}
364372
}
365373

366374
impl<F> From<F> for ScalarUDF
@@ -964,6 +972,20 @@ pub trait ScalarUDFImpl: Debug + DynEq + DynHash + Send + Sync {
964972
fn documentation(&self) -> Option<&Documentation> {
965973
None
966974
}
975+
976+
/// Returns placement information for this function.
977+
///
978+
/// This is used by optimizers to make decisions about expression placement,
979+
/// such as whether to push expressions down through projections.
980+
///
981+
/// The default implementation returns [`ExpressionPlacement::KeepInPlace`],
982+
/// meaning the expression should be kept where it is in the plan.
983+
///
984+
/// Override this method to indicate that the function can be pushed down
985+
/// closer to the data source.
986+
fn placement(&self, _args: &[ExpressionPlacement]) -> ExpressionPlacement {
987+
ExpressionPlacement::KeepInPlace
988+
}
967989
}
968990

969991
/// ScalarUDF that adds an alias to the underlying function. It is better to
@@ -1091,6 +1113,10 @@ impl ScalarUDFImpl for AliasedScalarUDFImpl {
10911113
fn documentation(&self) -> Option<&Documentation> {
10921114
self.inner.documentation()
10931115
}
1116+
1117+
fn placement(&self, args: &[ExpressionPlacement]) -> ExpressionPlacement {
1118+
self.inner.placement(args)
1119+
}
10941120
}
10951121

10961122
#[cfg(test)]

datafusion/functions/src/core/getfield.rs

Lines changed: 116 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ use datafusion_common::{
3333
use datafusion_expr::expr::ScalarFunction;
3434
use datafusion_expr::simplify::ExprSimplifyResult;
3535
use datafusion_expr::{
36-
ColumnarValue, Documentation, Expr, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF,
37-
ScalarUDFImpl, Signature, Volatility,
36+
ColumnarValue, Documentation, Expr, ExpressionPlacement, ReturnFieldArgs,
37+
ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
3838
};
3939
use datafusion_macros::user_doc;
4040

@@ -499,6 +499,32 @@ impl ScalarUDFImpl for GetFieldFunc {
499499
fn documentation(&self) -> Option<&Documentation> {
500500
self.doc()
501501
}
502+
503+
fn placement(&self, args: &[ExpressionPlacement]) -> ExpressionPlacement {
504+
// get_field can be pushed to leaves if:
505+
// 1. The base (first arg) is a column or already placeable at leaves
506+
// 2. All field keys (remaining args) are literals
507+
if args.is_empty() {
508+
return ExpressionPlacement::KeepInPlace;
509+
}
510+
511+
let base_placement = args[0];
512+
let base_is_pushable = matches!(
513+
base_placement,
514+
ExpressionPlacement::Column | ExpressionPlacement::MoveTowardsLeafNodes
515+
);
516+
517+
let all_keys_are_literals = args
518+
.iter()
519+
.skip(1)
520+
.all(|p| matches!(p, ExpressionPlacement::Literal));
521+
522+
if base_is_pushable && all_keys_are_literals {
523+
ExpressionPlacement::MoveTowardsLeafNodes
524+
} else {
525+
ExpressionPlacement::KeepInPlace
526+
}
527+
}
502528
}
503529

504530
#[cfg(test)]
@@ -542,4 +568,92 @@ mod tests {
542568

543569
Ok(())
544570
}
571+
572+
#[test]
573+
fn test_placement_literal_key() {
574+
let func = GetFieldFunc::new();
575+
576+
// get_field(col, 'literal') -> leaf-pushable (static field access)
577+
let args = vec![ExpressionPlacement::Column, ExpressionPlacement::Literal];
578+
assert_eq!(
579+
func.placement(&args),
580+
ExpressionPlacement::MoveTowardsLeafNodes
581+
);
582+
583+
// get_field(col, 'a', 'b') -> leaf-pushable (nested static field access)
584+
let args = vec![
585+
ExpressionPlacement::Column,
586+
ExpressionPlacement::Literal,
587+
ExpressionPlacement::Literal,
588+
];
589+
assert_eq!(
590+
func.placement(&args),
591+
ExpressionPlacement::MoveTowardsLeafNodes
592+
);
593+
594+
// get_field(get_field(col, 'a'), 'b') represented as MoveTowardsLeafNodes for base
595+
let args = vec![
596+
ExpressionPlacement::MoveTowardsLeafNodes,
597+
ExpressionPlacement::Literal,
598+
];
599+
assert_eq!(
600+
func.placement(&args),
601+
ExpressionPlacement::MoveTowardsLeafNodes
602+
);
603+
}
604+
605+
#[test]
606+
fn test_placement_column_key() {
607+
let func = GetFieldFunc::new();
608+
609+
// get_field(col, other_col) -> NOT leaf-pushable (dynamic per-row lookup)
610+
let args = vec![ExpressionPlacement::Column, ExpressionPlacement::Column];
611+
assert_eq!(func.placement(&args), ExpressionPlacement::KeepInPlace);
612+
613+
// get_field(col, 'a', other_col) -> NOT leaf-pushable (dynamic nested lookup)
614+
let args = vec![
615+
ExpressionPlacement::Column,
616+
ExpressionPlacement::Literal,
617+
ExpressionPlacement::Column,
618+
];
619+
assert_eq!(func.placement(&args), ExpressionPlacement::KeepInPlace);
620+
}
621+
622+
#[test]
623+
fn test_placement_root() {
624+
let func = GetFieldFunc::new();
625+
626+
// get_field(root_expr, 'literal') -> NOT leaf-pushable
627+
let args = vec![
628+
ExpressionPlacement::KeepInPlace,
629+
ExpressionPlacement::Literal,
630+
];
631+
assert_eq!(func.placement(&args), ExpressionPlacement::KeepInPlace);
632+
633+
// get_field(col, root_expr) -> NOT leaf-pushable
634+
let args = vec![
635+
ExpressionPlacement::Column,
636+
ExpressionPlacement::KeepInPlace,
637+
];
638+
assert_eq!(func.placement(&args), ExpressionPlacement::KeepInPlace);
639+
}
640+
641+
#[test]
642+
fn test_placement_edge_cases() {
643+
let func = GetFieldFunc::new();
644+
645+
// Empty args -> NOT leaf-pushable
646+
assert_eq!(func.placement(&[]), ExpressionPlacement::KeepInPlace);
647+
648+
// Just base, no key -> MoveTowardsLeafNodes (not a valid call but should handle gracefully)
649+
let args = vec![ExpressionPlacement::Column];
650+
assert_eq!(
651+
func.placement(&args),
652+
ExpressionPlacement::MoveTowardsLeafNodes
653+
);
654+
655+
// Literal base with literal key -> NOT leaf-pushable (would be constant-folded)
656+
let args = vec![ExpressionPlacement::Literal, ExpressionPlacement::Literal];
657+
assert_eq!(func.placement(&args), ExpressionPlacement::KeepInPlace);
658+
}
545659
}

datafusion/optimizer/src/common_subexpr_eliminate.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -702,6 +702,11 @@ impl CSEController for ExprCSEController<'_> {
702702
#[expect(deprecated)]
703703
let is_normal_minus_aggregates = matches!(
704704
node,
705+
// TODO: there's an argument for removing `Literal` from here,
706+
// maybe using `Expr::placemement().should_push_to_leaves()` instead
707+
// so that we extract common literals and don't broadcast them to num_batch_rows multiple times.
708+
// However that currently breaks things like `percentile_cont()` which expect literal arguments
709+
// (and would instead be getting `col(__common_expr_n)`).
705710
Expr::Literal(..)
706711
| Expr::Column(..)
707712
| Expr::ScalarVariable(..)

datafusion/optimizer/src/optimize_projections/mod.rs

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -525,15 +525,14 @@ fn merge_consecutive_projections(proj: Projection) -> Result<Transformed<Project
525525
expr.iter()
526526
.for_each(|expr| expr.add_column_ref_counts(&mut column_referral_map));
527527

528-
// If an expression is non-trivial and appears more than once, do not merge
528+
// If an expression is non-trivial (KeepInPlace) and appears more than once, do not merge
529529
// them as consecutive projections will benefit from a compute-once approach.
530530
// For details, see: https://github.com/apache/datafusion/issues/8296
531531
if column_referral_map.into_iter().any(|(col, usage)| {
532532
usage > 1
533-
&& !is_expr_trivial(
534-
&prev_projection.expr
535-
[prev_projection.schema.index_of_column(col).unwrap()],
536-
)
533+
&& !prev_projection.expr[prev_projection.schema.index_of_column(col).unwrap()]
534+
.placement()
535+
.should_push_to_leaves()
537536
}) {
538537
// no change
539538
return Projection::try_new_with_schema(expr, input, schema).map(Transformed::no);
@@ -586,11 +585,6 @@ fn merge_consecutive_projections(proj: Projection) -> Result<Transformed<Project
586585
}
587586
}
588587

589-
// Check whether `expr` is trivial; i.e. it doesn't imply any computation.
590-
fn is_expr_trivial(expr: &Expr) -> bool {
591-
matches!(expr, Expr::Column(_) | Expr::Literal(_, _))
592-
}
593-
594588
/// Rewrites a projection expression using the projection before it (i.e. its input)
595589
/// This is a subroutine to the `merge_consecutive_projections` function.
596590
///

datafusion/physical-expr-common/src/physical_expr.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use datafusion_common::{
3535
};
3636
use datafusion_expr_common::columnar_value::ColumnarValue;
3737
use datafusion_expr_common::interval_arithmetic::Interval;
38+
use datafusion_expr_common::placement::ExpressionPlacement;
3839
use datafusion_expr_common::sort_properties::ExprProperties;
3940
use datafusion_expr_common::statistics::Distribution;
4041

@@ -430,6 +431,16 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
430431
fn is_volatile_node(&self) -> bool {
431432
false
432433
}
434+
435+
/// Returns placement information for this expression.
436+
///
437+
/// This is used by optimizers to make decisions about expression placement,
438+
/// such as whether to push expressions down through projections.
439+
///
440+
/// The default implementation returns [`ExpressionPlacement::KeepInPlace`].
441+
fn placement(&self) -> ExpressionPlacement {
442+
ExpressionPlacement::KeepInPlace
443+
}
433444
}
434445

435446
#[deprecated(

datafusion/physical-expr/src/expressions/column.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use arrow::{
3030
use datafusion_common::tree_node::{Transformed, TreeNode};
3131
use datafusion_common::{Result, internal_err, plan_err};
3232
use datafusion_expr::ColumnarValue;
33+
use datafusion_expr_common::placement::ExpressionPlacement;
3334

3435
/// Represents the column at a given index in a RecordBatch
3536
///
@@ -146,6 +147,10 @@ impl PhysicalExpr for Column {
146147
fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147148
write!(f, "{}", self.name)
148149
}
150+
151+
fn placement(&self) -> ExpressionPlacement {
152+
ExpressionPlacement::Column
153+
}
149154
}
150155

151156
impl Column {

0 commit comments

Comments
 (0)