Skip to content

Commit e0d3a94

Browse files
xudong963 and claude committed
Fix/aggregate output ordering streaming (#33)
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent a3dc8fa commit e0d3a94

6 files changed

Lines changed: 100 additions & 62 deletions

File tree

datafusion/physical-optimizer/src/enforce_distribution.rs

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -928,6 +928,43 @@ fn add_hash_on_top(
928928
///
929929
/// * `input`: Current node.
930930
///
931+
/// Checks whether preserving the child's ordering enables the parent to
932+
/// run in streaming mode. Compares the parent's pipeline behavior with
933+
/// the ordered child vs. an unordered (coalesced) child. If removing the
934+
/// ordering would cause the parent to switch from streaming to blocking,
935+
/// keeping the order-preserving variant is beneficial.
936+
///
937+
/// Returns `Ok(false)` for multi-child operators (e.g. joins), where child
938+
/// substitution semantics are ambiguous, and for parents that do not
939+
/// maintain their input order. Errors from `with_new_children` are propagated.
940+
fn preserving_order_enables_streaming(
941+
parent: &Arc<dyn ExecutionPlan>,
942+
ordered_child: &Arc<dyn ExecutionPlan>,
943+
) -> Result<bool> {
// NOTE(review): in this diff the function sits between `///` lines of an
// existing doc comment (`/// * input` above, `/// # Returns` below) — confirm placement.
944+
// Only applicable to single-child operators that maintain input order
945+
// (e.g. AggregateExec in PartiallySorted mode). Operators that don't
946+
// maintain input order (e.g. SortExec) handle ordering themselves —
947+
// keeping order-preserving variants for them is unnecessary.
948+
if parent.children().len() != 1 {
949+
return Ok(false);
950+
}
951+
if !parent.maintains_input_order()[0] {
952+
return Ok(false);
953+
}
954+
// Rebuild the parent on top of the ordered child to observe its behavior
955+
let with_ordered =
956+
Arc::clone(parent).with_new_children(vec![Arc::clone(ordered_child)])?;
957+
if with_ordered.pipeline_behavior() == EmissionType::Final {
958+
// Parent is blocking even with ordering — no benefit
959+
return Ok(false);
960+
}
961+
// Rebuild the parent on a CoalescePartitionsExec-wrapped child, used here as the unordered baseline.
962+
let unordered_child: Arc<dyn ExecutionPlan> =
963+
Arc::new(CoalescePartitionsExec::new(Arc::clone(ordered_child)));
964+
let without_ordered = Arc::clone(parent).with_new_children(vec![unordered_child])?;
// Streaming with the ordered child but `Final` without it: ordering is what enables streaming.
965+
Ok(without_ordered.pipeline_behavior() == EmissionType::Final)
966+
}
967+
931968
/// # Returns
932969
///
933970
/// Updated node with an execution plan, where the desired single distribution
@@ -1351,7 +1388,13 @@ pub fn ensure_distribution(
13511388
.equivalence_properties()
13521389
.ordering_satisfy_requirement(sort_req.clone())?;
13531390

1391+
let streaming_benefit = if child.data {
1392+
preserving_order_enables_streaming(&plan, &child.plan)?
1393+
} else {
1394+
false
1395+
};
13541396
if (!ordering_satisfied || !order_preserving_variants_desirable)
1397+
&& !streaming_benefit
13551398
&& child.data
13561399
{
13571400
child = replace_order_preserving_variants(child)?;
@@ -1372,6 +1415,11 @@ pub fn ensure_distribution(
13721415
// Stop tracking distribution changing operators
13731416
child.data = false;
13741417
} else {
1418+
let streaming_benefit = if child.data {
1419+
preserving_order_enables_streaming(&plan, &child.plan)?
1420+
} else {
1421+
false
1422+
};
13751423
// no ordering requirement
13761424
match requirement {
13771425
// Operator requires specific distribution.
@@ -1380,7 +1428,7 @@ pub fn ensure_distribution(
13801428
// ordering is pointless. However, if it does maintain
13811429
// input order, we keep order-preserving variants so
13821430
// ordering can flow through to ancestors that need it.
1383-
if !maintains {
1431+
if !maintains && !streaming_benefit {
13841432
child = replace_order_preserving_variants(child)?;
13851433
}
13861434
}

datafusion/sqllogictest/test_files/agg_func_substitute.slt

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,10 @@ logical_plan
4545
physical_plan
4646
01)ProjectionExec: expr=[a@0 as a, nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result]
4747
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
48-
03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
49-
04)------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
50-
05)--------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
51-
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
52-
07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true
48+
03)----RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST
49+
04)------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
50+
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
51+
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true
5352

5453

5554
query TT
@@ -64,11 +63,10 @@ logical_plan
6463
physical_plan
6564
01)ProjectionExec: expr=[a@0 as a, nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result]
6665
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
67-
03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
68-
04)------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
69-
05)--------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
70-
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
71-
07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true
66+
03)----RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST
67+
04)------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
68+
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
69+
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true
7270

7371
query TT
7472
EXPLAIN SELECT a, ARRAY_AGG(c ORDER BY c)[1 + 100] as result
@@ -82,11 +80,10 @@ logical_plan
8280
physical_plan
8381
01)ProjectionExec: expr=[a@0 as a, nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result]
8482
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
85-
03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
86-
04)------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
87-
05)--------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
88-
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
89-
07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true
83+
03)----RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST
84+
04)------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
85+
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
86+
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true
9087

