Skip to content

Commit 3f07243

Browse files
xudong963claude
andcommitted
Fix/aggregate output ordering streaming (#33)
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 6796053 commit 3f07243

6 files changed

Lines changed: 104 additions & 63 deletions

File tree

datafusion/physical-optimizer/src/enforce_distribution.rs

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -928,6 +928,47 @@ fn add_hash_on_top(
928928
///
929929
/// * `input`: Current node.
930930
///
931+
/// Checks whether preserving the child's ordering enables the parent to
932+
/// run in streaming mode. Compares the parent's pipeline behavior with
933+
/// the ordered child vs. an unordered (coalesced) child. If removing the
934+
/// ordering would cause the parent to switch from streaming to blocking,
935+
/// keeping the order-preserving variant is beneficial.
936+
///
937+
/// Only applicable to single-child operators; returns false for multi-child
938+
/// operators (e.g. joins) where child substitution semantics are ambiguous.
939+
fn preserving_order_enables_streaming(
940+
parent: &Arc<dyn ExecutionPlan>,
941+
ordered_child: &Arc<dyn ExecutionPlan>,
942+
) -> bool {
943+
// Only applicable to single-child operators that maintain input order
944+
// (e.g. AggregateExec in PartiallySorted mode). Operators that don't
945+
// maintain input order (e.g. SortExec) handle ordering themselves —
946+
// preserving SPM for them is unnecessary.
947+
if parent.children().len() != 1 {
948+
return false;
949+
}
950+
if !parent.maintains_input_order()[0] {
951+
return false;
952+
}
953+
// Build parent with the ordered child
954+
let with_ordered =
955+
match Arc::clone(parent).with_new_children(vec![Arc::clone(ordered_child)]) {
956+
Ok(p) => p,
957+
Err(_) => return false,
958+
};
959+
if with_ordered.pipeline_behavior() == EmissionType::Final {
960+
// Parent is blocking even with ordering — no benefit
961+
return false;
962+
}
963+
// Build parent with an unordered child (simulating CoalescePartitionsExec)
964+
let unordered_child: Arc<dyn ExecutionPlan> =
965+
Arc::new(CoalescePartitionsExec::new(Arc::clone(ordered_child)));
966+
match Arc::clone(parent).with_new_children(vec![unordered_child]) {
967+
Ok(without_ordered) => without_ordered.pipeline_behavior() == EmissionType::Final,
968+
Err(_) => false,
969+
}
970+
}
971+
931972
/// # Returns
932973
///
933974
/// Updated node with an execution plan, where the desired single distribution
@@ -1351,7 +1392,10 @@ pub fn ensure_distribution(
13511392
.equivalence_properties()
13521393
.ordering_satisfy_requirement(sort_req.clone())?;
13531394

1395+
let streaming_benefit = child.data
1396+
&& preserving_order_enables_streaming(&plan, &child.plan);
13541397
if (!ordering_satisfied || !order_preserving_variants_desirable)
1398+
&& !streaming_benefit
13551399
&& child.data
13561400
{
13571401
child = replace_order_preserving_variants(child)?;
@@ -1376,8 +1420,15 @@ pub fn ensure_distribution(
13761420
match requirement {
13771421
// Operator requires specific distribution.
13781422
Distribution::SinglePartition | Distribution::HashPartitioned(_) => {
1379-
// Since there is no ordering requirement, preserving ordering is pointless
1380-
child = replace_order_preserving_variants(child)?;
1423+
let streaming_benefit = child.data
1424+
&& preserving_order_enables_streaming(
1425+
&plan,
1426+
&child.plan,
1427+
);
1428+
if !streaming_benefit {
1429+
child =
1430+
replace_order_preserving_variants(child)?;
1431+
}
13811432
}
13821433
Distribution::UnspecifiedDistribution => {
13831434
// Since ordering is lost, trying to preserve ordering is pointless

datafusion/sqllogictest/test_files/agg_func_substitute.slt

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,10 @@ logical_plan
4545
physical_plan
4646
01)ProjectionExec: expr=[a@0 as a, nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result]
4747
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
48-
03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
49-
04)------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
50-
05)--------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
51-
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
52-
07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true
48+
03)----RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST
49+
04)------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
50+
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
51+
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true
5352

5453

5554
query TT
@@ -64,11 +63,10 @@ logical_plan
6463
physical_plan
6564
01)ProjectionExec: expr=[a@0 as a, nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result]
6665
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
67-
03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
68-
04)------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
69-
05)--------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
70-
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
71-
07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true
66+
03)----RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST
67+
04)------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
68+
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
69+
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true
7270

7371
query TT
7472
EXPLAIN SELECT a, ARRAY_AGG(c ORDER BY c)[1 + 100] as result
@@ -82,11 +80,10 @@ logical_plan
8280
physical_plan
8381
01)ProjectionExec: expr=[a@0 as a, nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result]
8482
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
85-
03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
86-
04)------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
87-
05)--------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
88-
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
89-
07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true
83+
03)----RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST
84+
04)------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
85+
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
86+
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true
9087

9188
query II
9289
SELECT a, ARRAY_AGG(c ORDER BY c)[1] as result

