Commit b3976d6

Fix DELETE/UPDATE filter extraction when predicates are pushed down into TableScan (#19884)
## Which issue does this PR close?

* Closes #19840.

## Rationale for this change

Custom `TableProvider` implementations (notably those that support filter pushdown) expect the DML engine to pass `WHERE` predicates into `TableProvider::delete_from` / `TableProvider::update` so they can apply targeted changes. However, when the optimizer pushes predicates down into `TableScan.filters`, the logical plan may no longer contain a top-level `Filter` node. The existing DML planning code only collected predicates from `Filter` nodes, so `delete_from` / `update` could receive an empty filter list and unintentionally apply the operation to all rows.

This PR fixes filter extraction to also consider pushdown predicates stored in `TableScan.filters`, while avoiding incorrect behavior in multi-table contexts.

## What changes are included in this PR?

* Extend DML predicate extraction to collect filters from both:
  * `LogicalPlan::Filter` nodes (splitting `AND` conjunctions), and
  * `LogicalPlan::TableScan.filters` for the **target** table.
* Add target scoping for filter extraction:
  * Determine the set of allowed table references for the target table (including simple aliases via `SubqueryAlias`).
  * For `Filter` nodes, retain only predicates whose column references belong to the target table (or are unqualified).
  * For `TableScan.filters`, only read filters from the target table scan to prevent accidentally pulling predicates from source tables.
* Strip column qualifiers before passing expressions to the provider so filters evaluate against the provider schema.
* Deduplicate extracted predicates (needed when partial/inexact pushdown causes the same predicate to appear in both a `Filter` node and `TableScan.filters`).
* Add comprehensive tests for DELETE/UPDATE covering:
  * exact pushdown (filters live in `TableScan.filters`),
  * compound predicates,
  * mixed locations due to `Inexact` pushdown and per-filter decisions,
  * qualifier stripping behavior.
* Temporarily reject `UPDATE ... FROM` during planning with a clear `not_impl` error and align sqllogictest expectations. This avoids incorrect semantics caused by qualifier stripping / predicate selection in multi-table update scenarios.

## Are these changes tested?

Yes.

* Added new unit tests in `datafusion/core/tests/custom_sources_cases/dml_planning.rs` that validate:
  * pushed-down filters are extracted from `TableScan` and passed through to providers,
  * compound predicates are preserved (not over-deduped),
  * mixed pushdown/residual cases still deliver the full predicate set,
  * qualifiers are stripped from passed filters.
* Updated sqllogictest coverage for `UPDATE ... FROM` to assert the feature is currently not implemented.

## Are there any user-facing changes?

* **Bug fix:** `DELETE ... WHERE ...` and `UPDATE ... WHERE ...` against custom `TableProvider`s that use filter pushdown will now correctly pass predicates to the provider, preventing accidental full-table operations.
* **Behavior change (planning error message / support level):** `UPDATE ... FROM` now consistently returns `This feature is not implemented: UPDATE ... FROM is not supported` (until the underlying issue is fixed).

## LLM-generated code disclosure

This PR includes LLM-generated code and comments. All LLM-generated content has been manually reviewed and tested.
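The target-scoping rule described above (keep a predicate only if every qualified column it references belongs to the target table or one of its aliases; unqualified columns are accepted) can be sketched in isolation. This is a toy model with simplified stand-ins for DataFusion's `Column` and `TableReference` types, not the real API:

```rust
/// Simplified stand-in for a column reference: an optional table qualifier plus a name.
#[derive(Debug)]
struct Column {
    relation: Option<String>, // e.g. Some("target"), or None if unqualified
    name: String,
}

/// A predicate is "on the target" when no column it references is qualified
/// with a table outside the allowed set (target table plus its aliases).
fn predicate_is_on_target(columns: &[Column], allowed_refs: &[&str]) -> bool {
    !columns.iter().any(|column| {
        column
            .relation
            .as_deref()
            .is_some_and(|relation| !allowed_refs.contains(&relation))
    })
}

fn main() {
    let allowed = ["target", "t"]; // target table plus one SubqueryAlias

    // `target.id = 1` -> kept: qualifier matches the target table
    let p1 = [Column { relation: Some("target".into()), name: "id".into() }];
    assert!(predicate_is_on_target(&p1, &allowed));

    // `other.id = 1` -> rejected: references a source table, must not leak to the provider
    let p2 = [Column { relation: Some("other".into()), name: "id".into() }];
    assert!(!predicate_is_on_target(&p2, &allowed));

    // `id = 1` -> kept: unqualified columns implicitly belong to the target
    let p3 = [Column { relation: None, name: "id".into() }];
    assert!(predicate_is_on_target(&p3, &allowed));
}
```

The real implementation additionally uses `TableReference::resolved_eq` so that `public.target` and `target` compare equal; the string comparison here is only a simplification.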
1 parent dd988f6 commit b3976d6

4 files changed

Lines changed: 610 additions & 43 deletions

File tree

datafusion/core/src/physical_planner.rs

Lines changed: 139 additions & 14 deletions
```diff
@@ -18,7 +18,7 @@
 //! Planner for [`LogicalPlan`] to [`ExecutionPlan`]

 use std::borrow::Cow;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::sync::Arc;

 use crate::datasource::file_format::file_type_to_format;
@@ -84,7 +84,7 @@ use datafusion_expr::expr::{
 };
 use datafusion_expr::expr_rewriter::unnormalize_cols;
 use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
-use datafusion_expr::utils::split_conjunction;
+use datafusion_expr::utils::{expr_to_columns, split_conjunction};
 use datafusion_expr::{
     Analyze, BinaryExpr, DescribeTable, DmlStatement, Explain, ExplainFormat, Extension,
     FetchType, Filter, JoinType, Operator, RecursiveQuery, SkipType, StringifiedPlan,
@@ -757,7 +757,7 @@ impl DefaultPhysicalPlanner {
                 if let Some(provider) =
                     target.as_any().downcast_ref::<DefaultTableSource>()
                 {
-                    let filters = extract_dml_filters(input)?;
+                    let filters = extract_dml_filters(input, table_name)?;
                     provider
                         .table_provider
                         .delete_from(session_state, filters)
@@ -783,7 +783,7 @@ impl DefaultPhysicalPlanner {
                 {
                     // For UPDATE, the assignments are encoded in the projection of input
                     // We pass the filters and let the provider handle the projection
-                    let filters = extract_dml_filters(input)?;
+                    let filters = extract_dml_filters(input, table_name)?;
                     // Extract assignments from the projection in input plan
                     let assignments = extract_update_assignments(input)?;
                     provider
@@ -2067,24 +2067,149 @@ fn get_physical_expr_pair(
 }

 /// Extract filter predicates from a DML input plan (DELETE/UPDATE).
-/// Walks the logical plan tree and collects Filter predicates,
-/// splitting AND conjunctions into individual expressions.
-/// Column qualifiers are stripped so expressions can be evaluated against
-/// the TableProvider's schema.
 ///
-fn extract_dml_filters(input: &Arc<LogicalPlan>) -> Result<Vec<Expr>> {
+/// Walks the logical plan tree and collects Filter predicates and any filters
+/// pushed down into TableScan nodes, splitting AND conjunctions into individual expressions.
+///
+/// For UPDATE...FROM queries involving multiple tables, this function only extracts predicates
+/// that reference the target table. Filters from source table scans are excluded to prevent
+/// incorrect filter semantics.
+///
+/// Column qualifiers are stripped so expressions can be evaluated against the TableProvider's
+/// schema. Deduplication is performed because filters may appear in both Filter nodes and
+/// TableScan.filters when the optimizer performs partial (Inexact) filter pushdown.
+///
+/// # Parameters
+/// - `input`: The logical plan tree to extract filters from (typically a DELETE or UPDATE plan)
+/// - `target`: The target table reference to scope filter extraction (prevents multi-table filter leakage)
+///
+/// # Returns
+/// A vector of unqualified filter expressions that can be passed to the TableProvider for execution.
+/// Returns an empty vector if no applicable filters are found.
+fn extract_dml_filters(
+    input: &Arc<LogicalPlan>,
+    target: &TableReference,
+) -> Result<Vec<Expr>> {
     let mut filters = Vec::new();
+    let mut allowed_refs = vec![target.clone()];
+
+    // First pass: collect any alias references to the target table
+    input.apply(|node| {
+        if let LogicalPlan::SubqueryAlias(alias) = node
+            // Check if this alias points to the target table
+            && let LogicalPlan::TableScan(scan) = alias.input.as_ref()
+            && scan.table_name.resolved_eq(target)
+        {
+            allowed_refs.push(TableReference::bare(alias.alias.to_string()));
+        }
+        Ok(TreeNodeRecursion::Continue)
+    })?;

     input.apply(|node| {
-        if let LogicalPlan::Filter(filter) = node {
-            // Split AND predicates into individual expressions
-            filters.extend(split_conjunction(&filter.predicate).into_iter().cloned());
+        match node {
+            LogicalPlan::Filter(filter) => {
+                // Split AND predicates into individual expressions
+                for predicate in split_conjunction(&filter.predicate) {
+                    if predicate_is_on_target_multi(predicate, &allowed_refs)? {
+                        filters.push(predicate.clone());
+                    }
+                }
+            }
+            LogicalPlan::TableScan(TableScan {
+                table_name,
+                filters: scan_filters,
+                ..
+            }) => {
+                // Only extract filters from the target table scan.
+                // This prevents incorrect filter extraction in UPDATE...FROM scenarios
+                // where multiple table scans may have filters.
+                if table_name.resolved_eq(target) {
+                    for filter in scan_filters {
+                        filters.extend(split_conjunction(filter).into_iter().cloned());
+                    }
+                }
+            }
+            // Plans without filter information
+            LogicalPlan::EmptyRelation(_)
+            | LogicalPlan::Values(_)
+            | LogicalPlan::DescribeTable(_)
+            | LogicalPlan::Explain(_)
+            | LogicalPlan::Analyze(_)
+            | LogicalPlan::Distinct(_)
+            | LogicalPlan::Extension(_)
+            | LogicalPlan::Statement(_)
+            | LogicalPlan::Dml(_)
+            | LogicalPlan::Ddl(_)
+            | LogicalPlan::Copy(_)
+            | LogicalPlan::Unnest(_)
+            | LogicalPlan::RecursiveQuery(_) => {
+                // No filters to extract from leaf/meta plans
+            }
+            // Plans with inputs (may contain filters in children)
+            LogicalPlan::Projection(_)
+            | LogicalPlan::SubqueryAlias(_)
+            | LogicalPlan::Limit(_)
+            | LogicalPlan::Sort(_)
+            | LogicalPlan::Union(_)
+            | LogicalPlan::Join(_)
+            | LogicalPlan::Repartition(_)
+            | LogicalPlan::Aggregate(_)
+            | LogicalPlan::Window(_)
+            | LogicalPlan::Subquery(_) => {
+                // Filter information may appear in child nodes; continue traversal
+                // to extract filters from Filter/TableScan nodes deeper in the plan
+            }
         }
         Ok(TreeNodeRecursion::Continue)
     })?;

-    // Strip table qualifiers from column references
-    filters.into_iter().map(strip_column_qualifiers).collect()
+    // Strip qualifiers and deduplicate. This ensures:
+    // 1. Only target-table predicates are retained from Filter nodes
+    // 2. Qualifiers stripped for TableProvider compatibility
+    // 3. Duplicates removed (from Filter nodes + TableScan.filters)
+    //
+    // Deduplication is necessary because filters may appear in both Filter nodes
+    // and TableScan.filters when the optimizer performs partial (Inexact) pushdown.
+    let mut seen_filters = HashSet::new();
+    filters
+        .into_iter()
+        .try_fold(Vec::new(), |mut deduped, filter| {
+            let unqualified = strip_column_qualifiers(filter).map_err(|e| {
+                e.context(format!(
+                    "Failed to strip column qualifiers for DML filter on table '{target}'"
+                ))
+            })?;
+            if seen_filters.insert(unqualified.clone()) {
+                deduped.push(unqualified);
+            }
+            Ok(deduped)
+        })
+}
+
+/// Determine whether a predicate references only columns from the target table
+/// or its aliases.
+///
+/// Columns may be qualified with the target table name or any of its aliases.
+/// Unqualified columns are also accepted as they implicitly belong to the target table.
+fn predicate_is_on_target_multi(
+    expr: &Expr,
+    allowed_refs: &[TableReference],
+) -> Result<bool> {
+    let mut columns = HashSet::new();
+    expr_to_columns(expr, &mut columns)?;
+
+    // Short-circuit on first mismatch: returns false if any column references a table not in allowed_refs.
+    // Columns are accepted if:
+    // 1. They are unqualified (no relation specified), OR
+    // 2. Their relation matches one of the allowed table references using resolved equality
+    Ok(!columns.iter().any(|column| {
+        column.relation.as_ref().is_some_and(|relation| {
+            !allowed_refs
+                .iter()
+                .any(|allowed| relation.resolved_eq(allowed))
+        })
+    }))
 }

 /// Strip table qualifiers from column references in an expression.
```