@@ -21,10 +21,11 @@ pub use guarantee::{Guarantee, LiteralGuarantee};
2121use std:: borrow:: Borrow ;
2222use std:: sync:: Arc ;
2323
24- use crate :: PhysicalExpr ;
25- use crate :: PhysicalSortExpr ;
26- use crate :: expressions:: { BinaryExpr , Column } ;
24+ use crate :: expressions:: { BinaryExpr , Column , Literal } ;
2725use crate :: tree_node:: ExprContext ;
26+ use crate :: {
27+ AcrossPartitions , ConstExpr , EquivalenceProperties , PhysicalExpr , PhysicalSortExpr ,
28+ } ;
2829
2930use arrow:: datatypes:: Schema ;
3031use datafusion_common:: tree_node:: {
@@ -45,6 +46,59 @@ pub fn split_conjunction(
4546 split_impl ( Operator :: And , predicate, vec ! [ ] )
4647}
4748
49+ /// Collects predicate-derived constants from equality conjunctions.
50+ ///
51+ /// For each equality predicate of the form `lhs = rhs`, if either side is
52+ /// already known constant according to `input_eqs`, or is a literal, then the
53+ /// other side is also constant and will be returned as a [`ConstExpr`].
54+ ///
55+ /// Literals are treated as uniform constants across partitions, so
56+ /// `col = literal` produces a constant for `col` with the literal value.
57+ pub fn collect_predicate_constants (
58+ input_eqs : & EquivalenceProperties ,
59+ predicate : & Arc < dyn PhysicalExpr > ,
60+ ) -> Vec < ConstExpr > {
61+ /// Returns the `AcrossPartitions` value for `expr` if it is constant:
62+ /// either already known constant in `input_eqs`, or a `Literal`
63+ /// (which is inherently constant across all partitions).
64+ fn expr_constant_or_literal (
65+ expr : & Arc < dyn PhysicalExpr > ,
66+ input_eqs : & EquivalenceProperties ,
67+ ) -> Option < AcrossPartitions > {
68+ input_eqs. is_expr_constant ( expr) . or_else ( || {
69+ expr. as_any ( )
70+ . downcast_ref :: < Literal > ( )
71+ . map ( |l| AcrossPartitions :: Uniform ( Some ( l. value ( ) . clone ( ) ) ) )
72+ } )
73+ }
74+
75+ let mut constants = Vec :: new ( ) ;
76+ for conjunction in split_conjunction ( predicate) {
77+ if let Some ( binary) = conjunction. as_any ( ) . downcast_ref :: < BinaryExpr > ( )
78+ && binary. op ( ) == & Operator :: Eq
79+ {
80+ // Check if either side is constant — either already known
81+ // constant from the input equivalence properties, or a literal
82+ // value (which is inherently constant across all partitions).
83+ let left_const = expr_constant_or_literal ( binary. left ( ) , input_eqs) ;
84+ let right_const = expr_constant_or_literal ( binary. right ( ) , input_eqs) ;
85+
86+ if let Some ( left_across) = left_const {
87+ // LEFT is constant, so RIGHT must also be constant.
88+ // Use RIGHT's known across value if available, otherwise
89+ // propagate LEFT's (e.g. Uniform from a literal).
90+ let across = right_const. unwrap_or ( left_across) ;
91+ constants. push ( ConstExpr :: new ( Arc :: clone ( binary. right ( ) ) , across) ) ;
92+ } else if let Some ( right_across) = right_const {
93+ // RIGHT is constant, so LEFT must also be constant.
94+ constants. push ( ConstExpr :: new ( Arc :: clone ( binary. left ( ) ) , right_across) ) ;
95+ }
96+ }
97+ }
98+
99+ constants
100+ }
101+
48102/// Create a conjunction of the given predicates.
49103/// If the input is empty, return a literal true.
50104/// If the input contains a single predicate, return the predicate.
@@ -562,4 +616,30 @@ pub(crate) mod tests {
562616 assert_eq ! ( collect_columns( & expr3) , expected) ;
563617 Ok ( ( ) )
564618 }
619+
620+ #[ test]
621+ fn test_collect_predicate_constants_propagates_uniform_literal_value ( ) -> Result < ( ) > {
622+ let schema = Arc :: new ( Schema :: new ( vec ! [ Field :: new(
623+ "ticker" ,
624+ DataType :: Utf8 ,
625+ false ,
626+ ) ] ) ) ;
627+ let predicate = binary (
628+ col ( "ticker" , schema. as_ref ( ) ) ?,
629+ Operator :: Eq ,
630+ lit ( ScalarValue :: Utf8 ( Some ( "NGJ26" . to_string ( ) ) ) ) ,
631+ schema. as_ref ( ) ,
632+ ) ?;
633+ let eq_properties = EquivalenceProperties :: new ( schema) ;
634+
635+ let constants = collect_predicate_constants ( & eq_properties, & predicate) ;
636+
637+ assert_eq ! ( constants. len( ) , 1 ) ;
638+ assert_eq ! (
639+ constants[ 0 ] . across_partitions,
640+ AcrossPartitions :: Uniform ( Some ( ScalarValue :: Utf8 ( Some ( "NGJ26" . to_string( ) ) ) ) )
641+ ) ;
642+
643+ Ok ( ( ) )
644+ }
565645}
0 commit comments