Skip to content

Commit b80bf2c

Browse files
haohuaijin and adriangb
authored
fix: filter pushdown when merge filter (#20110)
## Which issue does this PR close? - Closes #20109 ## Rationale for this change See issue #20109. ## What changes are included in this PR? 1. Remap parent filter expressions: When a FilterExec has a projection, remap unsupported parent filter expressions from output schema coordinates to input schema coordinates using `reassign_expr_columns()` before combining them with the current filter's predicates. 2. Preserve projection: When creating the merged FilterExec, preserve the original projection instead of discarding it. ## Are these changes tested? Yes, test cases were added. ## Are there any user-facing changes? --------- Co-authored-by: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
1 parent 5981d62 commit b80bf2c

2 files changed

Lines changed: 110 additions & 7 deletions

File tree

datafusion/core/tests/physical_optimizer/filter_pushdown.rs

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3720,3 +3720,90 @@ async fn test_hashjoin_dynamic_filter_pushdown_is_used() {
37203720
);
37213721
}
37223722
}
3723+
3724+
/// Regression test for https://github.com/apache/datafusion/issues/20109
3725+
#[tokio::test]
3726+
async fn test_filter_with_projection_pushdown() {
3727+
use arrow::array::{Int64Array, RecordBatch, StringArray};
3728+
use datafusion_physical_plan::collect;
3729+
use datafusion_physical_plan::filter::FilterExecBuilder;
3730+
3731+
// Create schema: [time, event, size]
3732+
let schema = Arc::new(Schema::new(vec![
3733+
Field::new("time", DataType::Int64, false),
3734+
Field::new("event", DataType::Utf8, false),
3735+
Field::new("size", DataType::Int64, false),
3736+
]));
3737+
3738+
// Create sample data
3739+
let timestamps = vec![100i64, 200, 300, 400, 500];
3740+
let events = vec!["Ingestion", "Ingestion", "Query", "Ingestion", "Query"];
3741+
let sizes = vec![10i64, 20, 30, 40, 50];
3742+
3743+
let batch = RecordBatch::try_new(
3744+
schema.clone(),
3745+
vec![
3746+
Arc::new(Int64Array::from(timestamps)),
3747+
Arc::new(StringArray::from(events)),
3748+
Arc::new(Int64Array::from(sizes)),
3749+
],
3750+
)
3751+
.unwrap();
3752+
3753+
// Create data source
3754+
let memory_exec = datafusion_datasource::memory::MemorySourceConfig::try_new_exec(
3755+
&[vec![batch]],
3756+
schema.clone(),
3757+
None,
3758+
)
3759+
.unwrap();
3760+
3761+
// First FilterExec: time < 350 with projection=[event@1, size@2]
3762+
let time_col = col("time", &memory_exec.schema()).unwrap();
3763+
let time_filter = Arc::new(BinaryExpr::new(
3764+
time_col,
3765+
Operator::Lt,
3766+
Arc::new(Literal::new(ScalarValue::Int64(Some(350)))),
3767+
));
3768+
let filter1 = Arc::new(
3769+
FilterExecBuilder::new(time_filter, memory_exec)
3770+
.apply_projection(Some(vec![1, 2]))
3771+
.unwrap()
3772+
.build()
3773+
.unwrap(),
3774+
);
3775+
3776+
// Second FilterExec: event = 'Ingestion' with projection=[size@1]
3777+
let event_col = col("event", &filter1.schema()).unwrap();
3778+
let event_filter = Arc::new(BinaryExpr::new(
3779+
event_col,
3780+
Operator::Eq,
3781+
Arc::new(Literal::new(ScalarValue::Utf8(Some(
3782+
"Ingestion".to_string(),
3783+
)))),
3784+
));
3785+
let filter2 = Arc::new(
3786+
FilterExecBuilder::new(event_filter, filter1)
3787+
.apply_projection(Some(vec![1]))
3788+
.unwrap()
3789+
.build()
3790+
.unwrap(),
3791+
);
3792+
3793+
// Apply filter pushdown optimization
3794+
let config = ConfigOptions::default();
3795+
let optimized_plan = FilterPushdown::new()
3796+
.optimize(Arc::clone(&filter2) as Arc<dyn ExecutionPlan>, &config)
3797+
.unwrap();
3798+
3799+
// Execute the optimized plan - this should not error
3800+
let ctx = SessionContext::new();
3801+
let result = collect(optimized_plan, ctx.task_ctx()).await.unwrap();
3802+
3803+
// Verify results: should return rows where time < 350 AND event = 'Ingestion'
3804+
// That's rows with time=100,200 (both have event='Ingestion'), so sizes 10,20
3805+
let expected = [
3806+
"+------+", "| size |", "+------+", "| 10 |", "| 20 |", "+------+",
3807+
];
3808+
assert_batches_eq!(expected, &result);
3809+
}

datafusion/physical-plan/src/filter.rs

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ use datafusion_expr::Operator;
5757
use datafusion_physical_expr::equivalence::ProjectionMapping;
5858
use datafusion_physical_expr::expressions::{BinaryExpr, Column, lit};
5959
use datafusion_physical_expr::intervals::utils::check_support;
60-
use datafusion_physical_expr::utils::collect_columns;
60+
use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns};
6161
use datafusion_physical_expr::{
6262
AcrossPartitions, AnalysisContext, ConstExpr, ExprBoundaries, PhysicalExpr, analyze,
6363
conjunction, split_conjunction,
@@ -623,10 +623,26 @@ impl ExecutionPlan for FilterExec {
623623
return Ok(FilterPushdownPropagation::if_all(child_pushdown_result));
624624
}
625625
// We absorb any parent filters that were not handled by our children
626-
let unsupported_parent_filters =
627-
child_pushdown_result.parent_filters.iter().filter_map(|f| {
628-
matches!(f.all(), PushedDown::No).then_some(Arc::clone(&f.filter))
629-
});
626+
let mut unsupported_parent_filters: Vec<Arc<dyn PhysicalExpr>> =
627+
child_pushdown_result
628+
.parent_filters
629+
.iter()
630+
.filter_map(|f| {
631+
matches!(f.all(), PushedDown::No).then_some(Arc::clone(&f.filter))
632+
})
633+
.collect();
634+
635+
// If this FilterExec has a projection, the unsupported parent filters
636+
// are in the output schema (after projection) coordinates. We need to
637+
// remap them to the input schema coordinates before combining with self filters.
638+
if self.projection.is_some() {
639+
let input_schema = self.input().schema();
640+
unsupported_parent_filters = unsupported_parent_filters
641+
.into_iter()
642+
.map(|expr| reassign_expr_columns(expr, &input_schema))
643+
.collect::<Result<Vec<_>>>()?;
644+
}
645+
630646
let unsupported_self_filters = child_pushdown_result
631647
.self_filters
632648
.first()
@@ -674,7 +690,7 @@ impl ExecutionPlan for FilterExec {
674690
// The new predicate is the same as our current predicate
675691
None
676692
} else {
677-
// Create a new FilterExec with the new predicate
693+
// Create a new FilterExec with the new predicate, preserving the projection
678694
let new = FilterExec {
679695
predicate: Arc::clone(&new_predicate),
680696
input: Arc::clone(&filter_input),
@@ -686,7 +702,7 @@ impl ExecutionPlan for FilterExec {
686702
self.default_selectivity,
687703
self.projection.as_ref(),
688704
)?,
689-
projection: None,
705+
projection: self.projection.clone(),
690706
batch_size: self.batch_size,
691707
fetch: self.fetch,
692708
};

0 commit comments

Comments
 (0)