Skip to content

Commit c931c89

Browse files
ensure dynamic filters are correctly pushed down through aggregations (#21059)
## Which issue does this PR close? - Closes #21065. ## Rationale for this change In plans such as the following, dynamic filters are not pushed down through the aggregation ``` CREATE TABLE contexts (a VARCHAR) AS VALUES ('h1'), ('h2'); CREATE TABLE data (a VARCHAR, ts TIMESTAMP, value DOUBLE) AS VALUES ('h1', '2024-01-01T00:05:00', 1.0), ('h1', '2024-01-01T00:15:00', 2.0), ('h2', '2024-01-01T00:25:00', 3.0), ('h3', '2024-01-01T00:35:00', 4.0); SELECT * FROM contexts c INNER JOIN ( SELECT a, date_bin(interval '1 hour', ts) AS bucket, min(value) AS min_val FROM (SELECT value, a, ts FROM data) GROUP BY a, date_bin(interval '1 hour', ts) ) agg ON c.a = agg.a; ``` ``` HashJoinExec: mode=Auto, join_type=Inner, on=[(a@0, a@0)] DataSourceExec: partitions=1 ProjectionExec: [a@0, date_bin(1h, ts)@1 as bucket, min(value)@2 as min_val] AggregateExec: mode=FinalPartitioned, gby=[a@0, date_bin(1h, ts)@1], aggr=[min(value)] AggregateExec: mode=Partial, gby=[a@1, date_bin(1h, ts@2)], aggr=[min(value)] ProjectionExec: [value@2, a@0, ts@1] ← reorders columns DataSourceExec: partitions=1 ``` ## What changes are included in this PR? `AggregateExec::gather_filters_for_pushdown` compared parent filter columns (output schema indices) against grouping expression columns (input schema indices). When a `ProjectionExec` below the aggregate reorders columns, the index mismatch causes filters (such as HashJoin dynamic filters) to be incorrectly blocked. This change fixes the column index mapping in `AggregateExec::gather_filters_for_pushdown`. ## Are these changes tested?
- `test_pushdown_through_aggregate_with_reordered_input_columns` — filter on grouping column with reordered input is pushed down - `test_pushdown_through_aggregate_with_reordered_input_no_pushdown_on_agg_result` — filter on aggregate result column is not pushed down - `test_pushdown_through_aggregate_grouping_sets_with_reordered_input` — GROUPING SETS: filter on common column pushed, filter on missing column blocked - `test_hashjoin_dynamic_filter_pushdown_through_aggregate_with_reordered_input` — HashJoin dynamic filter pushes through aggregate with reordered input and is populated with values after execution - All tests verified to fail without the fix ## Are there any user-facing changes? No.
1 parent 8b5c9b4 commit c931c89

3 files changed

Lines changed: 395 additions & 9 deletions

File tree

datafusion/core/tests/physical_optimizer/filter_pushdown.rs

Lines changed: 342 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use datafusion::{
3434
scalar::ScalarValue,
3535
};
3636
use datafusion_catalog::memory::DataSourceExec;
37+
use datafusion_common::JoinType;
3738
use datafusion_common::config::ConfigOptions;
3839
use datafusion_datasource::{
3940
PartitionedFile, file_groups::FileGroup, file_scan_config::FileScanConfigBuilder,
@@ -53,6 +54,7 @@ use datafusion_physical_expr::{
5354
use datafusion_physical_optimizer::{
5455
PhysicalOptimizerRule, filter_pushdown::FilterPushdown,
5556
};
57+
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
5658
use datafusion_physical_plan::{
5759
ExecutionPlan,
5860
aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
@@ -3489,6 +3491,346 @@ fn test_pushdown_with_empty_group_by() {
34893491
);
34903492
}
34913493

3494+
#[test]
3495+
fn test_pushdown_through_aggregate_with_reordered_input_columns() {
3496+
let scan = TestScanBuilder::new(schema()).with_support(true).build();
3497+
3498+
// Reorder scan output from (a, b, c) to (c, a, b)
3499+
let reordered_schema = Arc::new(Schema::new(vec![
3500+
Field::new("c", DataType::Float64, false),
3501+
Field::new("a", DataType::Utf8, false),
3502+
Field::new("b", DataType::Utf8, false),
3503+
]));
3504+
let projection = Arc::new(
3505+
ProjectionExec::try_new(
3506+
vec![
3507+
(col("c", &schema()).unwrap(), "c".to_string()),
3508+
(col("a", &schema()).unwrap(), "a".to_string()),
3509+
(col("b", &schema()).unwrap(), "b".to_string()),
3510+
],
3511+
scan,
3512+
)
3513+
.unwrap(),
3514+
);
3515+
3516+
let aggregate_expr = vec![
3517+
AggregateExprBuilder::new(
3518+
count_udaf(),
3519+
vec![col("c", &reordered_schema).unwrap()],
3520+
)
3521+
.schema(reordered_schema.clone())
3522+
.alias("cnt")
3523+
.build()
3524+
.map(Arc::new)
3525+
.unwrap(),
3526+
];
3527+
3528+
// Group by a@1, b@2 (input indices in reordered schema)
3529+
let group_by = PhysicalGroupBy::new_single(vec![
3530+
(col("a", &reordered_schema).unwrap(), "a".to_string()),
3531+
(col("b", &reordered_schema).unwrap(), "b".to_string()),
3532+
]);
3533+
3534+
let aggregate = Arc::new(
3535+
AggregateExec::try_new(
3536+
AggregateMode::Final,
3537+
group_by,
3538+
aggregate_expr,
3539+
vec![None],
3540+
projection,
3541+
reordered_schema,
3542+
)
3543+
.unwrap(),
3544+
);
3545+
3546+
// Filter on b@1 in aggregate's output schema (a@0, b@1, cnt@2)
3547+
// The grouping expr for b references input index 2, but output index is 1.
3548+
let agg_output_schema = aggregate.schema();
3549+
let predicate = col_lit_predicate("b", "bar", &agg_output_schema);
3550+
let plan = Arc::new(FilterExec::try_new(predicate, aggregate).unwrap());
3551+
3552+
// The filter should be pushed down
3553+
insta::assert_snapshot!(
3554+
OptimizationTest::new(plan, FilterPushdown::new(), true),
3555+
@r"
3556+
OptimizationTest:
3557+
input:
3558+
- FilterExec: b@1 = bar
3559+
- AggregateExec: mode=Final, gby=[a@1 as a, b@2 as b], aggr=[cnt]
3560+
- ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b]
3561+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
3562+
output:
3563+
Ok:
3564+
- AggregateExec: mode=Final, gby=[a@1 as a, b@2 as b], aggr=[cnt], ordering_mode=PartiallySorted([1])
3565+
- ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b]
3566+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=b@1 = bar
3567+
"
3568+
);
3569+
}
3570+
3571+
#[test]
3572+
fn test_pushdown_through_aggregate_grouping_sets_with_reordered_input() {
3573+
let scan = TestScanBuilder::new(schema()).with_support(true).build();
3574+
3575+
let reordered_schema = Arc::new(Schema::new(vec![
3576+
Field::new("c", DataType::Float64, false),
3577+
Field::new("a", DataType::Utf8, false),
3578+
Field::new("b", DataType::Utf8, false),
3579+
]));
3580+
let projection = Arc::new(
3581+
ProjectionExec::try_new(
3582+
vec![
3583+
(col("c", &schema()).unwrap(), "c".to_string()),
3584+
(col("a", &schema()).unwrap(), "a".to_string()),
3585+
(col("b", &schema()).unwrap(), "b".to_string()),
3586+
],
3587+
scan,
3588+
)
3589+
.unwrap(),
3590+
);
3591+
3592+
let aggregate_expr = vec![
3593+
AggregateExprBuilder::new(
3594+
count_udaf(),
3595+
vec![col("c", &reordered_schema).unwrap()],
3596+
)
3597+
.schema(reordered_schema.clone())
3598+
.alias("cnt")
3599+
.build()
3600+
.map(Arc::new)
3601+
.unwrap(),
3602+
];
3603+
3604+
// Use grouping sets (a, b) and (b).
3605+
let group_by = PhysicalGroupBy::new(
3606+
vec![
3607+
(col("a", &reordered_schema).unwrap(), "a".to_string()),
3608+
(col("b", &reordered_schema).unwrap(), "b".to_string()),
3609+
],
3610+
vec![
3611+
(
3612+
Arc::new(Literal::new(ScalarValue::Utf8(None))),
3613+
"a".to_string(),
3614+
),
3615+
(
3616+
Arc::new(Literal::new(ScalarValue::Utf8(None))),
3617+
"b".to_string(),
3618+
),
3619+
],
3620+
vec![vec![false, false], vec![true, false]],
3621+
true,
3622+
);
3623+
3624+
let aggregate = Arc::new(
3625+
AggregateExec::try_new(
3626+
AggregateMode::Final,
3627+
group_by,
3628+
aggregate_expr,
3629+
vec![None],
3630+
projection,
3631+
reordered_schema,
3632+
)
3633+
.unwrap(),
3634+
);
3635+
3636+
let agg_output_schema = aggregate.schema();
3637+
3638+
// Filter on b (present in all grouping sets) should be pushed down
3639+
let predicate = col_lit_predicate("b", "bar", &agg_output_schema);
3640+
let plan = Arc::new(FilterExec::try_new(predicate, aggregate.clone()).unwrap());
3641+
3642+
insta::assert_snapshot!(
3643+
OptimizationTest::new(plan, FilterPushdown::new(), true),
3644+
@r"
3645+
OptimizationTest:
3646+
input:
3647+
- FilterExec: b@1 = bar
3648+
- AggregateExec: mode=Final, gby=[(a@1 as a, b@2 as b), (NULL as a, b@2 as b)], aggr=[cnt]
3649+
- ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b]
3650+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
3651+
output:
3652+
Ok:
3653+
- AggregateExec: mode=Final, gby=[(a@1 as a, b@2 as b), (NULL as a, b@2 as b)], aggr=[cnt], ordering_mode=PartiallySorted([1])
3654+
- ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b]
3655+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=b@1 = bar
3656+
"
3657+
);
3658+
3659+
// Filter on a (missing from second grouping set) should not be pushed down
3660+
let predicate = col_lit_predicate("a", "foo", &agg_output_schema);
3661+
let plan = Arc::new(FilterExec::try_new(predicate, aggregate).unwrap());
3662+
3663+
insta::assert_snapshot!(
3664+
OptimizationTest::new(plan, FilterPushdown::new(), true),
3665+
@r"
3666+
OptimizationTest:
3667+
input:
3668+
- FilterExec: a@0 = foo
3669+
- AggregateExec: mode=Final, gby=[(a@1 as a, b@2 as b), (NULL as a, b@2 as b)], aggr=[cnt]
3670+
- ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b]
3671+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
3672+
output:
3673+
Ok:
3674+
- FilterExec: a@0 = foo
3675+
- AggregateExec: mode=Final, gby=[(a@1 as a, b@2 as b), (NULL as a, b@2 as b)], aggr=[cnt]
3676+
- ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b]
3677+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
3678+
"
3679+
);
3680+
}
3681+
3682+
/// Regression test for https://github.com/apache/datafusion/issues/21065.
3683+
///
3684+
/// Given a plan similar to the following, ensure that the filter is pushed down
3685+
/// through an AggregateExec whose input columns are reordered by a ProjectionExec.
3686+
#[tokio::test]
3687+
async fn test_hashjoin_dynamic_filter_pushdown_through_aggregate_with_reordered_input() {
3688+
// Build side
3689+
let build_batches = vec![record_batch!(("a", Utf8, ["h1", "h2"])).unwrap()];
3690+
let build_schema =
3691+
Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));
3692+
let build_scan = TestScanBuilder::new(Arc::clone(&build_schema))
3693+
.with_support(true)
3694+
.with_batches(build_batches)
3695+
.build();
3696+
3697+
// Probe side
3698+
let probe_batches = vec![
3699+
record_batch!(
3700+
("a", Utf8, ["h1", "h2", "h3", "h4"]),
3701+
("value", Float64, [1.0, 2.0, 3.0, 4.0])
3702+
)
3703+
.unwrap(),
3704+
];
3705+
let probe_schema = Arc::new(Schema::new(vec![
3706+
Field::new("a", DataType::Utf8, false),
3707+
Field::new("value", DataType::Float64, false),
3708+
]));
3709+
let probe_scan = TestScanBuilder::new(Arc::clone(&probe_schema))
3710+
.with_support(true)
3711+
.with_batches(probe_batches)
3712+
.build();
3713+
3714+
// ProjectionExec reorders (a, value) → (value, a)
3715+
let reordered_schema = Arc::new(Schema::new(vec![
3716+
Field::new("value", DataType::Float64, false),
3717+
Field::new("a", DataType::Utf8, false),
3718+
]));
3719+
let projection = Arc::new(
3720+
ProjectionExec::try_new(
3721+
vec![
3722+
(col("value", &probe_schema).unwrap(), "value".to_string()),
3723+
(col("a", &probe_schema).unwrap(), "a".to_string()),
3724+
],
3725+
probe_scan,
3726+
)
3727+
.unwrap(),
3728+
);
3729+
3730+
// AggregateExec: GROUP BY a@1, min(value@0)
3731+
let aggregate_expr = vec![
3732+
AggregateExprBuilder::new(
3733+
min_udaf(),
3734+
vec![col("value", &reordered_schema).unwrap()],
3735+
)
3736+
.schema(reordered_schema.clone())
3737+
.alias("min_value")
3738+
.build()
3739+
.map(Arc::new)
3740+
.unwrap(),
3741+
];
3742+
let group_by = PhysicalGroupBy::new_single(vec![(
3743+
col("a", &reordered_schema).unwrap(), // a@1 in input
3744+
"a".to_string(),
3745+
)]);
3746+
3747+
let aggregate = Arc::new(
3748+
AggregateExec::try_new(
3749+
AggregateMode::Single,
3750+
group_by,
3751+
aggregate_expr,
3752+
vec![None],
3753+
projection,
3754+
reordered_schema,
3755+
)
3756+
.unwrap(),
3757+
);
3758+
3759+
// Aggregate output schema: (a@0, min_value@1)
3760+
let agg_output_schema = aggregate.schema();
3761+
3762+
// Join the build and probe side
3763+
let plan = Arc::new(
3764+
HashJoinExec::try_new(
3765+
build_scan,
3766+
aggregate,
3767+
vec![(
3768+
col("a", &build_schema).unwrap(),
3769+
col("a", &agg_output_schema).unwrap(),
3770+
)],
3771+
None,
3772+
&JoinType::Inner,
3773+
None,
3774+
PartitionMode::CollectLeft,
3775+
datafusion_common::NullEquality::NullEqualsNothing,
3776+
false,
3777+
)
3778+
.unwrap(),
3779+
) as Arc<dyn ExecutionPlan>;
3780+
3781+
// The HashJoin's dynamic filter on `a` should push
3782+
// through the aggregate and reach the probe-side DataSource.
3783+
insta::assert_snapshot!(
3784+
OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true),
3785+
@r"
3786+
OptimizationTest:
3787+
input:
3788+
- HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0)]
3789+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a], file_type=test, pushdown_supported=true
3790+
- AggregateExec: mode=Single, gby=[a@1 as a], aggr=[min_value]
3791+
- ProjectionExec: expr=[value@1 as value, a@0 as a]
3792+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, value], file_type=test, pushdown_supported=true
3793+
output:
3794+
Ok:
3795+
- HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0)]
3796+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a], file_type=test, pushdown_supported=true
3797+
- AggregateExec: mode=Single, gby=[a@1 as a], aggr=[min_value]
3798+
- ProjectionExec: expr=[value@1 as value, a@0 as a]
3799+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, value], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
3800+
"
3801+
);
3802+
3803+
// Actually execute the plan to verify the dynamic filter is populated
3804+
let mut config = ConfigOptions::default();
3805+
config.execution.parquet.pushdown_filters = true;
3806+
let plan = FilterPushdown::new_post_optimization()
3807+
.optimize(plan, &config)
3808+
.unwrap();
3809+
3810+
let session_config = SessionConfig::new().with_batch_size(10);
3811+
let session_ctx = SessionContext::new_with_config(session_config);
3812+
session_ctx.register_object_store(
3813+
ObjectStoreUrl::parse("test://").unwrap().as_ref(),
3814+
Arc::new(InMemory::new()),
3815+
);
3816+
let state = session_ctx.state();
3817+
let task_ctx = state.task_ctx();
3818+
let mut stream = plan.execute(0, Arc::clone(&task_ctx)).unwrap();
3819+
stream.next().await.unwrap().unwrap();
3820+
3821+
// After execution, the dynamic filter should be populated with values
3822+
insta::assert_snapshot!(
3823+
format!("{}", format_plan_for_test(&plan)),
3824+
@r"
3825+
- HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0)]
3826+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a], file_type=test, pushdown_supported=true
3827+
- AggregateExec: mode=Single, gby=[a@1 as a], aggr=[min_value]
3828+
- ProjectionExec: expr=[value@1 as value, a@0 as a]
3829+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, value], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= h1 AND a@0 <= h2 AND a@0 IN (SET) ([h1, h2]) ]
3830+
"
3831+
);
3832+
}
3833+
34923834
#[test]
34933835
fn test_pushdown_with_computed_grouping_key() {
34943836
// Test filter pushdown with computed grouping expression

0 commit comments

Comments
 (0)