Skip to content

Commit 1900968

Browse files
address pr comments
- removes test_pushdown_through_aggregate_with_reordered_input_no_pushdown_on_agg_result, as it is basically a duplicate of another test
- adds slt tests for filter pushdown through aggregations (asserting correctness + plans)
1 parent 5c792af commit 1900968

3 files changed

Lines changed: 91 additions & 83 deletions

File tree

datafusion/core/tests/physical_optimizer/filter_pushdown.rs

Lines changed: 0 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -3224,85 +3224,6 @@ fn test_pushdown_through_aggregate_with_reordered_input_columns() {
32243224
);
32253225
}
32263226

3227-
#[test]
3228-
fn test_pushdown_through_aggregate_with_reordered_input_no_pushdown_on_agg_result() {
3229-
let scan = TestScanBuilder::new(schema()).with_support(true).build();
3230-
3231-
let reordered_schema = Arc::new(Schema::new(vec![
3232-
Field::new("c", DataType::Float64, false),
3233-
Field::new("a", DataType::Utf8, false),
3234-
Field::new("b", DataType::Utf8, false),
3235-
]));
3236-
let projection = Arc::new(
3237-
ProjectionExec::try_new(
3238-
vec![
3239-
(col("c", &schema()).unwrap(), "c".to_string()),
3240-
(col("a", &schema()).unwrap(), "a".to_string()),
3241-
(col("b", &schema()).unwrap(), "b".to_string()),
3242-
],
3243-
scan,
3244-
)
3245-
.unwrap(),
3246-
);
3247-
3248-
let aggregate_expr = vec![
3249-
AggregateExprBuilder::new(
3250-
count_udaf(),
3251-
vec![col("c", &reordered_schema).unwrap()],
3252-
)
3253-
.schema(reordered_schema.clone())
3254-
.alias("cnt")
3255-
.build()
3256-
.map(Arc::new)
3257-
.unwrap(),
3258-
];
3259-
3260-
let group_by = PhysicalGroupBy::new_single(vec![
3261-
(col("a", &reordered_schema).unwrap(), "a".to_string()),
3262-
(col("b", &reordered_schema).unwrap(), "b".to_string()),
3263-
]);
3264-
3265-
let aggregate = Arc::new(
3266-
AggregateExec::try_new(
3267-
AggregateMode::Final,
3268-
group_by,
3269-
aggregate_expr,
3270-
vec![None],
3271-
projection,
3272-
reordered_schema,
3273-
)
3274-
.unwrap(),
3275-
);
3276-
3277-
// Filter on cnt@2 (aggregate result, not a grouping column)
3278-
let agg_output_schema = aggregate.schema();
3279-
let predicate = Arc::new(BinaryExpr::new(
3280-
Arc::new(Column::new_with_schema("cnt", &agg_output_schema).unwrap()),
3281-
Operator::Gt,
3282-
Arc::new(Literal::new(ScalarValue::Int64(Some(5)))),
3283-
)) as Arc<dyn PhysicalExpr>;
3284-
let plan = Arc::new(FilterExec::try_new(predicate, aggregate).unwrap());
3285-
3286-
// The filter is not pushed down.
3287-
insta::assert_snapshot!(
3288-
OptimizationTest::new(plan, FilterPushdown::new(), true),
3289-
@r"
3290-
OptimizationTest:
3291-
input:
3292-
- FilterExec: cnt@2 > 5
3293-
- AggregateExec: mode=Final, gby=[a@1 as a, b@2 as b], aggr=[cnt]
3294-
- ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b]
3295-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
3296-
output:
3297-
Ok:
3298-
- FilterExec: cnt@2 > 5
3299-
- AggregateExec: mode=Final, gby=[a@1 as a, b@2 as b], aggr=[cnt]
3300-
- ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b]
3301-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
3302-
"
3303-
);
3304-
}
3305-
33063227
#[test]
33073228
fn test_pushdown_through_aggregate_grouping_sets_with_reordered_input() {
33083229
let scan = TestScanBuilder::new(schema()).with_support(true).build();

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -640,7 +640,8 @@ pub struct AggregateExec {
640640
limit_options: Option<LimitOptions>,
641641
/// Input plan, could be a partial aggregate or the input to the aggregate
642642
pub input: Arc<dyn ExecutionPlan>,
643-
/// Schema after the aggregate is applied
643+
/// Schema after the aggregate is applied. Contains the group by columns followed by the
644+
/// aggregate outputs.
644645
schema: SchemaRef,
645646
/// Input schema before any aggregation is applied. For partial aggregate this will be the
646647
/// same as input.schema() but for the final aggregate it will be the same as the input
@@ -1473,9 +1474,12 @@ impl ExecutionPlan for AggregateExec {
14731474
// This optimization is NOT safe for filters on aggregated columns (like filtering on
14741475
// the result of SUM or COUNT), as those require computing all groups first.
14751476

1476-
// Build grouping columns using output indices because parent filters reference the AggregateExec's output schema where grouping
1477-
// columns in the output schema. The grouping expressions reference
1478-
// input columns which may not match the output schema.
1477+
// Build grouping columns using output indices because parent filters reference the
1478+
// AggregateExec's output schema, in which grouping columns appear first. The
1479+
// grouping expressions reference input columns which may not match the output schema.
1480+
//
1481+
// It is safe to assume that the output_schema contains group by columns in the same order
1482+
// as the group by expression. See [`create_schema`] and [`AggregateExec`].
14791483
let output_schema = self.schema();
14801484
let grouping_columns: HashSet<_> = (0..self.group_by.expr().len())
14811485
.map(|i| Column::new(output_schema.field(i).name(), i))
@@ -1599,6 +1603,8 @@ impl ExecutionPlan for AggregateExec {
15991603
}
16001604
}
16011605

1606+
/// Creates the output schema for an [`AggregateExec`] containing the group by columns followed
1607+
/// by the aggregate columns.
16021608
fn create_schema(
16031609
input_schema: &Schema,
16041610
group_by: &PhysicalGroupBy,

datafusion/sqllogictest/test_files/push_down_filter_regression.slt

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,87 @@ physical_plan
185185
04)------AggregateExec: mode=Partial, gby=[(NULL as id), (id@0 as id)], aggr=[max(agg_dyn_test.id)]
186186
05)--------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet, WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 < 10, pruning_predicate=id_null_count@1 != row_count@2 AND id_min@0 < 10, required_guarantees=[]
187187

188+
statement ok
189+
set datafusion.execution.target_partitions = 1;
190+
191+
# Regression test for https://github.com/apache/datafusion/issues/21065
192+
# Ensure filter pushdown through AggregateExec still works when a ProjectionExec
193+
# reorders aggregate input columns.
194+
statement ok
195+
create external table agg_reordered_pushdown stored as parquet location '../../parquet-testing/data/alltypes_plain.parquet';
196+
197+
# Filter on grouping column b should push below the aggregate even though the
198+
# aggregate input is reordered to (c, a, b).
199+
query TT
200+
explain
201+
select a, b, cnt
202+
from (
203+
select a, b, count(c) as cnt
204+
from (
205+
select id as c, cast(string_col as varchar) as a, bool_col as b
206+
from agg_reordered_pushdown
207+
) t
208+
group by a, b
209+
) q
210+
where b = true;
211+
----
212+
physical_plan
213+
01)ProjectionExec: expr=[a@0 as a, b@1 as b, count(t.c)@2 as cnt]
214+
02)--AggregateExec: mode=Single, gby=[a@1 as a, b@2 as b], aggr=[count(t.c)]
215+
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id@0 as c, CAST(string_col@9 AS Utf8View) as a, bool_col@1 as b], file_type=parquet, predicate=bool_col@1, pruning_predicate=bool_col_min@0 OR bool_col_max@1, required_guarantees=[]
216+
217+
query TBI rowsort
218+
select a, b, cnt
219+
from (
220+
select a, b, count(c) as cnt
221+
from (
222+
select id as c, cast(string_col as varchar) as a, bool_col as b
223+
from agg_reordered_pushdown
224+
) t
225+
group by a, b
226+
) q
227+
where b = true;
228+
----
229+
0 true 4
230+
231+
# Filter on aggregate output must remain above the aggregate.
232+
query TT
233+
explain
234+
select a, b, cnt
235+
from (
236+
select a, b, count(c) as cnt
237+
from (
238+
select id as c, cast(string_col as varchar) as a, bool_col as b
239+
from agg_reordered_pushdown
240+
) t
241+
group by a, b
242+
) q
243+
where cnt > 1;
244+
----
245+
physical_plan
246+
01)ProjectionExec: expr=[a@0 as a, b@1 as b, count(t.c)@2 as cnt]
247+
02)--FilterExec: count(t.c)@2 > 1
248+
03)----AggregateExec: mode=Single, gby=[a@1 as a, b@2 as b], aggr=[count(t.c)]
249+
04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id@0 as c, CAST(string_col@9 AS Utf8View) as a, bool_col@1 as b], file_type=parquet
250+
251+
query TBI rowsort
252+
select a, b, cnt
253+
from (
254+
select a, b, count(c) as cnt
255+
from (
256+
select id as c, cast(string_col as varchar) as a, bool_col as b
257+
from agg_reordered_pushdown
258+
) t
259+
group by a, b
260+
) q
261+
where cnt > 1;
262+
----
263+
0 true 4
264+
1 false 4
265+
266+
statement ok
267+
drop table agg_reordered_pushdown;
268+
188269
statement ok
189270
drop table agg_dyn_test;
190271

0 commit comments

Comments
 (0)