Skip to content

Commit e4bcab0

Browse files
committed
Actually preserve predicate execution order in PushDownFilter
1 parent 26c6121 commit e4bcab0

1 file changed

Lines changed: 51 additions & 11 deletions

File tree

datafusion/optimizer/src/push_down_filter.rs

Lines changed: 51 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -810,25 +810,22 @@ impl OptimizerRule for PushDownFilter {
810810

811811
match Arc::unwrap_or_clone(filter.input) {
812812
LogicalPlan::Filter(child_filter) => {
813-
let parents_predicates = split_conjunction_owned(filter.predicate);
814-
815-
// remove duplicated filters
816-
let child_predicates = split_conjunction_owned(child_filter.predicate);
817-
let new_predicates = parents_predicates
818-
.into_iter()
819-
.chain(child_predicates)
820-
// use IndexSet to remove dupes while preserving predicate order
821-
.collect::<IndexSet<_>>()
813+
// child filters first to preserve execution order
814+
let new_predicates = split_conjunction_owned(child_filter.predicate)
822815
.into_iter()
823-
.collect::<Vec<_>>();
816+
.chain(split_conjunction_owned(filter.predicate))
817+
// use IndexSet to remove duplicates while preserving predicate order
818+
.collect::<IndexSet<_>>();
824819

825820
let Some(new_predicate) = conjunction(new_predicates) else {
826821
return plan_err!("at least one expression exists");
827822
};
823+
828824
let new_filter = LogicalPlan::Filter(Filter::try_new(
829825
new_predicate,
830826
child_filter.input,
831827
)?);
828+
832829
self.rewrite(new_filter, config)
833830
}
834831
LogicalPlan::Repartition(repartition) => {
@@ -2474,7 +2471,7 @@ mod tests {
24742471
plan,
24752472
@r"
24762473
Projection: test.a
2477-
Filter: test.a >= Int64(1) AND test.a <= Int64(1)
2474+
Filter: test.a <= Int64(1) AND test.a >= Int64(1)
24782475
Limit: skip=0, fetch=1
24792476
TableScan: test
24802477
"
@@ -3253,6 +3250,28 @@ mod tests {
32533250
)
32543251
}
32553252

3253+
#[test]
3254+
fn multi_combined_two_filters() -> Result<()> {
3255+
let plan = table_scan_with_pushdown_provider_builder(
3256+
TableProviderFilterPushDown::Inexact,
3257+
vec![col("a").eq(lit(10i64)), col("b").gt(lit(11i64))],
3258+
Some(vec![0]),
3259+
)?
3260+
.filter(col("a").eq(lit(10i64)))?
3261+
.filter(col("b").gt(lit(11i64)))?
3262+
.project(vec![col("a"), col("b")])?
3263+
.build()?;
3264+
3265+
assert_optimized_plan_equal!(
3266+
plan,
3267+
@r"
3268+
Projection: a, b
3269+
Filter: a = Int64(10) AND b > Int64(11)
3270+
TableScan: test projection=[a], partial_filters=[a = Int64(10), b > Int64(11)]
3271+
"
3272+
)
3273+
}
3274+
32563275
#[test]
32573276
fn multi_combined_filter_exact() -> Result<()> {
32583277
let plan = table_scan_with_pushdown_provider_builder(
@@ -3273,6 +3292,27 @@ mod tests {
32733292
)
32743293
}
32753294

3295+
#[test]
3296+
fn multi_combined_two_filters_exact() -> Result<()> {
3297+
let plan = table_scan_with_pushdown_provider_builder(
3298+
TableProviderFilterPushDown::Exact,
3299+
vec![],
3300+
Some(vec![0]),
3301+
)?
3302+
.filter(col("a").eq(lit(10i64)))?
3303+
.filter(col("b").gt(lit(11i64)))?
3304+
.project(vec![col("a"), col("b")])?
3305+
.build()?;
3306+
3307+
assert_optimized_plan_equal!(
3308+
plan,
3309+
@r"
3310+
Projection: a, b
3311+
TableScan: test projection=[a], full_filters=[a = Int64(10), b > Int64(11)]
3312+
"
3313+
)
3314+
}
3315+
32763316
#[test]
32773317
fn test_filter_with_alias() -> Result<()> {
32783318
// in table scan the true col name is 'test.a',

0 commit comments

Comments
 (0)