Skip to content

Commit 6f2e428

Browse files
committed
Refactor: expose predicate constant inference from physical-expr
1 parent 4e2e4e8 commit 6f2e428

3 files changed

Lines changed: 92 additions & 56 deletions

File tree

datafusion/physical-expr/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,9 @@ pub use datafusion_physical_expr_common::sort_expr::{
7272
pub use planner::{create_physical_expr, create_physical_exprs};
7373
pub use scalar_function::ScalarFunctionExpr;
7474
pub use simplifier::PhysicalExprSimplifier;
75-
pub use utils::{conjunction, conjunction_opt, split_conjunction};
75+
pub use utils::{
76+
collect_predicate_constants, conjunction, conjunction_opt, split_conjunction,
77+
};
7678

7779
// For backwards compatibility
7880
pub mod tree_node {

datafusion/physical-expr/src/utils/mod.rs

Lines changed: 83 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,11 @@ pub use guarantee::{Guarantee, LiteralGuarantee};
2121
use std::borrow::Borrow;
2222
use std::sync::Arc;
2323

24-
use crate::PhysicalExpr;
25-
use crate::PhysicalSortExpr;
26-
use crate::expressions::{BinaryExpr, Column};
24+
use crate::expressions::{BinaryExpr, Column, Literal};
2725
use crate::tree_node::ExprContext;
26+
use crate::{
27+
AcrossPartitions, ConstExpr, EquivalenceProperties, PhysicalExpr, PhysicalSortExpr,
28+
};
2829

2930
use arrow::datatypes::Schema;
3031
use datafusion_common::tree_node::{
@@ -45,6 +46,59 @@ pub fn split_conjunction(
4546
split_impl(Operator::And, predicate, vec![])
4647
}
4748

49+
/// Collects predicate-derived constants from equality conjunctions.
50+
///
51+
/// For each equality predicate of the form `lhs = rhs`, if either side is
52+
/// already known constant according to `input_eqs`, or is a literal, then the
53+
/// other side is also constant and will be returned as a [`ConstExpr`].
54+
///
55+
/// Literals are treated as uniform constants across partitions, so
56+
/// `col = literal` produces a constant for `col` with the literal value.
57+
pub fn collect_predicate_constants(
58+
input_eqs: &EquivalenceProperties,
59+
predicate: &Arc<dyn PhysicalExpr>,
60+
) -> Vec<ConstExpr> {
61+
/// Returns the `AcrossPartitions` value for `expr` if it is constant:
62+
/// either already known constant in `input_eqs`, or a `Literal`
63+
/// (which is inherently constant across all partitions).
64+
fn expr_constant_or_literal(
65+
expr: &Arc<dyn PhysicalExpr>,
66+
input_eqs: &EquivalenceProperties,
67+
) -> Option<AcrossPartitions> {
68+
input_eqs.is_expr_constant(expr).or_else(|| {
69+
expr.as_any()
70+
.downcast_ref::<Literal>()
71+
.map(|l| AcrossPartitions::Uniform(Some(l.value().clone())))
72+
})
73+
}
74+
75+
let mut constants = Vec::new();
76+
for conjunction in split_conjunction(predicate) {
77+
if let Some(binary) = conjunction.as_any().downcast_ref::<BinaryExpr>()
78+
&& binary.op() == &Operator::Eq
79+
{
80+
// Check if either side is constant — either already known
81+
// constant from the input equivalence properties, or a literal
82+
// value (which is inherently constant across all partitions).
83+
let left_const = expr_constant_or_literal(binary.left(), input_eqs);
84+
let right_const = expr_constant_or_literal(binary.right(), input_eqs);
85+
86+
if let Some(left_across) = left_const {
87+
// LEFT is constant, so RIGHT must also be constant.
88+
// Use RIGHT's known across value if available, otherwise
89+
// propagate LEFT's (e.g. Uniform from a literal).
90+
let across = right_const.unwrap_or(left_across);
91+
constants.push(ConstExpr::new(Arc::clone(binary.right()), across));
92+
} else if let Some(right_across) = right_const {
93+
// RIGHT is constant, so LEFT must also be constant.
94+
constants.push(ConstExpr::new(Arc::clone(binary.left()), right_across));
95+
}
96+
}
97+
}
98+
99+
constants
100+
}
101+
48102
/// Create a conjunction of the given predicates.
49103
/// If the input is empty, return a literal true.
50104
/// If the input contains a single predicate, return the predicate.
@@ -562,4 +616,30 @@ pub(crate) mod tests {
562616
assert_eq!(collect_columns(&expr3), expected);
563617
Ok(())
564618
}
619+
620+
#[test]
621+
fn test_collect_predicate_constants_propagates_uniform_literal_value() -> Result<()> {
622+
let schema = Arc::new(Schema::new(vec![Field::new(
623+
"ticker",
624+
DataType::Utf8,
625+
false,
626+
)]));
627+
let predicate = binary(
628+
col("ticker", schema.as_ref())?,
629+
Operator::Eq,
630+
lit(ScalarValue::Utf8(Some("NGJ26".to_string()))),
631+
schema.as_ref(),
632+
)?;
633+
let eq_properties = EquivalenceProperties::new(schema);
634+
635+
let constants = collect_predicate_constants(&eq_properties, &predicate);
636+
637+
assert_eq!(constants.len(), 1);
638+
assert_eq!(
639+
constants[0].across_partitions,
640+
AcrossPartitions::Uniform(Some(ScalarValue::Utf8(Some("NGJ26".to_string()))))
641+
);
642+
643+
Ok(())
644+
}
565645
}

datafusion/physical-plan/src/filter.rs

Lines changed: 6 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,8 @@ use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal, lit};
6363
use datafusion_physical_expr::intervals::utils::check_support;
6464
use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns};
6565
use datafusion_physical_expr::{
66-
AcrossPartitions, AnalysisContext, ConstExpr, EquivalenceProperties, ExprBoundaries,
67-
PhysicalExpr, analyze, conjunction, split_conjunction,
66+
AcrossPartitions, AnalysisContext, ConstExpr, ExprBoundaries, PhysicalExpr, analyze,
67+
collect_predicate_constants, conjunction, split_conjunction,
6868
};
6969

