Skip to content

Commit 61f819b

Browse files
committed
fix: FilterPushdown incorrectly remaps filters through ProjectionExec with duplicate column names
collect_reverse_alias used column_with_name(), which returns the first match for a given name. When a projection has duplicate column names (e.g. two "id" columns, one from each side of a join), both entries get the same HashMap key and the second silently overwrites the first. gather_filters_for_pushdown used FilterRemapper::try_remap, which calls index_of() and likewise returns the first match, so a filter on id@2 (the second occurrence) was silently rewritten to id@0. Fix 1: use the enumerate index in collect_reverse_alias instead of column_with_name, since projection expressions are positionally aligned with the output schema. Fix 2: replace FilterRemapper::try_remap with direct validation against the alias map's exact (name, index) keys via collect_columns.
1 parent 06b4791 commit 61f819b

2 files changed

Lines changed: 161 additions & 20 deletions

File tree

datafusion/core/tests/physical_optimizer/filter_pushdown.rs

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4805,3 +4805,142 @@ async fn test_hashjoin_dynamic_filter_pushdown_left_semi_join() {
48054805
"
48064806
);
48074807
}
4808+
4809+
/// Reproduces a bug where FilterPushdown through ProjectionExec with duplicate
4810+
/// column names remaps filter predicates to the wrong source column.
4811+
///
4812+
/// The bug is in two functions:
4813+
/// 1. `collect_reverse_alias` (projection.rs) uses `column_with_name()` which
4814+
/// returns the FIRST match — when two projection outputs share a name, the
4815+
/// second overwrites the first in the HashMap.
4816+
/// 2. `FilterRemapper::try_remap` (filter_pushdown.rs) uses `index_of()` which
4817+
/// also returns the FIRST match, silently rewriting column indices.
4818+
///
4819+
/// We construct the physical plan directly (bypassing the logical optimizer)
4820+
/// to create the exact structure that triggers the bug:
4821+
///
4822+
/// ```text
4823+
/// FilterExec: id@0 IS NULL ← checks output col 0 (right side's id)
4824+
/// ProjectionExec: [right_id@2 as id, ← output col 0 (from RIGHT)
4825+
/// name@1,
4826+
/// left_id@0 as id] ← output col 2 (from LEFT)
4827+
/// HashJoinExec: Left
4828+
/// left: [left_id, name] (columns 0-1)
4829+
/// right: [right_id] (column 2)
4830+
/// ```
4831+
///
4832+
/// Bug 1 overwrites Column("id", 0) → left_id@0 instead of right_id@2.
4833+
/// The filter `id@0 IS NULL` gets remapped to `left_id@0 IS NULL` (wrong side).
4834+
#[tokio::test]
4835+
async fn test_filter_pushdown_projection_duplicate_column_names() {
4836+
use datafusion_common::JoinType;
4837+
use datafusion_physical_expr::expressions::is_null;
4838+
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
4839+
4840+
// Left table: orders (5 rows, all non-NULL left_id)
4841+
let left_batches = vec![
4842+
record_batch!(
4843+
("left_id", Int32, [1, 2, 3, 4, 5]),
4844+
("name", Utf8, ["Alice", "Bob", "Charlie", "Diana", "Eve"])
4845+
)
4846+
.unwrap(),
4847+
];
4848+
let left_schema = Arc::new(Schema::new(vec![
4849+
Field::new("left_id", DataType::Int32, false),
4850+
Field::new("name", DataType::Utf8, false),
4851+
]));
4852+
let left_scan = TestScanBuilder::new(Arc::clone(&left_schema))
4853+
.with_batches(left_batches)
4854+
.build();
4855+
4856+
// Right table: returns (2 rows matching orders 1 and 3)
4857+
let right_batches = vec![record_batch!(("right_id", Int32, [1, 3])).unwrap()];
4858+
let right_schema = Arc::new(Schema::new(vec![Field::new(
4859+
"right_id",
4860+
DataType::Int32,
4861+
false,
4862+
)]));
4863+
let right_scan = TestScanBuilder::new(Arc::clone(&right_schema))
4864+
.with_batches(right_batches)
4865+
.build();
4866+
4867+
// HashJoinExec: LEFT JOIN on left_id = right_id
4868+
// Join output schema: [left_id(0), name(1), right_id(2)]
4869+
let join = Arc::new(
4870+
HashJoinExec::try_new(
4871+
left_scan,
4872+
right_scan,
4873+
vec![(
4874+
col("left_id", &left_schema).unwrap(),
4875+
col("right_id", &right_schema).unwrap(),
4876+
)],
4877+
None,
4878+
&JoinType::Left,
4879+
None,
4880+
PartitionMode::CollectLeft,
4881+
datafusion_common::NullEquality::NullEqualsNothing,
4882+
false,
4883+
)
4884+
.unwrap(),
4885+
);
4886+
let join_schema = join.schema();
4887+
4888+
// ProjectionExec: creates duplicate "id" columns
4889+
// output col 0: right_id@2 AS id (from RIGHT side, NULL for unmatched)
4890+
// output col 1: name@1
4891+
// output col 2: left_id@0 AS id (from LEFT side, never NULL)
4892+
let projection = Arc::new(
4893+
ProjectionExec::try_new(
4894+
vec![
4895+
(col("right_id", &join_schema).unwrap(), "id".to_string()),
4896+
(col("name", &join_schema).unwrap(), "name".to_string()),
4897+
(col("left_id", &join_schema).unwrap(), "id".to_string()),
4898+
],
4899+
join,
4900+
)
4901+
.unwrap(),
4902+
);
4903+
// FilterExec: id@0 IS NULL
4904+
// This should check the RIGHT side's id (output col 0 = right_id).
4905+
// The anti-join pattern: find left rows with no match on the right.
4906+
let filter_expr = is_null(Arc::new(Column::new("id", 0))).unwrap();
4907+
let plan = Arc::new(FilterExec::try_new(filter_expr, projection).unwrap())
4908+
as Arc<dyn ExecutionPlan>;
4909+
4910+
// Show the plan BEFORE optimization
4911+
println!("=== Plan BEFORE FilterPushdown ===");
4912+
println!("{}", format_plan_for_test(&plan));
4913+
4914+
// Apply the physical FilterPushdown optimizer
4915+
let config = ConfigOptions::default();
4916+
let optimized = FilterPushdown::new()
4917+
.optimize(Arc::clone(&plan), &config)
4918+
.unwrap();
4919+
4920+
println!("\n=== Plan AFTER FilterPushdown ===");
4921+
println!("{}", format_plan_for_test(&optimized));
4922+
4923+
// Execute the optimized plan. The original and optimized plans share
4924+
// DataSourceExec nodes via Arc, so we can only execute one of them
4925+
// (the data source yields its batches once).
4926+
let session_ctx = SessionContext::new_with_config(SessionConfig::new());
4927+
session_ctx.register_object_store(
4928+
ObjectStoreUrl::parse("test://").unwrap().as_ref(),
4929+
Arc::new(InMemory::new()),
4930+
);
4931+
let task_ctx = session_ctx.state().task_ctx();
4932+
4933+
let optimized_batches = collect(optimized, Arc::clone(&task_ctx)).await.unwrap();
4934+
let optimized_count: usize = optimized_batches.iter().map(|b| b.num_rows()).sum();
4935+
println!("\n=== Optimized results: {optimized_count} rows ===");
4936+
println!("{}", pretty_format_batches(&optimized_batches).unwrap());
4937+
4938+
// The filter id@0 IS NULL checks the right side's id (NULL for unmatched
4939+
// rows in a Left join). Orders 2, 4, 5 have no match → 3 rows expected.
4940+
//
4941+
// Before the fix, collect_reverse_alias used column_with_name() which
4942+
// overwrote duplicate entries, and FilterRemapper::try_remap used
4943+
// index_of() which returned the first match — both causing the filter
4944+
// to be remapped to the wrong source column (0 rows returned).
4945+
assert_eq!(optimized_count, 3, "optimized plan should return 3 rows");
4946+
}

