Skip to content

Commit c7b55cc

Browse files
adriangbclaude
andcommitted
test: update expected output for Optional(DynamicFilter) wrapper
The OptionalFilterPhysicalExpr wrapper introduced for optional dynamic filters changes two things observable in tests: 1. Display format: `DynamicFilter [...]` becomes `Optional(DynamicFilter [...])` when wrapped. Update insta snapshots in filter_pushdown.rs and plan strings in joins.slt / push_down_filter_parquet.slt. 2. Adaptive tracker placement: new filters flow through the byte-ratio heuristic and optional filters can be skipped mid-stream. This zeroes pushdown metrics for cases where the tracker chose PostScan or dropped the filter. Row group / page pruning still runs via the pruning predicate, so output rows are unchanged. Also fix test_discover_dynamic_filters_via_expressions_api to walk each expression subtree so the inner DynamicFilterPhysicalExpr is still found when wrapped in OptionalFilterPhysicalExpr. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent cb694bf commit c7b55cc

3 files changed

Lines changed: 56 additions & 46 deletions

File tree

datafusion/core/tests/physical_optimizer/filter_pushdown.rs

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1031,7 +1031,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
10311031
// expect the predicate to be pushed down into the probe side DataSource
10321032
insta::assert_snapshot!(
10331033
OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true),
1034-
@r"
1034+
@"
10351035
OptimizationTest:
10361036
input:
10371037
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
@@ -1049,7 +1049,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
10491049
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
10501050
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
10511051
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
1052-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
1052+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=Optional(DynamicFilter [ empty ])
10531053
"
10541054
);
10551055

@@ -1076,14 +1076,14 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
10761076
#[cfg(not(feature = "force_hash_collisions"))]
10771077
insta::assert_snapshot!(
10781078
format!("{}", format_plan_for_test(&plan)),
1079-
@r"
1079+
@"
10801080
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
10811081
- CoalescePartitionsExec
10821082
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
10831083
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
10841084
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
10851085
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
1086-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 12 WHEN 5 THEN a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:ab,c1:bb}]) WHEN 8 THEN a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}]) ELSE false END ]
1086+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=Optional(DynamicFilter [ CASE hash_repartition % 12 WHEN 5 THEN a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:ab,c1:bb}]) WHEN 8 THEN a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}]) ELSE false END ])
10871087
"
10881088
);
10891089

@@ -1231,7 +1231,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_collect_left() {
12311231
// expect the predicate to be pushed down into the probe side DataSource
12321232
insta::assert_snapshot!(
12331233
OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true),
1234-
@r"
1234+
@"
12351235
OptimizationTest:
12361236
input:
12371237
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
@@ -1247,7 +1247,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_collect_left() {
12471247
- HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
12481248
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
12491249
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
1250-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
1250+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=Optional(DynamicFilter [ empty ])
12511251
"
12521252
);
12531253

@@ -1273,13 +1273,13 @@ async fn test_hashjoin_dynamic_filter_pushdown_collect_left() {
12731273
// Now check what our filter looks like
12741274
insta::assert_snapshot!(
12751275
format!("{}", format_plan_for_test(&plan)),
1276-
@r"
1276+
@"
12771277
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
12781278
- CoalescePartitionsExec
12791279
- HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
12801280
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
12811281
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
1282-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ]
1282+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=Optional(DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ])
12831283
"
12841284
);
12851285

@@ -2404,12 +2404,12 @@ async fn test_hashjoin_dynamic_filter_all_partitions_empty() {
24042404

24052405
insta::assert_snapshot!(
24062406
format_plan_for_test(&plan),
2407-
@r"
2407+
@"
24082408
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
24092409
- RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1
24102410
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true
24112411
- RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1
2412-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
2412+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true, predicate=Optional(DynamicFilter [ empty ])
24132413
"
24142414
);
24152415

@@ -2429,12 +2429,12 @@ async fn test_hashjoin_dynamic_filter_all_partitions_empty() {
24292429
// Test that filters are pushed down correctly to each side of the join
24302430
insta::assert_snapshot!(
24312431
format_plan_for_test(&plan),
2432-
@r"
2432+
@"
24332433
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
24342434
- RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1
24352435
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true
24362436
- RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1
2437-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ false ]
2437+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true, predicate=Optional(DynamicFilter [ false ])
24382438
"
24392439
);
24402440
}
@@ -2954,18 +2954,28 @@ async fn test_discover_dynamic_filters_via_expressions_api() {
29542954
use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr;
29552955
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
29562956

2957+
fn count_in_expr(expr: &dyn PhysicalExpr) -> usize {
2958+
let mut count = 0;
2959+
if expr.downcast_ref::<DynamicFilterPhysicalExpr>().is_some() {
2960+
count += 1;
2961+
}
2962+
for child in expr.children() {
2963+
count += count_in_expr(child.as_ref());
2964+
}
2965+
count
2966+
}
2967+
29572968
fn count_dynamic_filters(plan: &Arc<dyn ExecutionPlan>) -> usize {
29582969
let mut count = 0;
29592970

2960-
// Check expressions from this node using apply_expressions
2971+
// Check expressions from this node using apply_expressions.
2972+
// Walk each expression subtree to find DynamicFilterPhysicalExpr even
2973+
// when wrapped (e.g. by OptionalFilterPhysicalExpr).
29612974
let _ = plan.apply_expressions(&mut |expr| {
2962-
if let Some(_df) = expr.downcast_ref::<DynamicFilterPhysicalExpr>() {
2963-
count += 1;
2964-
}
2975+
count += count_in_expr(expr);
29652976
Ok(TreeNodeRecursion::Continue)
29662977
});
29672978

2968-
// Recursively visit children
29692979
for child in plan.children() {
29702980
count += count_dynamic_filters(child);
29712981
}

datafusion/sqllogictest/test_files/joins.slt

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2865,7 +2865,7 @@ physical_plan
28652865
02)--HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)]
28662866
03)----DataSourceExec: partitions=1, partition_sizes=[1]
28672867
04)----SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
2868-
05)------FilterExec: DynamicFilter [ empty ]
2868+
05)------FilterExec: Optional(DynamicFilter [ empty ])
28692869
06)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
28702870
07)----------DataSourceExec: partitions=1, partition_sizes=[1]
28712871

