1818//! Planner for [`LogicalPlan`] to [`ExecutionPlan`]
1919
2020use std:: borrow:: Cow ;
21- use std:: collections:: HashMap ;
21+ use std:: collections:: { HashMap , HashSet } ;
2222use std:: sync:: Arc ;
2323
2424use crate :: datasource:: file_format:: file_type_to_format;
@@ -84,7 +84,7 @@ use datafusion_expr::expr::{
8484} ;
8585use datafusion_expr:: expr_rewriter:: unnormalize_cols;
8686use datafusion_expr:: logical_plan:: builder:: wrap_projection_for_join_if_necessary;
87- use datafusion_expr:: utils:: split_conjunction;
87+ use datafusion_expr:: utils:: { expr_to_columns , split_conjunction} ;
8888use datafusion_expr:: {
8989 Analyze , BinaryExpr , DescribeTable , DmlStatement , Explain , ExplainFormat , Extension ,
9090 FetchType , Filter , JoinType , Operator , RecursiveQuery , SkipType , StringifiedPlan ,
@@ -757,7 +757,7 @@ impl DefaultPhysicalPlanner {
757757 if let Some ( provider) =
758758 target. as_any ( ) . downcast_ref :: < DefaultTableSource > ( )
759759 {
760- let filters = extract_dml_filters ( input) ?;
760+ let filters = extract_dml_filters ( input, table_name ) ?;
761761 provider
762762 . table_provider
763763 . delete_from ( session_state, filters)
@@ -783,7 +783,7 @@ impl DefaultPhysicalPlanner {
783783 {
784784 // For UPDATE, the assignments are encoded in the projection of input
785785 // We pass the filters and let the provider handle the projection
786- let filters = extract_dml_filters ( input) ?;
786+ let filters = extract_dml_filters ( input, table_name ) ?;
787787 // Extract assignments from the projection in input plan
788788 let assignments = extract_update_assignments ( input) ?;
789789 provider
@@ -2067,24 +2067,149 @@ fn get_physical_expr_pair(
20672067}
20682068
20692069/// Extract filter predicates from a DML input plan (DELETE/UPDATE).
2070- /// Walks the logical plan tree and collects Filter predicates,
2071- /// splitting AND conjunctions into individual expressions.
2072- /// Column qualifiers are stripped so expressions can be evaluated against
2073- /// the TableProvider's schema.
20742070///
2075- fn extract_dml_filters ( input : & Arc < LogicalPlan > ) -> Result < Vec < Expr > > {
2071+ /// Walks the logical plan tree and collects Filter predicates and any filters
2072+ /// pushed down into TableScan nodes, splitting AND conjunctions into individual expressions.
2073+ ///
2074+ /// For UPDATE...FROM queries involving multiple tables, this function only extracts predicates
2075+ /// that reference the target table. Filters from source table scans are excluded to prevent
2076+ /// incorrect filter semantics.
2077+ ///
2078+ /// Column qualifiers are stripped so expressions can be evaluated against the TableProvider's
2079+ /// schema. Deduplication is performed because filters may appear in both Filter nodes and
2080+ /// TableScan.filters when the optimizer performs partial (Inexact) filter pushdown.
2081+ ///
2082+ /// # Parameters
2083+ /// - `input`: The logical plan tree to extract filters from (typically a DELETE or UPDATE plan)
2084+ /// - `target`: The target table reference to scope filter extraction (prevents multi-table filter leakage)
2085+ ///
2086+ /// # Returns
2087+ /// A vector of unqualified filter expressions that can be passed to the TableProvider for execution.
2088+ /// Returns an empty vector if no applicable filters are found.
2089+ ///
2090+ fn extract_dml_filters (
2091+ input : & Arc < LogicalPlan > ,
2092+ target : & TableReference ,
2093+ ) -> Result < Vec < Expr > > {
20762094 let mut filters = Vec :: new ( ) ;
2095+ let mut allowed_refs = vec ! [ target. clone( ) ] ;
2096+
2097+ // First pass: collect any alias references to the target table
2098+ input. apply ( |node| {
2099+ if let LogicalPlan :: SubqueryAlias ( alias) = node
2100+ // Check if this alias points to the target table
2101+ && let LogicalPlan :: TableScan ( scan) = alias. input . as_ref ( )
2102+ && scan. table_name . resolved_eq ( target)
2103+ {
2104+ allowed_refs. push ( TableReference :: bare ( alias. alias . to_string ( ) ) ) ;
2105+ }
2106+ Ok ( TreeNodeRecursion :: Continue )
2107+ } ) ?;
20772108
20782109 input. apply ( |node| {
2079- if let LogicalPlan :: Filter ( filter) = node {
2080- // Split AND predicates into individual expressions
2081- filters. extend ( split_conjunction ( & filter. predicate ) . into_iter ( ) . cloned ( ) ) ;
2110+ match node {
2111+ LogicalPlan :: Filter ( filter) => {
2112+ // Split AND predicates into individual expressions
2113+ for predicate in split_conjunction ( & filter. predicate ) {
2114+ if predicate_is_on_target_multi ( predicate, & allowed_refs) ? {
2115+ filters. push ( predicate. clone ( ) ) ;
2116+ }
2117+ }
2118+ }
2119+ LogicalPlan :: TableScan ( TableScan {
2120+ table_name,
2121+ filters : scan_filters,
2122+ ..
2123+ } ) => {
2124+ // Only extract filters from the target table scan.
2125+ // This prevents incorrect filter extraction in UPDATE...FROM scenarios
2126+ // where multiple table scans may have filters.
2127+ if table_name. resolved_eq ( target) {
2128+ for filter in scan_filters {
2129+ filters. extend ( split_conjunction ( filter) . into_iter ( ) . cloned ( ) ) ;
2130+ }
2131+ }
2132+ }
2133+ // Plans without filter information
2134+ LogicalPlan :: EmptyRelation ( _)
2135+ | LogicalPlan :: Values ( _)
2136+ | LogicalPlan :: DescribeTable ( _)
2137+ | LogicalPlan :: Explain ( _)
2138+ | LogicalPlan :: Analyze ( _)
2139+ | LogicalPlan :: Distinct ( _)
2140+ | LogicalPlan :: Extension ( _)
2141+ | LogicalPlan :: Statement ( _)
2142+ | LogicalPlan :: Dml ( _)
2143+ | LogicalPlan :: Ddl ( _)
2144+ | LogicalPlan :: Copy ( _)
2145+ | LogicalPlan :: Unnest ( _)
2146+ | LogicalPlan :: RecursiveQuery ( _) => {
2147+ // No filters to extract from leaf/meta plans
2148+ }
2149+ // Plans with inputs (may contain filters in children)
2150+ LogicalPlan :: Projection ( _)
2151+ | LogicalPlan :: SubqueryAlias ( _)
2152+ | LogicalPlan :: Limit ( _)
2153+ | LogicalPlan :: Sort ( _)
2154+ | LogicalPlan :: Union ( _)
2155+ | LogicalPlan :: Join ( _)
2156+ | LogicalPlan :: Repartition ( _)
2157+ | LogicalPlan :: Aggregate ( _)
2158+ | LogicalPlan :: Window ( _)
2159+ | LogicalPlan :: Subquery ( _) => {
2160+ // Filter information may appear in child nodes; continue traversal
2161+ // to extract filters from Filter/TableScan nodes deeper in the plan
2162+ }
20822163 }
20832164 Ok ( TreeNodeRecursion :: Continue )
20842165 } ) ?;
20852166
2086- // Strip table qualifiers from column references
2087- filters. into_iter ( ) . map ( strip_column_qualifiers) . collect ( )
2167+ // Strip qualifiers and deduplicate. This ensures:
2168+ // 1. Only target-table predicates are retained from Filter nodes
2169+ // 2. Qualifiers stripped for TableProvider compatibility
2170+ // 3. Duplicates removed (from Filter nodes + TableScan.filters)
2171+ //
2172+ // Deduplication is necessary because filters may appear in both Filter nodes
2173+ // and TableScan.filters when the optimizer performs partial (Inexact) pushdown.
2174+ let mut seen_filters = HashSet :: new ( ) ;
2175+ filters
2176+ . into_iter ( )
2177+ . try_fold ( Vec :: new ( ) , |mut deduped, filter| {
2178+ let unqualified = strip_column_qualifiers ( filter) . map_err ( |e| {
2179+ e. context ( format ! (
2180+ "Failed to strip column qualifiers for DML filter on table '{target}'"
2181+ ) )
2182+ } ) ?;
2183+ if seen_filters. insert ( unqualified. clone ( ) ) {
2184+ deduped. push ( unqualified) ;
2185+ }
2186+ Ok ( deduped)
2187+ } )
2188+ }
2189+
2190+ /// Determine whether a predicate references only columns from the target table
2191+ /// or its aliases.
2192+ ///
2193+ /// Columns may be qualified with the target table name or any of its aliases.
2194+ /// Unqualified columns are also accepted as they implicitly belong to the target table.
2195+ fn predicate_is_on_target_multi (
2196+ expr : & Expr ,
2197+ allowed_refs : & [ TableReference ] ,
2198+ ) -> Result < bool > {
2199+ let mut columns = HashSet :: new ( ) ;
2200+ expr_to_columns ( expr, & mut columns) ?;
2201+
2202+ // Short-circuit on first mismatch: returns false if any column references a table not in allowed_refs.
2203+ // Columns are accepted if:
2204+ // 1. They are unqualified (no relation specified), OR
2205+ // 2. Their relation matches one of the allowed table references using resolved equality
2206+ Ok ( !columns. iter ( ) . any ( |column| {
2207+ column. relation . as_ref ( ) . is_some_and ( |relation| {
2208+ !allowed_refs
2209+ . iter ( )
2210+ . any ( |allowed| relation. resolved_eq ( allowed) )
2211+ } )
2212+ } ) )
20882213}
20892214
20902215/// Strip table qualifiers from column references in an expression.
0 commit comments