datafusion/physical-plan/src/projection.rs

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use crate::column_rewriter::PhysicalColumnRewriter;
3030
use crate::execution_plan::CardinalityEffect;
3131
use crate::filter_pushdown::{
3232
ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
33-
FilterPushdownPropagation, FilterRemapper, PushedDownPredicate,
33+
FilterPushdownPropagation, PushedDownPredicate,
3434
};
3535
use crate::joins::utils::{ColumnIndex, JoinFilter, JoinOn, JoinOnRef};
3636
use crate::{DisplayFormatType, ExecutionPlan, PhysicalExpr, check_if_same_properties};
@@ -46,7 +46,7 @@ use datafusion_common::config::ConfigOptions;
4646
use datafusion_common::tree_node::{
4747
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
4848
};
49-
use datafusion_common::{DataFusionError, JoinSide, Result, internal_err};
49+
use datafusion_common::{JoinSide, Result, internal_err};
5050
use datafusion_execution::TaskContext;
5151
use datafusion_expr::ExpressionPlacement;
5252
use datafusion_physical_expr::equivalence::ProjectionMapping;
@@ -207,18 +207,14 @@ impl ProjectionExec {
207207
&self,
208208
) -> Result<datafusion_common::HashMap<Column, Arc<dyn PhysicalExpr>>> {
209209
let mut alias_map = datafusion_common::HashMap::new();
210-
for projection in self.projection_expr().iter() {
211-
let (aliased_index, _output_field) = self
212-
.projector
213-
.output_schema()
214-
.column_with_name(&projection.alias)
215-
.ok_or_else(|| {
216-
DataFusionError::Internal(format!(
217-
"Expr {} with alias {} not found in output schema",
218-
projection.expr, projection.alias
219-
))
220-
})?;
221-
let aliased_col = Column::new(&projection.alias, aliased_index);
210+
// Use the enumerate index directly rather than `column_with_name`,
211+
// because the output schema columns are ordered identically to the
212+
// projection expressions. `column_with_name` returns the *first*
213+
// column with a given name, which silently produces duplicate HashMap
214+
// keys (and overwrites earlier entries) when the projection contains
215+
// same-named columns from different join sides.
216+
for (idx, projection) in self.projection_expr().iter().enumerate() {
217+
let aliased_col = Column::new(&projection.alias, idx);
222218
alias_map.insert(aliased_col, Arc::clone(&projection.expr));
223219
}
224220
Ok(alias_map)
@@ -405,16 +401,22 @@ impl ExecutionPlan for ProjectionExec {
405401
) -> Result<FilterDescription> {
406402
// expand alias column to original expr in parent filters
407403
let invert_alias_map = self.collect_reverse_alias()?;
408-
let output_schema = self.schema();
409-
let remapper = FilterRemapper::new(output_schema);
410404
let mut child_parent_filters = Vec::with_capacity(parent_filters.len());
411405

412406
for filter in parent_filters {
413-
// Check that column exists in child, then reassign column indices to match child schema
414-
if let Some(reassigned) = remapper.try_remap(&filter)? {
415-
// rewrite filter expression using invert alias map
407+
// Validate that every column referenced by the filter exists in
408+
// the reverse-alias map (keyed by exact (name, index) pair).
409+
// We must NOT use FilterRemapper::try_remap here because its
410+
// index_of lookup returns the *first* column with a given name,
411+
// which silently re-targets the filter to the wrong column when
412+
// the projection output contains duplicate column names (e.g.
413+
// after a join where both sides have an `id` column).
414+
let columns = collect_columns(&filter);
415+
let all_in_alias_map =
416+
columns.iter().all(|col| invert_alias_map.contains_key(col));
417+
if all_in_alias_map {
416418
let mut rewriter = PhysicalColumnRewriter::new(&invert_alias_map);
417-
let rewritten = reassigned.rewrite(&mut rewriter)?.data;
419+
let rewritten = filter.rewrite(&mut rewriter)?.data;
418420
child_parent_filters.push(PushedDownPredicate::supported(rewritten));
419421
} else {
420422
child_parent_filters.push(PushedDownPredicate::unsupported(filter));

0 commit comments

Comments (0)