@@ -2901,7 +2901,7 @@ physical_plan
29012901
02)--HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)]
29022902
03)----DataSourceExec: partitions=1, partition_sizes=[1]
29032903
04)----SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
2904-
05)------FilterExec: DynamicFilter [ empty ]
2904+
05)------FilterExec: Optional(DynamicFilter [ empty ])
29052905
06)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
29062906
07)----------DataSourceExec: partitions=1, partition_sizes=[1]
29072907

@@ -2958,7 +2958,7 @@ physical_plan
29582958
02)--HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)]
29592959
03)----DataSourceExec: partitions=1, partition_sizes=[1]
29602960
04)----SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
2961-
05)------FilterExec: DynamicFilter [ empty ]
2961+
05)------FilterExec: Optional(DynamicFilter [ empty ])
29622962
06)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
29632963
07)----------DataSourceExec: partitions=1, partition_sizes=[1]
29642964

@@ -2994,7 +2994,7 @@ physical_plan
29942994
02)--HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)]
29952995
03)----DataSourceExec: partitions=1, partition_sizes=[1]
29962996
04)----SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
2997-
05)------FilterExec: DynamicFilter [ empty ]
2997+
05)------FilterExec: Optional(DynamicFilter [ empty ])
29982998
06)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
29992999
07)----------DataSourceExec: partitions=1, partition_sizes=[1]
30003000

@@ -3052,7 +3052,7 @@ physical_plan
30523052
02)--HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)], filter=t2_name@1 != t1_name@0
30533053
03)----DataSourceExec: partitions=1, partition_sizes=[1]
30543054
04)----SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
3055-
05)------FilterExec: DynamicFilter [ empty ]
3055+
05)------FilterExec: Optional(DynamicFilter [ empty ])
30563056
06)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
30573057
07)----------DataSourceExec: partitions=1, partition_sizes=[1]
30583058

@@ -3069,7 +3069,7 @@ physical_plan
30693069
02)--HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)], filter=t2_name@0 != t1_name@1
30703070
03)----DataSourceExec: partitions=1, partition_sizes=[1]
30713071
04)----SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
3072-
05)------FilterExec: DynamicFilter [ empty ]
3072+
05)------FilterExec: Optional(DynamicFilter [ empty ])
30733073
06)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
30743074
07)----------DataSourceExec: partitions=1, partition_sizes=[1]
30753075

@@ -3124,7 +3124,7 @@ physical_plan
31243124
02)--HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)], filter=t2_name@1 != t1_name@0
31253125
03)----DataSourceExec: partitions=1, partition_sizes=[1]
31263126
04)----SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
3127-
05)------FilterExec: DynamicFilter [ empty ]
3127+
05)------FilterExec: Optional(DynamicFilter [ empty ])
31283128
06)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
31293129
07)----------DataSourceExec: partitions=1, partition_sizes=[1]
31303130

@@ -3141,7 +3141,7 @@ physical_plan
31413141
02)--HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)], filter=t2_name@0 != t1_name@1
31423142
03)----DataSourceExec: partitions=1, partition_sizes=[1]
31433143
04)----SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
3144-
05)------FilterExec: DynamicFilter [ empty ]
3144+
05)------FilterExec: Optional(DynamicFilter [ empty ])
31453145
06)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
31463146
07)----------DataSourceExec: partitions=1, partition_sizes=[1]
31473147

@@ -4149,7 +4149,7 @@ physical_plan
41494149
01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(b@1, y@1)], filter=a@0 < x@1
41504150
02)--DataSourceExec: partitions=1, partition_sizes=[0]
41514151
03)--SortExec: expr=[x@0 ASC NULLS LAST], preserve_partitioning=[false]
4152-
04)----FilterExec: DynamicFilter [ empty ]
4152+
04)----FilterExec: Optional(DynamicFilter [ empty ])
41534153
05)------DataSourceExec: partitions=1, partition_sizes=[0]
41544154

41554155
# Test full join with limit
@@ -4452,7 +4452,7 @@ physical_plan
44524452
04)------FilterExec: b@1 > 3, projection=[a@0]
44534453
05)--------DataSourceExec: partitions=2, partition_sizes=[1, 1]
44544454
06)----SortExec: expr=[c@2 DESC], preserve_partitioning=[true]
4455-
07)------FilterExec: DynamicFilter [ empty ]
4455+
07)------FilterExec: Optional(DynamicFilter [ empty ])
44564456
08)--------DataSourceExec: partitions=2, partition_sizes=[1, 1]
44574457

44584458
query TT
@@ -4473,7 +4473,7 @@ physical_plan
44734473
04)------FilterExec: b@1 > 3, projection=[a@0]
44744474
05)--------DataSourceExec: partitions=2, partition_sizes=[1, 1]
44754475
06)----SortExec: expr=[c@2 DESC NULLS LAST], preserve_partitioning=[true]
4476-
07)------FilterExec: DynamicFilter [ empty ]
4476+
07)------FilterExec: Optional(DynamicFilter [ empty ])
44774477
08)--------DataSourceExec: partitions=2, partition_sizes=[1, 1]
44784478

44794479
query III

0 commit comments

Comments
 (0)