Skip to content

Commit 722ca96

Browse files
xudong963 and claude committed
Fix/aggregate output ordering streaming (#33)
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 6796053 commit 722ca96

4 files changed

Lines changed: 80 additions & 35 deletions

File tree

datafusion/physical-optimizer/src/enforce_distribution.rs

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -928,6 +928,47 @@ fn add_hash_on_top(
928928
///
929929
/// * `input`: Current node.
930930
///
931+
/// Checks whether preserving the child's ordering enables the parent to
932+
/// run in streaming mode. Compares the parent's pipeline behavior with
933+
/// the ordered child vs. an unordered (coalesced) child. If removing the
934+
/// ordering would cause the parent to switch from streaming to blocking,
935+
/// keeping the order-preserving variant is beneficial.
936+
///
937+
/// Only applicable to single-child operators; returns false for multi-child
938+
/// operators (e.g. joins) where child substitution semantics are ambiguous.
939+
fn preserving_order_enables_streaming(
940+
parent: &Arc<dyn ExecutionPlan>,
941+
ordered_child: &Arc<dyn ExecutionPlan>,
942+
) -> bool {
943+
// Only applicable to single-child operators that maintain input order
944+
// (e.g. AggregateExec in PartiallySorted mode). Operators that don't
945+
// maintain input order (e.g. SortExec) handle ordering themselves —
946+
// preserving SPM for them is unnecessary.
947+
if parent.children().len() != 1 {
948+
return false;
949+
}
950+
if !parent.maintains_input_order()[0] {
951+
return false;
952+
}
953+
// Build parent with the ordered child
954+
let with_ordered =
955+
match Arc::clone(parent).with_new_children(vec![Arc::clone(ordered_child)]) {
956+
Ok(p) => p,
957+
Err(_) => return false,
958+
};
959+
if with_ordered.pipeline_behavior() == EmissionType::Final {
960+
// Parent is blocking even with ordering — no benefit
961+
return false;
962+
}
963+
// Build parent with an unordered child (simulating CoalescePartitionsExec)
964+
let unordered_child: Arc<dyn ExecutionPlan> =
965+
Arc::new(CoalescePartitionsExec::new(Arc::clone(ordered_child)));
966+
match Arc::clone(parent).with_new_children(vec![unordered_child]) {
967+
Ok(without_ordered) => without_ordered.pipeline_behavior() == EmissionType::Final,
968+
Err(_) => false,
969+
}
970+
}
971+
931972
/// # Returns
932973
///
933974
/// Updated node with an execution plan, where the desired single distribution
@@ -1351,7 +1392,10 @@ pub fn ensure_distribution(
13511392
.equivalence_properties()
13521393
.ordering_satisfy_requirement(sort_req.clone())?;
13531394

1395+
let streaming_benefit = child.data
1396+
&& preserving_order_enables_streaming(&plan, &child.plan);
13541397
if (!ordering_satisfied || !order_preserving_variants_desirable)
1398+
&& !streaming_benefit
13551399
&& child.data
13561400
{
13571401
child = replace_order_preserving_variants(child)?;
@@ -1376,8 +1420,15 @@ pub fn ensure_distribution(
13761420
match requirement {
13771421
// Operator requires specific distribution.
13781422
Distribution::SinglePartition | Distribution::HashPartitioned(_) => {
1379-
// Since there is no ordering requirement, preserving ordering is pointless
1380-
child = replace_order_preserving_variants(child)?;
1423+
let streaming_benefit = child.data
1424+
&& preserving_order_enables_streaming(
1425+
&plan,
1426+
&child.plan,
1427+
);
1428+
if !streaming_benefit {
1429+
child =
1430+
replace_order_preserving_variants(child)?;
1431+
}
13811432
}
13821433
Distribution::UnspecifiedDistribution => {
13831434
// Since ordering is lost, trying to preserve ordering is pointless

datafusion/sqllogictest/test_files/agg_func_substitute.slt

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,10 @@ logical_plan
4545
physical_plan
4646
01)ProjectionExec: expr=[a@0 as a, nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result]
4747
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
48-
03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
49-
04)------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
50-
05)--------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
51-
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
52-
07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true
48+
03)----RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST
49+
04)------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
50+
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
51+
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true
5352

5453

5554
query TT
@@ -64,11 +63,10 @@ logical_plan
6463
physical_plan
6564
01)ProjectionExec: expr=[a@0 as a, nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result]
6665
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
67-
03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
68-
04)------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
69-
05)--------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
70-
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
71-
07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true
66+
03)----RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST
67+
04)------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
68+
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
69+
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true
7270

