Skip to content

Commit 14fea41

Browse files
adriangbclaude
andcommitted
fix: preserve declaration order of orderings in EXPLAIN display
Three changes to ensure projected orderings match the original declaration order: 1. In OrderingEquivalenceClass::remove_redundant_entries(), use Vec::remove() instead of Vec::swap_remove() to preserve ordering during dedup. 2. In projected_orderings(), swap chain order so dependency-map-based orderings (which preserve declaration order via IndexMap) come before newly derived orderings from projection mapping. 3. In substitute_oeq_class(), push the original sort expression first before substituted (derived) expressions so the cartesian product preserves declaration order. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 73df07e commit 14fea41

6 files changed

Lines changed: 20 additions & 16 deletions

File tree

datafusion/physical-expr/src/equivalence/ordering.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,14 +107,14 @@ impl OrderingEquivalenceClass {
107107
if let Some(remove) = self.resolve_overlap(idx, ordering_idx) {
108108
work = true;
109109
if remove {
110-
self.orderings.swap_remove(idx);
110+
self.orderings.remove(idx);
111111
continue 'outer;
112112
}
113113
}
114114
if let Some(remove) = self.resolve_overlap(ordering_idx, idx) {
115115
work = true;
116116
if remove {
117-
self.orderings.swap_remove(ordering_idx);
117+
self.orderings.remove(ordering_idx);
118118
continue;
119119
}
120120
}

datafusion/physical-expr/src/equivalence/properties/mod.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -838,7 +838,9 @@ impl EquivalenceProperties {
838838
.map(|(source, _target)| source)
839839
.filter(|source| expr_refers(source, &sort_expr.expr))
840840
.cloned();
841-
let mut result = vec![];
841+
// Start with the original expression so that
842+
// declared orderings appear before derived ones:
843+
let mut result = vec![sort_expr.clone()];
842844
// The sort expression comes from this schema, so the
843845
// following call to `unwrap` is safe.
844846
let expr_type = sort_expr.expr.data_type(schema).unwrap();
@@ -860,7 +862,6 @@ impl EquivalenceProperties {
860862
}
861863
}
862864
}
863-
result.push(sort_expr);
864865
result
865866
})
866867
// Generate all valid orderings given substituted expressions:
@@ -1099,8 +1100,11 @@ impl EquivalenceProperties {
10991100
prefixes
11001101
});
11011102

1102-
// Simplify each ordering by removing redundant sections:
1103-
orderings.chain(projected_orderings).collect()
1103+
// Simplify each ordering by removing redundant sections.
1104+
// Place projected_orderings first so that orderings reconstructed
1105+
// from the dependency map (which preserves declaration order via
1106+
// IndexMap) appear before newly derived orderings from Pass 1:
1107+
projected_orderings.chain(orderings).collect()
11041108
}
11051109

11061110
/// Projects constraints according to the given projection mapping.

datafusion/sqllogictest/test_files/group_by.slt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2274,7 +2274,7 @@ logical_plan
22742274
02)--TableScan: multiple_ordered_table projection=[a0, a, b, c, d]
22752275
physical_plan
22762276
01)SortExec: expr=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, d@4 ASC NULLS LAST], preserve_partitioning=[false]
2277-
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[c@3 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST]], file_type=csv, has_header=true
2277+
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST], [c@3 ASC NULLS LAST]], file_type=csv, has_header=true
22782278

22792279
query TT
22802280
EXPLAIN SELECT a, b, ARRAY_AGG(d ORDER BY d)
@@ -5001,7 +5001,7 @@ logical_plan
50015001
03)----TableScan: multiple_ordered_table projection=[a, b, c]
50025002
physical_plan
50035003
01)AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[array_agg(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST]], ordering_mode=Sorted
5004-
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], output_orderings=[[c@2 ASC NULLS LAST], [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST]], file_type=csv, has_header=true
5004+
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], output_orderings=[[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], [c@2 ASC NULLS LAST]], file_type=csv, has_header=true
50055005

50065006
query II?
50075007
SELECT a, b, ARRAY_AGG(c ORDER BY c DESC)

datafusion/sqllogictest/test_files/monotonic_projection_test.slt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ logical_plan
9797
01)Sort: a_big ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST
9898
02)--Projection: multiple_ordered_table.a, multiple_ordered_table.a AS a_big, multiple_ordered_table.b
9999
03)----TableScan: multiple_ordered_table projection=[a, b]
100-
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_orderings=[[a@0 ASC NULLS LAST, b@2 ASC NULLS LAST], [a_big@1 ASC NULLS LAST]], file_type=csv, has_header=true
100+
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_orderings=[[a_big@1 ASC NULLS LAST], [a@0 ASC NULLS LAST, b@2 ASC NULLS LAST]], file_type=csv, has_header=true
101101

102102
query TT
103103
EXPLAIN
@@ -109,7 +109,7 @@ logical_plan
109109
01)Sort: multiple_ordered_table.a ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST
110110
02)--Projection: multiple_ordered_table.a, multiple_ordered_table.a AS a_big, multiple_ordered_table.b
111111
03)----TableScan: multiple_ordered_table projection=[a, b]
112-
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_orderings=[[a@0 ASC NULLS LAST, b@2 ASC NULLS LAST], [a_big@1 ASC NULLS LAST]], file_type=csv, has_header=true
112+
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_orderings=[[a_big@1 ASC NULLS LAST], [a@0 ASC NULLS LAST, b@2 ASC NULLS LAST]], file_type=csv, has_header=true
113113

