Skip to content

Commit 5a28beb

Browse files
committed
fix: FilterPushdown incorrectly remaps filters through ProjectionExec with duplicate column names
1 parent 067ba4b commit 5a28beb

2 files changed

Lines changed: 162 additions & 20 deletions

File tree

datafusion/core/tests/physical_optimizer/filter_pushdown.rs

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3374,3 +3374,142 @@ fn test_filter_pushdown_through_sort_with_projection() {
33743374
"
33753375
);
33763376
}
3377+
3378+
/// Reproduces a bug where FilterPushdown through ProjectionExec with duplicate
3379+
/// column names remaps filter predicates to the wrong source column.
3380+
///
3381+
/// The bug is in two functions:
3382+
/// 1. `collect_reverse_alias` (projection.rs) uses `column_with_name()` which
3383+
/// returns the FIRST match — when two projection outputs share a name, the
3384+
/// second overwrites the first in the HashMap.
3385+
/// 2. `FilterRemapper::try_remap` (filter_pushdown.rs) uses `index_of()` which
3386+
/// also returns the FIRST match, silently rewriting column indices.
3387+
///
3388+
/// We construct the physical plan directly (bypassing the logical optimizer)
3389+
/// to create the exact structure that triggers the bug:
3390+
///
3391+
/// ```text
3392+
/// FilterExec: id@0 IS NULL ← checks output col 0 (right side's id)
3393+
/// ProjectionExec: [right_id@2 as id, ← output col 0 (from RIGHT)
3394+
/// name@1,
3395+
/// left_id@0 as id] ← output col 2 (from LEFT)
3396+
/// HashJoinExec: Left
3397+
/// left: [left_id, name] (columns 0-1)
3398+
/// right: [right_id] (column 2)
3399+
/// ```
3400+
///
3401+
/// Bug 1 overwrites Column("id", 0) → left_id@0 instead of right_id@2.
3402+
/// The filter `id@0 IS NULL` gets remapped to `left_id@0 IS NULL` (wrong side).
3403+
#[tokio::test]
3404+
async fn test_filter_pushdown_projection_duplicate_column_names() {
3405+
use datafusion_common::JoinType;
3406+
use datafusion_physical_expr::expressions::is_null;
3407+
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
3408+
3409+
// Left table: orders (5 rows, all non-NULL left_id)
3410+
let left_batches = vec![
3411+
record_batch!(
3412+
("left_id", Int32, [1, 2, 3, 4, 5]),
3413+
("name", Utf8, ["Alice", "Bob", "Charlie", "Diana", "Eve"])
3414+
)
3415+
.unwrap(),
3416+
];
3417+
let left_schema = Arc::new(Schema::new(vec![
3418+
Field::new("left_id", DataType::Int32, false),
3419+
Field::new("name", DataType::Utf8, false),
3420+
]));
3421+
let left_scan = TestScanBuilder::new(Arc::clone(&left_schema))
3422+
.with_batches(left_batches)
3423+
.build();
3424+
3425+
// Right table: returns (2 rows matching orders 1 and 3)
3426+
let right_batches = vec![record_batch!(("right_id", Int32, [1, 3])).unwrap()];
3427+
let right_schema = Arc::new(Schema::new(vec![Field::new(
3428+
"right_id",
3429+
DataType::Int32,
3430+
false,
3431+
)]));
3432+
let right_scan = TestScanBuilder::new(Arc::clone(&right_schema))
3433+
.with_batches(right_batches)
3434+
.build();
3435+
3436+
// HashJoinExec: LEFT JOIN on left_id = right_id
3437+
// Join output schema: [left_id(0), name(1), right_id(2)]
3438+
let join = Arc::new(
3439+
HashJoinExec::try_new(
3440+
left_scan,
3441+
right_scan,
3442+
vec![(
3443+
col("left_id", &left_schema).unwrap(),
3444+
col("right_id", &right_schema).unwrap(),
3445+
)],
3446+
None,
3447+
&JoinType::Left,
3448+
None,
3449+
PartitionMode::CollectLeft,
3450+
datafusion_common::NullEquality::NullEqualsNothing,
3451+
false,
3452+
)
3453+
.unwrap(),
3454+
);
3455+
let join_schema = join.schema();
3456+
3457+
// ProjectionExec: creates duplicate "id" columns
3458+
// output col 0: right_id@2 AS id (from RIGHT side, NULL for unmatched)
3459+
// output col 1: name@1
3460+
// output col 2: left_id@0 AS id (from LEFT side, never NULL)
3461+
let projection = Arc::new(
3462+
ProjectionExec::try_new(
3463+
vec![
3464+
(col("right_id", &join_schema).unwrap(), "id".to_string()),
3465+
(col("name", &join_schema).unwrap(), "name".to_string()),
3466+
(col("left_id", &join_schema).unwrap(), "id".to_string()),
3467+
],
3468+
join,
3469+
)
3470+
.unwrap(),
3471+
);
3472+
// FilterExec: id@0 IS NULL
3473+
// This should check the RIGHT side's id (output col 0 = right_id).
3474+
// The anti-join pattern: find left rows with no match on the right.
3475+
let filter_expr = is_null(Arc::new(Column::new("id", 0))).unwrap();
3476+
let plan = Arc::new(FilterExec::try_new(filter_expr, projection).unwrap())
3477+
as Arc<dyn ExecutionPlan>;
3478+
3479+
// Show the plan BEFORE optimization
3480+
println!("=== Plan BEFORE FilterPushdown ===");
3481+
println!("{}", format_plan_for_test(&plan));
3482+
3483+
// Apply the physical FilterPushdown optimizer
3484+
let config = ConfigOptions::default();
3485+
let optimized = FilterPushdown::new()
3486+
.optimize(Arc::clone(&plan), &config)
3487+
.unwrap();
3488+
3489+
println!("\n=== Plan AFTER FilterPushdown ===");
3490+
println!("{}", format_plan_for_test(&optimized));
3491+
3492+
// Execute the optimized plan. The original and optimized plans share
3493+
// DataSourceExec nodes via Arc, so we can only execute one of them
3494+
// (the data source yields its batches once).
3495+
let session_ctx = SessionContext::new_with_config(SessionConfig::new());
3496+
session_ctx.register_object_store(
3497+
ObjectStoreUrl::parse("test://").unwrap().as_ref(),
3498+
Arc::new(InMemory::new()),
3499+
);
3500+
let task_ctx = session_ctx.state().task_ctx();
3501+
3502+
let optimized_batches = collect(optimized, Arc::clone(&task_ctx)).await.unwrap();
3503+
let optimized_count: usize = optimized_batches.iter().map(|b| b.num_rows()).sum();
3504+
println!("\n=== Optimized results: {optimized_count} rows ===");
3505+
println!("{}", pretty_format_batches(&optimized_batches).unwrap());
3506+
3507+
// The filter id@0 IS NULL checks the right side's id (NULL for unmatched
3508+
// rows in a Left join). Orders 2, 4, 5 have no match → 3 rows expected.
3509+
//
3510+
// Before the fix, collect_reverse_alias used column_with_name() which
3511+
// overwrote duplicate entries, and FilterRemapper::try_remap used
3512+
// index_of() which returned the first match — both causing the filter
3513+
// to be remapped to the wrong source column (0 rows returned).
3514+
assert_eq!(optimized_count, 3, "optimized plan should return 3 rows");
3515+
}