7371
query TT
7472
EXPLAIN SELECT a, ARRAY_AGG(c ORDER BY c)[1 + 100] as result
@@ -82,11 +80,10 @@ logical_plan
8280
physical_plan
8381
01)ProjectionExec: expr=[a@0 as a, nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result]
8482
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
85-
03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
86-
04)------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
87-
05)--------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
88-
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
89-
07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true
83+
03)----RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST
84+
04)------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
85+
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
86+
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true
9087

9188
query II
9289
SELECT a, ARRAY_AGG(c ORDER BY c)[1] as result

datafusion/sqllogictest/test_files/group_by.slt

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3966,11 +3966,10 @@ logical_plan
39663966
02)--TableScan: multiple_ordered_table_with_pk projection=[b, c, d]
39673967
physical_plan
39683968
01)AggregateExec: mode=FinalPartitioned, gby=[c@0 as c, b@1 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
3969-
02)--SortExec: expr=[c@0 ASC NULLS LAST], preserve_partitioning=[true]
3970-
03)----RepartitionExec: partitioning=Hash([c@0, b@1], 8), input_partitions=8
3971-
04)------AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
3972-
05)--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true
3973-
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], constraints=[PrimaryKey([3])], file_type=csv, has_header=true
3969+
02)--RepartitionExec: partitioning=Hash([c@0, b@1], 8), input_partitions=8, preserve_order=true, sort_exprs=c@0 ASC NULLS LAST
3970+
03)----AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
3971+
04)------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true
3972+
05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], constraints=[PrimaryKey([3])], file_type=csv, has_header=true
39743973

39753974
# drop table multiple_ordered_table_with_pk
39763975
statement ok
@@ -4006,10 +4005,9 @@ logical_plan
40064005
02)--TableScan: multiple_ordered_table_with_pk projection=[b, c, d]
40074006
physical_plan
40084007
01)AggregateExec: mode=FinalPartitioned, gby=[c@0 as c, b@1 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
4009-
02)--SortExec: expr=[c@0 ASC NULLS LAST], preserve_partitioning=[true]
4010-
03)----RepartitionExec: partitioning=Hash([c@0, b@1], 8), input_partitions=8
4011-
04)------AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
4012-
05)--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true
4008+
02)--RepartitionExec: partitioning=Hash([c@0, b@1], 8), input_partitions=8, preserve_order=true, sort_exprs=c@0 ASC NULLS LAST
4009+
03)----AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
4010+
04)------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true
40134011
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], constraints=[PrimaryKey([3])], file_type=csv, has_header=true
40144012

40154013
statement ok

datafusion/sqllogictest/test_files/unnest.slt

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -987,15 +987,14 @@ logical_plan
987987
physical_plan
988988
01)ProjectionExec: expr=[array_agg(unnested.ar)@1 as array_agg(unnested.ar)]
989989
02)--AggregateExec: mode=FinalPartitioned, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
990-
03)----SortExec: expr=[generated_id@0 ASC NULLS LAST], preserve_partitioning=[true]
991-
04)------RepartitionExec: partitioning=Hash([generated_id@0], 4), input_partitions=4
992-
05)--------AggregateExec: mode=Partial, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
993-
06)----------ProjectionExec: expr=[generated_id@0 as generated_id, __unnest_placeholder(make_array(range().value),depth=1)@1 as ar]
994-
07)------------UnnestExec
995-
08)--------------ProjectionExec: expr=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as generated_id, make_array(value@0) as __unnest_placeholder(make_array(range().value))]
996-
09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
997-
10)------------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
998-
11)--------------------LazyMemoryExec: partitions=1, batch_generators=[range: start=1, end=5, batch_size=8192]
990+
03)----RepartitionExec: partitioning=Hash([generated_id@0], 4), input_partitions=4, preserve_order=true, sort_exprs=generated_id@0 ASC NULLS LAST
991+
04)------AggregateExec: mode=Partial, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
992+
05)--------ProjectionExec: expr=[generated_id@0 as generated_id, __unnest_placeholder(make_array(range().value),depth=1)@1 as ar]
993+
06)----------UnnestExec
994+
07)------------ProjectionExec: expr=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as generated_id, make_array(value@0) as __unnest_placeholder(make_array(range().value))]
995+
08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
996+
09)----------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
997+
10)------------------LazyMemoryExec: partitions=1, batch_generators=[range: start=1, end=5, batch_size=8192]
999998

1000999
# Unnest array where data is already ordered by column2 (100, 200, 300, 400)
10011000
statement ok

0 commit comments

Comments
 (0)