114114

115115
# test for cast Utf8

datafusion/sqllogictest/test_files/topk.slt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,7 @@ explain select number, letter, age, number as column4, letter as column5 from pa
371371
----
372372
physical_plan
373373
01)SortExec: TopK(fetch=3), expr=[number@0 DESC, letter@1 ASC NULLS LAST, age@2 DESC], preserve_partitioning=[false], sort_prefix=[number@0 DESC, letter@1 ASC NULLS LAST]
374-
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age, number@0 as column4, letter@1 as column5], output_orderings=[[number@0 DESC, column5@4 ASC NULLS LAST], [column4@3 DESC], [number@0 DESC, letter@1 ASC NULLS LAST]], file_type=parquet, predicate=DynamicFilter [ empty ]
374+
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age, number@0 as column4, letter@1 as column5], output_orderings=[[column4@3 DESC], [number@0 DESC, letter@1 ASC NULLS LAST], [number@0 DESC, column5@4 ASC NULLS LAST]], file_type=parquet, predicate=DynamicFilter [ empty ]
375375

376376
# Verify that the sort prefix is correctly computed over normalized, order-maintaining projections (number + 1, number, number + 1, age)
377377
query TT

datafusion/sqllogictest/test_files/window.slt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3145,7 +3145,7 @@ physical_plan
31453145
11)--------------------BoundedWindowAggExec: wdw=[sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING": nullable Int64 }, frame: ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW": nullable Int64 }, frame: ROWS BETWEEN 5 PRECEDING AND CURRENT ROW], mode=[Sorted]
31463146
12)----------------------SortExec: expr=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, d@4 ASC NULLS LAST, c@3 ASC NULLS LAST], preserve_partitioning=[false]
31473147
13)------------------------BoundedWindowAggExec: wdw=[sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING": nullable Int64 }, frame: ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING": nullable Int64 }, frame: ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING], mode=[Sorted]
3148-
14)--------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[CAST(c@3 AS Int64) as __common_expr_1, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, __common_expr_1@0 ASC NULLS LAST]], file_type=csv, has_header=true
3148+
14)--------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[CAST(c@3 AS Int64) as __common_expr_1, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, __common_expr_1@0 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST]], file_type=csv, has_header=true
31493149

31503150
query IIIIIIIIIIIIIII
31513151
SELECT a, b, c,
@@ -3507,7 +3507,7 @@ logical_plan
35073507
physical_plan
35083508
01)BoundedWindowAggExec: wdw=[sum(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
35093509
02)--FilterExec: b@2 = 0
3510-
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[c@3 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST]], file_type=csv, has_header=true
3510+
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST], [c@3 ASC NULLS LAST]], file_type=csv, has_header=true
35113511

35123512
# Since column b is constant after filter b=0,
35133513
# window requirement b ASC, d ASC can be satisfied
@@ -3525,7 +3525,7 @@ physical_plan
35253525
01)BoundedWindowAggExec: wdw=[sum(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.b ASC NULLS LAST, multiple_ordered_table.d ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.b ASC NULLS LAST, multiple_ordered_table.d ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
35263526
02)--SortExec: expr=[d@4 ASC NULLS LAST], preserve_partitioning=[false]
35273527
03)----FilterExec: b@2 = 0
3528-
04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[c@3 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST]], file_type=csv, has_header=true
3528+
04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST], [c@3 ASC NULLS LAST]], file_type=csv, has_header=true
35293529

35303530

35313531
# Create an unbounded source where there is multiple orderings.
@@ -3561,7 +3561,7 @@ physical_plan
35613561
02)--BoundedWindowAggExec: wdw=[min(multiple_ordered_table.d) ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "min(multiple_ordered_table.d) ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int32 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
35623562
03)----ProjectionExec: expr=[c@2 as c, d@3 as d, max(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.b, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as max(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.b, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]
35633563
04)------BoundedWindowAggExec: wdw=[max(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.b, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "max(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.b, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int32 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
3564-
05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_orderings=[[c@2 ASC NULLS LAST], [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST]], file_type=csv, has_header=true
3564+
05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_orderings=[[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], [c@2 ASC NULLS LAST]], file_type=csv, has_header=true
35653565

35663566
query TT
35673567
EXPLAIN SELECT MAX(c) OVER(PARTITION BY d ORDER BY c ASC) as max_c
@@ -3605,7 +3605,7 @@ logical_plan
36053605
physical_plan
36063606
01)ProjectionExec: expr=[sum(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]
36073607
02)--BoundedWindowAggExec: wdw=[sum(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
3608-
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_orderings=[[c@2 ASC NULLS LAST], [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST]], file_type=csv, has_header=true
3608+
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_orderings=[[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], [c@2 ASC NULLS LAST]], file_type=csv, has_header=true
36093609

36103610
query I
36113611
SELECT SUM(d) OVER(PARTITION BY c, a ORDER BY b ASC)

0 commit comments

Comments
 (0)