datafusion/physical-plan/src/projection.rs

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use crate::column_rewriter::PhysicalColumnRewriter;
3030
use crate::execution_plan::CardinalityEffect;
3131
use crate::filter_pushdown::{
3232
ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
33-
FilterPushdownPropagation, FilterRemapper, PushedDownPredicate,
33+
FilterPushdownPropagation, PushedDownPredicate,
3434
};
3535
use crate::joins::utils::{ColumnIndex, JoinFilter, JoinOn, JoinOnRef};
3636
use crate::{DisplayFormatType, ExecutionPlan, PhysicalExpr, check_if_same_properties};
@@ -45,11 +45,12 @@ use datafusion_common::config::ConfigOptions;
4545
use datafusion_common::tree_node::{
4646
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
4747
};
48-
use datafusion_common::{DataFusionError, JoinSide, Result, internal_err};
48+
use datafusion_common::{JoinSide, Result, internal_err};
4949
use datafusion_execution::TaskContext;
5050
use datafusion_expr::ExpressionPlacement;
5151
use datafusion_physical_expr::equivalence::ProjectionMapping;
5252
use datafusion_physical_expr::projection::Projector;
53+
use datafusion_physical_expr::utils::collect_columns;
5354
use datafusion_physical_expr_common::physical_expr::{PhysicalExprRef, fmt_sql};
5455
use datafusion_physical_expr_common::sort_expr::{
5556
LexOrdering, LexRequirement, PhysicalSortExpr,
@@ -205,18 +206,14 @@ impl ProjectionExec {
205206
&self,
206207
) -> Result<datafusion_common::HashMap<Column, Arc<dyn PhysicalExpr>>> {
207208
let mut alias_map = datafusion_common::HashMap::new();
208-
for projection in self.projection_expr().iter() {
209-
let (aliased_index, _output_field) = self
210-
.projector
211-
.output_schema()
212-
.column_with_name(&projection.alias)
213-
.ok_or_else(|| {
214-
DataFusionError::Internal(format!(
215-
"Expr {} with alias {} not found in output schema",
216-
projection.expr, projection.alias
217-
))
218-
})?;
219-
let aliased_col = Column::new(&projection.alias, aliased_index);
209+
// Use the enumerate index directly rather than `column_with_name`,
210+
// because the output schema columns are ordered identically to the
211+
// projection expressions. `column_with_name` returns the *first*
212+
// column with a given name, which silently produces duplicate HashMap
213+
// keys (and overwrites earlier entries) when the projection contains
214+
// same-named columns from different join sides.
215+
for (idx, projection) in self.projection_expr().iter().enumerate() {
216+
let aliased_col = Column::new(&projection.alias, idx);
220217
alias_map.insert(aliased_col, Arc::clone(&projection.expr));
221218
}
222219
Ok(alias_map)
@@ -399,16 +396,22 @@ impl ExecutionPlan for ProjectionExec {
399396
) -> Result<FilterDescription> {
400397
// expand alias column to original expr in parent filters
401398
let invert_alias_map = self.collect_reverse_alias()?;
402-
let output_schema = self.schema();
403-
let remapper = FilterRemapper::new(output_schema);
404399
let mut child_parent_filters = Vec::with_capacity(parent_filters.len());
405400

406401
for filter in parent_filters {
407-
// Check that column exists in child, then reassign column indices to match child schema
408-
if let Some(reassigned) = remapper.try_remap(&filter)? {
409-
// rewrite filter expression using invert alias map
402+
// Validate that every column referenced by the filter exists in
403+
// the reverse-alias map (keyed by exact (name, index) pair).
404+
// We must NOT use FilterRemapper::try_remap here because its
405+
// index_of lookup returns the *first* column with a given name,
406+
// which silently re-targets the filter to the wrong column when
407+
// the projection output contains duplicate column names (e.g.
408+
// after a join where both sides have an `id` column).
409+
let columns = collect_columns(&filter);
410+
let all_in_alias_map =
411+
columns.iter().all(|col| invert_alias_map.contains_key(col));
412+
if all_in_alias_map {
410413
let mut rewriter = PhysicalColumnRewriter::new(&invert_alias_map);
411-
let rewritten = reassigned.rewrite(&mut rewriter)?.data;
414+
let rewritten = filter.rewrite(&mut rewriter)?.data;
412415
child_parent_filters.push(PushedDownPredicate::supported(rewritten));
413416
} else {
414417
child_parent_filters.push(PushedDownPredicate::unsupported(filter));

0 commit comments

Comments
 (0)