
Commit 6da7168

Dandandan and claude committed
Remove rename-only ProjectionExec by absorbing alias into AggregateExec
When a ProjectionExec only renames columns (all expressions are sequential Column references), absorb the rename into the child AggregateExec's output schema, eliminating the ProjectionExec entirely. This avoids the per-batch overhead of expression evaluation, RecordBatch construction, and the stream wrapper layer for the trivial rename projections that appear in most GROUP BY queries.

This only applies to final-mode aggregates (Final/FinalPartitioned/Single/SinglePartitioned) to avoid breaking distribution requirements for intermediate aggregates.

Changes:

- Add `AggregateExec::with_output_schema()` that clones the exec with renamed output fields, updating both schema and output partitioning
- Add `try_absorb_rename_projection()` in the projection pushdown optimizer
- Add `remap_partitioning()` helper to update Column references in hash partitioning when field names change

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent c17c87c commit 6da7168

25 files changed

Lines changed: 713 additions & 711 deletions
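
The condition the optimizer checks is mechanical: a projection is rename-only when expression i is exactly column i of its input, so only the output names can differ. A minimal standalone Rust sketch of that predicate, using simplified stand-in types rather than DataFusion's `ProjectionExpr`/`Column`:

// Simplified stand-ins for DataFusion's physical expressions: an output
// expression either refers to exactly one input column (by index) or is
// something more complex that disqualifies the rewrite.
struct ProjExpr {
    column_index: Option<usize>, // Some(i) if the expression is just column i
    alias: String,
}

// A projection is "rename-only" when it keeps every input column in place:
// same field count, and expression i references column i.
fn is_rename_only(exprs: &[ProjExpr], input_field_count: usize) -> bool {
    exprs.len() == input_field_count
        && exprs
            .iter()
            .enumerate()
            .all(|(idx, e)| e.column_index == Some(idx))
}

fn main() {
    // Mirrors a plan line like:
    //   ProjectionExec: expr=[env@0 as env, count(Int64(1))@1 as count(*)]
    let exprs = vec![
        ProjExpr { column_index: Some(0), alias: "env".to_string() },
        ProjExpr { column_index: Some(1), alias: "count(*)".to_string() },
    ];
    assert!(is_rename_only(&exprs, 2));
    println!("new output names: {:?}",
        exprs.iter().map(|e| &e.alias).collect::<Vec<_>>());
}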

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 61 additions & 0 deletions
@@ -35,6 +35,7 @@ use crate::{
     SendableRecordBatchStream, Statistics, check_if_same_properties,
 };
 use datafusion_common::config::ConfigOptions;
+use datafusion_physical_expr::Partitioning;
 use datafusion_physical_expr::utils::collect_columns;
 use parking_lot::Mutex;
 use std::collections::HashSet;
@@ -713,6 +714,37 @@ impl AggregateExec {
         &self.cache
     }
 
+    /// Clone this exec with a new output schema that only renames fields.
+    pub fn with_output_schema(&self, schema: SchemaRef) -> Result<Self> {
+        let eq_properties = self
+            .cache
+            .eq_properties
+            .clone()
+            .with_new_schema(Arc::clone(&schema))?;
+        let output_partitioning = remap_partitioning(&self.cache.partitioning, &schema);
+        let cache = PlanProperties::new(
+            eq_properties,
+            output_partitioning,
+            self.cache.emission_type,
+            self.cache.boundedness,
+        );
+        Ok(Self {
+            schema,
+            cache: Arc::new(cache),
+            required_input_ordering: self.required_input_ordering.clone(),
+            metrics: ExecutionPlanMetricsSet::new(),
+            input_order_mode: self.input_order_mode.clone(),
+            mode: self.mode,
+            group_by: Arc::clone(&self.group_by),
+            aggr_expr: Arc::clone(&self.aggr_expr),
+            filter_expr: Arc::clone(&self.filter_expr),
+            limit_options: self.limit_options,
+            input: Arc::clone(&self.input),
+            input_schema: Arc::clone(&self.input_schema),
+            dynamic_filter: self.dynamic_filter.clone(),
+        })
+    }
+
     /// Create a new hash aggregate execution plan
     pub fn try_new(
         mode: AggregateMode,
@@ -1598,6 +1630,35 @@ impl ExecutionPlan for AggregateExec {
     }
 }
 
+/// Remap Column references in a Partitioning to use new schema field names.
+fn remap_partitioning(
+    partitioning: &Partitioning,
+    new_schema: &SchemaRef,
+) -> Partitioning {
+    match partitioning {
+        Partitioning::Hash(exprs, n) => {
+            let new_exprs: Vec<Arc<dyn PhysicalExpr>> = exprs
+                .iter()
+                .map(|e| {
+                    if let Some(col) = e.as_any().downcast_ref::<Column>() {
+                        let idx = col.index();
+                        if idx < new_schema.fields().len() {
+                            Arc::new(Column::new(new_schema.field(idx).name(), idx))
+                                as Arc<dyn PhysicalExpr>
+                        } else {
+                            Arc::clone(e)
+                        }
+                    } else {
+                        Arc::clone(e)
+                    }
+                })
+                .collect();
+            Partitioning::Hash(new_exprs, *n)
+        }
+        other => other.clone(),
+    }
+}
+
 /// Creates the output schema for an [`AggregateExec`] containing the group by columns followed
 /// by the aggregate columns.
 fn create_schema(
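
The `remap_partitioning` helper above matters because `Partitioning::Hash` keys carry both a field name and an index; a rename-only schema change leaves the indexes valid but the names stale. A hedged standalone sketch of the same idea over simplified `(name, index)` keys, not DataFusion's expression types:

// Simplified model of hash partitioning keys. After a rename-only schema
// change the index still identifies the field, so the name is refreshed
// from the new schema and everything else is untouched.
fn remap_hash_keys(
    keys: &[(String, usize)],
    new_field_names: &[&str],
) -> Vec<(String, usize)> {
    keys.iter()
        .map(|(name, idx)| {
            if *idx < new_field_names.len() {
                (new_field_names[*idx].to_string(), *idx)
            } else {
                (name.clone(), *idx) // index out of range: leave the key as-is
            }
        })
        .collect()
}

fn main() {
    // Hash([count(Int64(1))@1], 4) becomes Hash([count(*)@1], 4).
    let keys = vec![("count(Int64(1))".to_string(), 1)];
    let remapped = remap_hash_keys(&keys, &["env", "count(*)"]);
    assert_eq!(remapped, vec![("count(*)".to_string(), 1)]);
}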

datafusion/physical-plan/src/projection.rs

Lines changed: 59 additions & 1 deletion
@@ -26,6 +26,7 @@ use super::{
     DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream,
     SendableRecordBatchStream, SortOrderPushdownResult, Statistics,
 };
+use crate::aggregates::{AggregateExec, AggregateMode};
 use crate::column_rewriter::PhysicalColumnRewriter;
 use crate::execution_plan::CardinalityEffect;
 use crate::filter_pushdown::{
@@ -39,7 +40,7 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
-use arrow::datatypes::SchemaRef;
+use arrow::datatypes::{Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::tree_node::{
@@ -733,6 +734,10 @@ pub fn remove_unnecessary_projections(
     if is_projection_removable(projection) {
         return Ok(Transformed::yes(Arc::clone(projection.input())));
     }
+    // Try to absorb rename-only projections into the child:
+    if let Some(new_child) = try_absorb_rename_projection(projection)? {
+        return Ok(Transformed::yes(new_child));
+    }
     // If it does, check if we can push it under its child(ren):
     projection
         .input()
@@ -757,6 +762,59 @@ fn is_projection_removable(projection: &ProjectionExec) -> bool {
     }) && exprs.len() == projection.input().schema().fields().len()
 }
 
+/// If a projection only renames columns, try to absorb the rename into
+/// the child operator, eliminating the projection entirely.
+fn try_absorb_rename_projection(
+    projection: &ProjectionExec,
+) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+    let exprs = projection.expr();
+    let input = projection.input();
+    let input_field_count = input.schema().fields().len();
+
+    let is_rename_only = exprs.len() == input_field_count
+        && exprs.iter().enumerate().all(|(idx, proj_expr)| {
+            proj_expr
+                .expr
+                .as_any()
+                .downcast_ref::<Column>()
+                .map(|col| col.index() == idx)
+                .unwrap_or(false)
+        });
+
+    if !is_rename_only {
+        return Ok(None);
+    }
+
+    let old_schema = input.schema();
+    let new_fields: Vec<Arc<arrow::datatypes::Field>> = exprs
+        .iter()
+        .enumerate()
+        .map(|(idx, proj_expr)| {
+            let field = old_schema.field(idx);
+            Arc::new(field.as_ref().clone().with_name(&proj_expr.alias))
+        })
+        .collect();
+    let new_schema = Arc::new(Schema::new_with_metadata(
+        new_fields,
+        old_schema.metadata().clone(),
+    ));
+
+    if let Some(agg) = input.downcast_ref::<AggregateExec>()
+        && matches!(
+            agg.mode(),
+            AggregateMode::Final
+                | AggregateMode::FinalPartitioned
+                | AggregateMode::Single
+                | AggregateMode::SinglePartitioned
+        )
+    {
+        let new_agg: AggregateExec = agg.with_output_schema(new_schema)?;
+        return Ok(Some(Arc::new(new_agg)));
+    }
+
+    Ok(None)
+}
+
 /// Given the expression set of a projection, checks if the projection causes
 /// any renaming or constructs a non-`Column` physical expression.
 pub fn all_alias_free_columns(exprs: &[ProjectionExpr]) -> bool {
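
The schema rebuild in `try_absorb_rename_projection` keeps each field's type, nullability, and metadata and swaps in the projection's alias. A standalone illustration of that step with the arrow-rs schema API (`renamed_schema` and the sample field names are hypothetical, not part of the commit):

use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};

// Rebuild a schema with the projection's aliases while preserving each
// field's type, nullability, and the schema-level metadata. `aliases`
// stands in for the `proj_expr.alias` values in the diff above.
fn renamed_schema(old: &SchemaRef, aliases: &[&str]) -> SchemaRef {
    let fields: Vec<Arc<Field>> = aliases
        .iter()
        .enumerate()
        .map(|(idx, alias)| Arc::new(old.field(idx).clone().with_name(*alias)))
        .collect();
    Arc::new(Schema::new_with_metadata(fields, old.metadata().clone()))
}

fn main() {
    let old: SchemaRef = Arc::new(Schema::new(vec![
        Field::new("env", DataType::Utf8, true),
        Field::new("count(Int64(1))", DataType::Int64, false),
    ]));
    let new = renamed_schema(&old, &["env", "count(*)"]);
    assert_eq!(new.field(0).name(), "env");
    assert_eq!(new.field(1).name(), "count(*)");
}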

datafusion/sqllogictest/test_files/agg_func_substitute.slt

Lines changed: 15 additions & 18 deletions
@@ -43,12 +43,11 @@ logical_plan
 02)--Aggregate: groupBy=[[multiple_ordered_table.a]], aggr=[[nth_value(multiple_ordered_table.c, Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]]]
 03)----TableScan: multiple_ordered_table projection=[a, c]
 physical_plan
-01)ProjectionExec: expr=[a@0 as a, nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result]
-02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
-03)----RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST
-04)------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
-05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
-06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true
+01)AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
+02)--RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST
+03)----AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
+04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
+05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true
 
 
 query TT
@@ -61,12 +60,11 @@
 02)--Aggregate: groupBy=[[multiple_ordered_table.a]], aggr=[[nth_value(multiple_ordered_table.c, Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]]]
 03)----TableScan: multiple_ordered_table projection=[a, c]
 physical_plan
-01)ProjectionExec: expr=[a@0 as a, nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result]
-02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
-03)----RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST
-04)------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
-05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
-06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true
+01)AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
+02)--RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST
+03)----AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
+04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
+05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true
 
 query TT
 EXPLAIN SELECT a, ARRAY_AGG(c ORDER BY c)[1 + 100] as result
@@ -78,12 +76,11 @@
 02)--Aggregate: groupBy=[[multiple_ordered_table.a]], aggr=[[nth_value(multiple_ordered_table.c, Int64(101)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST] AS nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]]]
 03)----TableScan: multiple_ordered_table projection=[a, c]
 physical_plan
-01)ProjectionExec: expr=[a@0 as a, nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result]
-02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
-03)----RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST
-04)------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
-05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
-06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true
+01)AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
+02)--RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST
+03)----AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
+04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
+05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true
 
 query II
 SELECT a, ARRAY_AGG(c ORDER BY c)[1] as result

datafusion/sqllogictest/test_files/aggregate.slt

Lines changed: 12 additions & 14 deletions
@@ -1646,14 +1646,13 @@ logical_plan
 03)----Aggregate: groupBy=[[t.c AS alias1]], aggr=[[]]
 04)------TableScan: t projection=[c]
 physical_plan
-01)ProjectionExec: expr=[median(alias1)@0 as median(DISTINCT t.c)]
-02)--AggregateExec: mode=Final, gby=[], aggr=[median(alias1)]
-03)----CoalescePartitionsExec
-04)------AggregateExec: mode=Partial, gby=[], aggr=[median(alias1)]
-05)--------AggregateExec: mode=FinalPartitioned, gby=[alias1@0 as alias1], aggr=[]
-06)----------RepartitionExec: partitioning=Hash([alias1@0], 4), input_partitions=1
-07)------------AggregateExec: mode=Partial, gby=[c@0 as alias1], aggr=[]
-08)--------------DataSourceExec: partitions=1, partition_sizes=[1]
+01)AggregateExec: mode=Final, gby=[], aggr=[median(alias1)]
+02)--CoalescePartitionsExec
+03)----AggregateExec: mode=Partial, gby=[], aggr=[median(alias1)]
+04)------AggregateExec: mode=FinalPartitioned, gby=[alias1@0 as alias1], aggr=[]
+05)--------RepartitionExec: partitioning=Hash([alias1@0], 4), input_partitions=1
+06)----------AggregateExec: mode=Partial, gby=[c@0 as alias1], aggr=[]
+07)------------DataSourceExec: partitions=1, partition_sizes=[1]
 
 statement ok
 drop table t;
@@ -7898,12 +7897,11 @@
 03)----Aggregate: groupBy=[[]], aggr=[[count(aggregate_test_100.c5)]]
 04)------TableScan: aggregate_test_100 projection=[c5]
 physical_plan
-01)ProjectionExec: expr=[count(aggregate_test_100.c5)@0 as count_c5]
-02)--AggregateExec: mode=Final, gby=[], aggr=[count(aggregate_test_100.c5)]
-03)----CoalescePartitionsExec
-04)------AggregateExec: mode=Partial, gby=[], aggr=[count(aggregate_test_100.c5)]
-05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c5], file_type=csv, has_header=true
+01)AggregateExec: mode=Final, gby=[], aggr=[count(aggregate_test_100.c5)]
+02)--CoalescePartitionsExec
+03)----AggregateExec: mode=Partial, gby=[], aggr=[count(aggregate_test_100.c5)]
+04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c5], file_type=csv, has_header=true
 
 statement count 0
 drop table aggregate_test_100;

datafusion/sqllogictest/test_files/aggregate_repartition.slt

Lines changed: 11 additions & 14 deletions
@@ -75,12 +75,11 @@ logical_plan
 02)--Aggregate: groupBy=[[dim_csv.env]], aggr=[[count(Int64(1))]]
 03)----TableScan: dim_csv projection=[env]
 physical_plan
-01)ProjectionExec: expr=[env@0 as env, count(Int64(1))@1 as count(*)]
-02)--AggregateExec: mode=FinalPartitioned, gby=[env@0 as env], aggr=[count(Int64(1))]
-03)----RepartitionExec: partitioning=Hash([env@0], 4), input_partitions=4
-04)------AggregateExec: mode=Partial, gby=[env@0 as env], aggr=[count(Int64(1))]
-05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/aggregate_repartition/dim.csv]]}, projection=[env], file_type=csv, has_header=true
+01)AggregateExec: mode=FinalPartitioned, gby=[env@0 as env], aggr=[count(Int64(1))]
+02)--RepartitionExec: partitioning=Hash([env@0], 4), input_partitions=4
+03)----AggregateExec: mode=Partial, gby=[env@0 as env], aggr=[count(Int64(1))]
+04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/aggregate_repartition/dim.csv]]}, projection=[env], file_type=csv, has_header=true
 
 # Test 2: EXPLAIN query for Parquet table with GROUP BY
 
@@ -92,11 +91,10 @@
 02)--Aggregate: groupBy=[[dim_parquet.env]], aggr=[[count(Int64(1))]]
 03)----TableScan: dim_parquet projection=[env]
 physical_plan
-01)ProjectionExec: expr=[env@0 as env, count(Int64(1))@1 as count(*)]
-02)--AggregateExec: mode=FinalPartitioned, gby=[env@0 as env], aggr=[count(Int64(1))]
-03)----RepartitionExec: partitioning=Hash([env@0], 4), input_partitions=1
-04)------AggregateExec: mode=Partial, gby=[env@0 as env], aggr=[count(Int64(1))]
-05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/aggregate_repartition/dim.parquet]]}, projection=[env], file_type=parquet
+01)AggregateExec: mode=FinalPartitioned, gby=[env@0 as env], aggr=[count(Int64(1))]
+02)--RepartitionExec: partitioning=Hash([env@0], 4), input_partitions=1
+03)----AggregateExec: mode=Partial, gby=[env@0 as env], aggr=[count(Int64(1))]
+04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/aggregate_repartition/dim.parquet]]}, projection=[env], file_type=parquet
 
 # Verify the queries actually work and return the same results
 query TI rowsort
@@ -125,9 +123,8 @@
 02)--Aggregate: groupBy=[[dim_parquet.env]], aggr=[[count(Int64(1))]]
 03)----TableScan: dim_parquet projection=[env]
 physical_plan
-01)ProjectionExec: expr=[env@0 as env, count(Int64(1))@1 as count(*)]
-02)--AggregateExec: mode=Single, gby=[env@0 as env], aggr=[count(Int64(1))]
-03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/aggregate_repartition/dim.parquet]]}, projection=[env], file_type=parquet
+01)AggregateExec: mode=Single, gby=[env@0 as env], aggr=[count(Int64(1))]
+02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/aggregate_repartition/dim.parquet]]}, projection=[env], file_type=parquet
 
 # Config reset

datafusion/sqllogictest/test_files/aggregates_simplify.slt

Lines changed: 8 additions & 9 deletions
@@ -166,15 +166,14 @@ logical_plan
 04)------Projection: sum_simplify_t.column1 + Int64(1) AS __common_expr_1
 05)--------TableScan: sum_simplify_t projection=[column1]
 physical_plan
-01)ProjectionExec: expr=[sum(alias1)@0 as sum(DISTINCT sum_simplify_t.column1 + Int64(1)), sum(alias2)@1 as sum(sum_simplify_t.column1 + Int64(1))]
-02)--AggregateExec: mode=Final, gby=[], aggr=[sum(alias1), sum(alias2)]
-03)----CoalescePartitionsExec
-04)------AggregateExec: mode=Partial, gby=[], aggr=[sum(alias1), sum(alias2)]
-05)--------AggregateExec: mode=FinalPartitioned, gby=[alias1@0 as alias1], aggr=[alias2]
-06)----------RepartitionExec: partitioning=Hash([alias1@0], 4), input_partitions=1
-07)------------AggregateExec: mode=Partial, gby=[__common_expr_1@0 as alias1], aggr=[alias2]
-08)--------------ProjectionExec: expr=[column1@0 + 1 as __common_expr_1]
-09)----------------DataSourceExec: partitions=1, partition_sizes=[1]
+01)AggregateExec: mode=Final, gby=[], aggr=[sum(alias1), sum(alias2)]
+02)--CoalescePartitionsExec
+03)----AggregateExec: mode=Partial, gby=[], aggr=[sum(alias1), sum(alias2)]
+04)------AggregateExec: mode=FinalPartitioned, gby=[alias1@0 as alias1], aggr=[alias2]
+05)--------RepartitionExec: partitioning=Hash([alias1@0], 4), input_partitions=1
+06)----------AggregateExec: mode=Partial, gby=[__common_expr_1@0 as alias1], aggr=[alias2]
+07)------------ProjectionExec: expr=[column1@0 + 1 as __common_expr_1]
+08)--------------DataSourceExec: partitions=1, partition_sizes=[1]
 
 # FILTER clauses with different aggregate arguments
 query II