datafusion/sqllogictest/test_files/group_by.slt

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3966,11 +3966,10 @@ logical_plan
39663966
02)--TableScan: multiple_ordered_table_with_pk projection=[b, c, d]
39673967
physical_plan
39683968
01)AggregateExec: mode=FinalPartitioned, gby=[c@0 as c, b@1 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
3969-
02)--SortExec: expr=[c@0 ASC NULLS LAST], preserve_partitioning=[true]
3970-
03)----RepartitionExec: partitioning=Hash([c@0, b@1], 8), input_partitions=8
3971-
04)------AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
3972-
05)--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true
3973-
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], constraints=[PrimaryKey([3])], file_type=csv, has_header=true
3969+
02)--RepartitionExec: partitioning=Hash([c@0, b@1], 8), input_partitions=8, preserve_order=true, sort_exprs=c@0 ASC NULLS LAST
3970+
03)----AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
3971+
04)------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true
3972+
05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], constraints=[PrimaryKey([3])], file_type=csv, has_header=true
39743973

39753974
# drop table multiple_ordered_table_with_pk
39763975
statement ok
@@ -4006,11 +4005,10 @@ logical_plan
40064005
02)--TableScan: multiple_ordered_table_with_pk projection=[b, c, d]
40074006
physical_plan
40084007
01)AggregateExec: mode=FinalPartitioned, gby=[c@0 as c, b@1 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
4009-
02)--SortExec: expr=[c@0 ASC NULLS LAST], preserve_partitioning=[true]
4010-
03)----RepartitionExec: partitioning=Hash([c@0, b@1], 8), input_partitions=8
4011-
04)------AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
4012-
05)--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true
4013-
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], constraints=[PrimaryKey([3])], file_type=csv, has_header=true
4008+
02)--RepartitionExec: partitioning=Hash([c@0, b@1], 8), input_partitions=8, preserve_order=true, sort_exprs=c@0 ASC NULLS LAST
4009+
03)----AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
4010+
04)------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true
4011+
05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], constraints=[PrimaryKey([3])], file_type=csv, has_header=true
40144012

40154013
statement ok
40164014
set datafusion.execution.target_partitions = 1;

datafusion/sqllogictest/test_files/preserve_file_partitioning.slt

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -288,10 +288,9 @@ physical_plan
288288
01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST]
289289
02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, count(Int64(1))@1 as count(*), avg(fact_table_ordered.value)@2 as avg(fact_table_ordered.value)]
290290
03)----AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey], aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
291-
04)------SortExec: expr=[f_dkey@0 ASC NULLS LAST], preserve_partitioning=[true]
292-
05)--------RepartitionExec: partitioning=Hash([f_dkey@0], 3), input_partitions=3
293-
06)----------AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey], aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
294-
07)------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet
291+
04)------RepartitionExec: partitioning=Hash([f_dkey@0], 3), input_partitions=3, preserve_order=true, sort_exprs=f_dkey@0 ASC NULLS LAST
292+
05)--------AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey], aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
293+
06)----------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet
295294

296295
# Verify results without optimization
297296
query TIR
@@ -361,16 +360,15 @@ physical_plan
361360
01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST]
362361
02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, max(d.env)@1 as max(d.env), max(d.service)@2 as max(d.service), count(Int64(1))@3 as count(*), sum(f.value)@4 as sum(f.value)]
363362
03)----AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey], aggr=[max(d.env), max(d.service), count(Int64(1)), sum(f.value)], ordering_mode=Sorted
364-
04)------SortExec: expr=[f_dkey@0 ASC NULLS LAST], preserve_partitioning=[true]
365-
05)--------RepartitionExec: partitioning=Hash([f_dkey@0], 3), input_partitions=3
366-
06)----------AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey], aggr=[max(d.env), max(d.service), count(Int64(1)), sum(f.value)], ordering_mode=Sorted
367-
07)------------ProjectionExec: expr=[value@2 as value, f_dkey@3 as f_dkey, env@0 as env, service@1 as service]
368-
08)--------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d_dkey@0, f_dkey@1)], projection=[env@1, service@2, value@3, f_dkey@4]
369-
09)----------------CoalescePartitionsExec
370-
10)------------------FilterExec: service@2 = log
371-
11)--------------------RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1
372-
12)----------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension/data.parquet]]}, projection=[d_dkey, env, service], file_type=parquet, predicate=service@2 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)]
373-
13)----------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ]
363+
04)------RepartitionExec: partitioning=Hash([f_dkey@0], 3), input_partitions=3, preserve_order=true, sort_exprs=f_dkey@0 ASC NULLS LAST
364+
05)--------AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey], aggr=[max(d.env), max(d.service), count(Int64(1)), sum(f.value)], ordering_mode=Sorted
365+
06)----------ProjectionExec: expr=[value@2 as value, f_dkey@3 as f_dkey, env@0 as env, service@1 as service]
366+
07)------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d_dkey@0, f_dkey@1)], projection=[env@1, service@2, value@3, f_dkey@4]
367+
08)--------------CoalescePartitionsExec
368+
09)----------------FilterExec: service@2 = log
369+
10)------------------RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1
370+
11)--------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension/data.parquet]]}, projection=[d_dkey, env, service], file_type=parquet, predicate=service@2 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)]
371+
12)--------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ]
374372

375373
# Verify results without optimization
376374
query TTTIR rowsort

0 commit comments

Comments
 (0)