9188
query II
9289
SELECT a, ARRAY_AGG(c ORDER BY c)[1] as result

datafusion/sqllogictest/test_files/group_by.slt

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3971,11 +3971,10 @@ logical_plan
39713971
02)--TableScan: multiple_ordered_table_with_pk projection=[b, c, d]
39723972
physical_plan
39733973
01)AggregateExec: mode=FinalPartitioned, gby=[c@0 as c, b@1 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
3974-
02)--SortExec: expr=[c@0 ASC NULLS LAST], preserve_partitioning=[true]
3975-
03)----RepartitionExec: partitioning=Hash([c@0, b@1], 8), input_partitions=8
3976-
04)------AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
3977-
05)--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true
3978-
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], constraints=[PrimaryKey([3])], file_type=csv, has_header=true
3974+
02)--RepartitionExec: partitioning=Hash([c@0, b@1], 8), input_partitions=8, preserve_order=true, sort_exprs=c@0 ASC NULLS LAST
3975+
03)----AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
3976+
04)------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true
3977+
05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], constraints=[PrimaryKey([3])], file_type=csv, has_header=true
39793978

39803979
# drop table multiple_ordered_table_with_pk
39813980
statement ok
@@ -4011,11 +4010,10 @@ logical_plan
40114010
02)--TableScan: multiple_ordered_table_with_pk projection=[b, c, d]
40124011
physical_plan
40134012
01)AggregateExec: mode=FinalPartitioned, gby=[c@0 as c, b@1 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
4014-
02)--SortExec: expr=[c@0 ASC NULLS LAST], preserve_partitioning=[true]
4015-
03)----RepartitionExec: partitioning=Hash([c@0, b@1], 8), input_partitions=8
4016-
04)------AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
4017-
05)--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true
4018-
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], constraints=[PrimaryKey([3])], file_type=csv, has_header=true
4013+
02)--RepartitionExec: partitioning=Hash([c@0, b@1], 8), input_partitions=8, preserve_order=true, sort_exprs=c@0 ASC NULLS LAST
4014+
03)----AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
4015+
04)------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true
4016+
05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], constraints=[PrimaryKey([3])], file_type=csv, has_header=true
40194017

40204018
statement ok
40214019
set datafusion.execution.target_partitions = 1;

datafusion/sqllogictest/test_files/preserve_file_partitioning.slt

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -288,10 +288,9 @@ physical_plan
288288
01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST]
289289
02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, count(Int64(1))@1 as count(*), avg(fact_table_ordered.value)@2 as avg(fact_table_ordered.value)]
290290
03)----AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey], aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
291-
04)------SortExec: expr=[f_dkey@0 ASC NULLS LAST], preserve_partitioning=[true]
292-
05)--------RepartitionExec: partitioning=Hash([f_dkey@0], 3), input_partitions=3
293-
06)----------AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey], aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
294-
07)------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet
291+
04)------RepartitionExec: partitioning=Hash([f_dkey@0], 3), input_partitions=3, preserve_order=true, sort_exprs=f_dkey@0 ASC NULLS LAST
292+
05)--------AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey], aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
293+
06)----------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet
295294

296295
# Verify results without optimization
297296
query TIR
@@ -361,16 +360,15 @@ physical_plan
361360
01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST]
362361
02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, max(d.env)@1 as max(d.env), max(d.service)@2 as max(d.service), count(Int64(1))@3 as count(*), sum(f.value)@4 as sum(f.value)]
363362
03)----AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey], aggr=[max(d.env), max(d.service), count(Int64(1)), sum(f.value)], ordering_mode=Sorted
364-
04)------SortExec: expr=[f_dkey@0 ASC NULLS LAST], preserve_partitioning=[true]
365-
05)--------RepartitionExec: partitioning=Hash([f_dkey@0], 3), input_partitions=3
366-
06)----------AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey], aggr=[max(d.env), max(d.service), count(Int64(1)), sum(f.value)], ordering_mode=Sorted
367-
07)------------ProjectionExec: expr=[value@2 as value, f_dkey@3 as f_dkey, env@0 as env, service@1 as service]
368-
08)--------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d_dkey@0, f_dkey@1)], projection=[env@1, service@2, value@3, f_dkey@4]
369-
09)----------------CoalescePartitionsExec
370-
10)------------------FilterExec: service@2 = log
371-
11)--------------------RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1
372-
12)----------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension/data.parquet]]}, projection=[d_dkey, env, service], file_type=parquet, predicate=service@2 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)]
373-
13)----------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ]
363+
04)------RepartitionExec: partitioning=Hash([f_dkey@0], 3), input_partitions=3, preserve_order=true, sort_exprs=f_dkey@0 ASC NULLS LAST
364+
05)--------AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey], aggr=[max(d.env), max(d.service), count(Int64(1)), sum(f.value)], ordering_mode=Sorted
365+
06)----------ProjectionExec: expr=[value@2 as value, f_dkey@3 as f_dkey, env@0 as env, service@1 as service]
366+
07)------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d_dkey@0, f_dkey@1)], projection=[env@1, service@2, value@3, f_dkey@4]
367+
08)--------------CoalescePartitionsExec
368+
09)----------------FilterExec: service@2 = log
369+
10)------------------RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1
370+
11)--------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension/data.parquet]]}, projection=[d_dkey, env, service], file_type=parquet, predicate=service@2 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)]
371+
12)--------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ]
374372

375373
# Verify results without optimization
376374
query TTTIR rowsort

0 commit comments

Comments
 (0)