Skip to content

Commit f67ef0b

Browse files
address pr comments
- removes test_pushdown_through_aggregate_with_reordered_input_no_pushdown_on_agg_result, as it is basically a duplicate of another test
- adds slt tests for filter pushdown through aggregations (asserting correctness + plans)
1 parent f8579f6 commit f67ef0b

3 files changed

Lines changed: 91 additions & 83 deletions

File tree

datafusion/core/tests/physical_optimizer/filter_pushdown.rs

Lines changed: 0 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -3568,85 +3568,6 @@ fn test_pushdown_through_aggregate_with_reordered_input_columns() {
35683568
);
35693569
}
35703570

3571-
#[test]
3572-
fn test_pushdown_through_aggregate_with_reordered_input_no_pushdown_on_agg_result() {
3573-
let scan = TestScanBuilder::new(schema()).with_support(true).build();
3574-
3575-
let reordered_schema = Arc::new(Schema::new(vec![
3576-
Field::new("c", DataType::Float64, false),
3577-
Field::new("a", DataType::Utf8, false),
3578-
Field::new("b", DataType::Utf8, false),
3579-
]));
3580-
let projection = Arc::new(
3581-
ProjectionExec::try_new(
3582-
vec![
3583-
(col("c", &schema()).unwrap(), "c".to_string()),
3584-
(col("a", &schema()).unwrap(), "a".to_string()),
3585-
(col("b", &schema()).unwrap(), "b".to_string()),
3586-
],
3587-
scan,
3588-
)
3589-
.unwrap(),
3590-
);
3591-
3592-
let aggregate_expr = vec![
3593-
AggregateExprBuilder::new(
3594-
count_udaf(),
3595-
vec![col("c", &reordered_schema).unwrap()],
3596-
)
3597-
.schema(reordered_schema.clone())
3598-
.alias("cnt")
3599-
.build()
3600-
.map(Arc::new)
3601-
.unwrap(),
3602-
];
3603-
3604-
let group_by = PhysicalGroupBy::new_single(vec![
3605-
(col("a", &reordered_schema).unwrap(), "a".to_string()),
3606-
(col("b", &reordered_schema).unwrap(), "b".to_string()),
3607-
]);
3608-
3609-
let aggregate = Arc::new(
3610-
AggregateExec::try_new(
3611-
AggregateMode::Final,
3612-
group_by,
3613-
aggregate_expr,
3614-
vec![None],
3615-
projection,
3616-
reordered_schema,
3617-
)
3618-
.unwrap(),
3619-
);
3620-
3621-
// Filter on cnt@2 (aggregate result, not a grouping column)
3622-
let agg_output_schema = aggregate.schema();
3623-
let predicate = Arc::new(BinaryExpr::new(
3624-
Arc::new(Column::new_with_schema("cnt", &agg_output_schema).unwrap()),
3625-
Operator::Gt,
3626-
Arc::new(Literal::new(ScalarValue::Int64(Some(5)))),
3627-
)) as Arc<dyn PhysicalExpr>;
3628-
let plan = Arc::new(FilterExec::try_new(predicate, aggregate).unwrap());
3629-
3630-
// The filter is not pushed down.
3631-
insta::assert_snapshot!(
3632-
OptimizationTest::new(plan, FilterPushdown::new(), true),
3633-
@r"
3634-
OptimizationTest:
3635-
input:
3636-
- FilterExec: cnt@2 > 5
3637-
- AggregateExec: mode=Final, gby=[a@1 as a, b@2 as b], aggr=[cnt]
3638-
- ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b]
3639-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
3640-
output:
3641-
Ok:
3642-
- FilterExec: cnt@2 > 5
3643-
- AggregateExec: mode=Final, gby=[a@1 as a, b@2 as b], aggr=[cnt]
3644-
- ProjectionExec: expr=[c@2 as c, a@0 as a, b@1 as b]
3645-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
3646-
"
3647-
);
3648-
}
3649-
36503571
#[test]
36513572
fn test_pushdown_through_aggregate_grouping_sets_with_reordered_input() {
36523573
let scan = TestScanBuilder::new(schema()).with_support(true).build();

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -640,7 +640,8 @@ pub struct AggregateExec {
640640
limit_options: Option<LimitOptions>,
641641
/// Input plan, could be a partial aggregate or the input to the aggregate
642642
pub input: Arc<dyn ExecutionPlan>,
643-
/// Schema after the aggregate is applied
643+
/// Schema after the aggregate is applied. Contains the group by columns followed by the
644+
/// aggregate outputs.
644645
schema: SchemaRef,
645646
/// Input schema before any aggregation is applied. For partial aggregate this will be the
646647
/// same as input.schema() but for the final aggregate it will be the same as the input
@@ -1473,9 +1474,12 @@ impl ExecutionPlan for AggregateExec {
14731474
// This optimization is NOT safe for filters on aggregated columns (like filtering on
14741475
// the result of SUM or COUNT), as those require computing all groups first.
14751476

1476-
// Build grouping columns using output indices because parent filters reference the AggregateExec's output schema where grouping
1477-
// columns in the output schema. The grouping expressions reference
1478-
// input columns which may not match the output schema.
1477+
// Build grouping columns using output indices because parent filters reference the
1478+
// AggregateExec's output schema, in which grouping columns come first. The
1479+
// grouping expressions reference input columns which may not match the output schema.
1480+
//
1481+
// It is safe to assume that the output_schema contains group by columns in the same order
1482+
// as the group by expression. See [`create_schema`] and [`AggregateExec`].
14791483
let output_schema = self.schema();
14801484
let grouping_columns: HashSet<_> = (0..self.group_by.expr().len())
14811485
.map(|i| Column::new(output_schema.field(i).name(), i))
@@ -1599,6 +1603,8 @@ impl ExecutionPlan for AggregateExec {
15991603
}
16001604
}
16011605

1606+
/// Creates the output schema for an [`AggregateExec`] containing the group by columns followed
1607+
/// by the aggregate columns.
16021608
fn create_schema(
16031609
input_schema: &Schema,
16041610
group_by: &PhysicalGroupBy,

datafusion/sqllogictest/test_files/push_down_filter_regression.slt

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,87 @@ reset datafusion.optimizer.enable_dynamic_filter_pushdown;
207207
statement ok
208208
reset datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown;
209209

210+
statement ok
211+
set datafusion.execution.target_partitions = 1;
212+
213+
# Regression test for https://github.com/apache/datafusion/issues/21065
214+
# Ensure filter pushdown through AggregateExec still works when a ProjectionExec
215+
# reorders aggregate input columns.
216+
statement ok
217+
create external table agg_reordered_pushdown stored as parquet location '../../parquet-testing/data/alltypes_plain.parquet';
218+
219+
# Filter on grouping column b should push below the aggregate even though the
220+
# aggregate input is reordered to (c, a, b).
221+
query TT
222+
explain
223+
select a, b, cnt
224+
from (
225+
select a, b, count(c) as cnt
226+
from (
227+
select id as c, cast(string_col as varchar) as a, bool_col as b
228+
from agg_reordered_pushdown
229+
) t
230+
group by a, b
231+
) q
232+
where b = true;
233+
----
234+
physical_plan
235+
01)ProjectionExec: expr=[a@0 as a, b@1 as b, count(t.c)@2 as cnt]
236+
02)--AggregateExec: mode=Single, gby=[a@1 as a, b@2 as b], aggr=[count(t.c)]
237+
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id@0 as c, CAST(string_col@9 AS Utf8View) as a, bool_col@1 as b], file_type=parquet, predicate=bool_col@1, pruning_predicate=bool_col_min@0 OR bool_col_max@1, required_guarantees=[]
238+
239+
query TBI rowsort
240+
select a, b, cnt
241+
from (
242+
select a, b, count(c) as cnt
243+
from (
244+
select id as c, cast(string_col as varchar) as a, bool_col as b
245+
from agg_reordered_pushdown
246+
) t
247+
group by a, b
248+
) q
249+
where b = true;
250+
----
251+
0 true 4
252+
253+
# Filter on aggregate output must remain above the aggregate.
254+
query TT
255+
explain
256+
select a, b, cnt
257+
from (
258+
select a, b, count(c) as cnt
259+
from (
260+
select id as c, cast(string_col as varchar) as a, bool_col as b
261+
from agg_reordered_pushdown
262+
) t
263+
group by a, b
264+
) q
265+
where cnt > 1;
266+
----
267+
physical_plan
268+
01)ProjectionExec: expr=[a@0 as a, b@1 as b, count(t.c)@2 as cnt]
269+
02)--FilterExec: count(t.c)@2 > 1
270+
03)----AggregateExec: mode=Single, gby=[a@1 as a, b@2 as b], aggr=[count(t.c)]
271+
04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id@0 as c, CAST(string_col@9 AS Utf8View) as a, bool_col@1 as b], file_type=parquet
272+
273+
query TBI rowsort
274+
select a, b, cnt
275+
from (
276+
select a, b, count(c) as cnt
277+
from (
278+
select id as c, cast(string_col as varchar) as a, bool_col as b
279+
from agg_reordered_pushdown
280+
) t
281+
group by a, b
282+
) q
283+
where cnt > 1;
284+
----
285+
0 true 4
286+
1 false 4
287+
288+
statement ok
289+
drop table agg_reordered_pushdown;
290+
210291
statement ok
211292
drop table agg_dyn_test;
212293

0 commit comments

Comments (0)