7070
use datafusion_physical_expr_common::physical_expr::fmt_sql;
@@ -350,55 +350,6 @@ impl FilterExec {
350350
})
351351
}
352352

353-
/// Returns the `AcrossPartitions` value for `expr` if it is constant:
354-
/// either already known constant in `input_eqs`, or a `Literal`
355-
/// (which is inherently constant across all partitions).
356-
fn expr_constant_or_literal(
357-
expr: &Arc<dyn PhysicalExpr>,
358-
input_eqs: &EquivalenceProperties,
359-
) -> Option<AcrossPartitions> {
360-
input_eqs.is_expr_constant(expr).or_else(|| {
361-
expr.as_any()
362-
.downcast_ref::<Literal>()
363-
.map(|l| AcrossPartitions::Uniform(Some(l.value().clone())))
364-
})
365-
}
366-
367-
fn extend_constants(
368-
input: &Arc<dyn ExecutionPlan>,
369-
predicate: &Arc<dyn PhysicalExpr>,
370-
) -> Vec<ConstExpr> {
371-
let mut res_constants = Vec::new();
372-
let input_eqs = input.equivalence_properties();
373-
374-
let conjunctions = split_conjunction(predicate);
375-
for conjunction in conjunctions {
376-
if let Some(binary) = conjunction.as_any().downcast_ref::<BinaryExpr>()
377-
&& binary.op() == &Operator::Eq
378-
{
379-
// Check if either side is constant — either already known
380-
// constant from the input equivalence properties, or a literal
381-
// value (which is inherently constant across all partitions).
382-
let left_const = Self::expr_constant_or_literal(binary.left(), input_eqs);
383-
let right_const =
384-
Self::expr_constant_or_literal(binary.right(), input_eqs);
385-
386-
if let Some(left_across) = left_const {
387-
// LEFT is constant, so RIGHT must also be constant.
388-
// Use RIGHT's known across value if available, otherwise
389-
// propagate LEFT's (e.g. Uniform from a literal).
390-
let across = right_const.unwrap_or(left_across);
391-
res_constants
392-
.push(ConstExpr::new(Arc::clone(binary.right()), across));
393-
} else if let Some(right_across) = right_const {
394-
// RIGHT is constant, so LEFT must also be constant.
395-
res_constants
396-
.push(ConstExpr::new(Arc::clone(binary.left()), right_across));
397-
}
398-
}
399-
}
400-
res_constants
401-
}
402353
/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
403354
fn compute_properties(
404355
input: &Arc<dyn ExecutionPlan>,
@@ -436,7 +387,10 @@ impl FilterExec {
436387
eq_properties.add_constants(constants)?;
437388
// This is for logical constant (for example: a = '1', then a could be marked as a constant)
438389
// to do: how to deal with multiple situation to represent = (for example c1 between 0 and 0)
439-
eq_properties.add_constants(Self::extend_constants(input, predicate))?;
390+
eq_properties.add_constants(collect_predicate_constants(
391+
input.equivalence_properties(),
392+
predicate,
393+
))?;
440394

441395
let mut output_partitioning = input.output_partitioning().clone();
442396
// If contains projection, update the PlanProperties.

0 commit comments

Comments
 (0)