Skip to content

Commit 5fbb6eb

Browse files
adriangbclaude
andcommitted
fix: preserve declaration order of orderings in EXPLAIN display
Three changes to ensure projected orderings match the original declaration order: 1. In OrderingEquivalenceClass::remove_redundant_entries(), use Vec::remove() instead of Vec::swap_remove() to preserve ordering during dedup. 2. In projected_orderings(), swap chain order so dependency-map-based orderings (which preserve declaration order via IndexMap) come before newly derived orderings from projection mapping. 3. In substitute_oeq_class(), push the original sort expression first before substituted (derived) expressions so the cartesian product preserves declaration order. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent da8a629 commit 5fbb6eb

6 files changed

Lines changed: 20 additions & 16 deletions

File tree

datafusion/physical-expr/src/equivalence/ordering.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,14 +107,14 @@ impl OrderingEquivalenceClass {
107107
if let Some(remove) = self.resolve_overlap(idx, ordering_idx) {
108108
work = true;
109109
if remove {
110-
self.orderings.swap_remove(idx);
110+
self.orderings.remove(idx);
111111
continue 'outer;
112112
}
113113
}
114114
if let Some(remove) = self.resolve_overlap(ordering_idx, idx) {
115115
work = true;
116116
if remove {
117-
self.orderings.swap_remove(ordering_idx);
117+
self.orderings.remove(ordering_idx);
118118
continue;
119119
}
120120
}

datafusion/physical-expr/src/equivalence/properties/mod.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -871,7 +871,9 @@ impl EquivalenceProperties {
871871
.map(|(source, _target)| source)
872872
.filter(|source| expr_refers(source, &sort_expr.expr))
873873
.cloned();
874-
let mut result = vec![];
874+
// Start with the original expression so that
875+
// declared orderings appear before derived ones:
876+
let mut result = vec![sort_expr.clone()];
875877
// The sort expression comes from this schema, so the
876878
// following call to `unwrap` is safe.
877879
let expr_type = sort_expr.expr.data_type(schema).unwrap();
@@ -883,7 +885,6 @@ impl EquivalenceProperties {
883885
result.push(substituted);
884886
}
885887
}
886-
result.push(sort_expr);
887888
result
888889
})
889890
// Generate all valid orderings given substituted expressions:
@@ -1122,8 +1123,11 @@ impl EquivalenceProperties {
11221123
prefixes
11231124
});
11241125

1125-
// Simplify each ordering by removing redundant sections:
1126-
orderings.chain(projected_orderings).collect()
1126+
// Simplify each ordering by removing redundant sections.
1127+
// Place projected_orderings first so that orderings reconstructed
1128+
// from the dependency map (which preserves declaration order via
1129+
// IndexMap) appear before newly derived orderings from Pass 1:
1130+
projected_orderings.chain(orderings).collect()
11271131
}
11281132

11291133
/// Projects constraints according to the given projection mapping.

datafusion/sqllogictest/test_files/group_by.slt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2279,7 +2279,7 @@ logical_plan
22792279
02)--TableScan: multiple_ordered_table projection=[a0, a, b, c, d]
22802280
physical_plan
22812281
01)SortExec: expr=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, d@4 ASC NULLS LAST], preserve_partitioning=[false]
2282-
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[c@3 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST]], file_type=csv, has_header=true
2282+
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST], [c@3 ASC NULLS LAST]], file_type=csv, has_header=true
22832283

22842284
query TT
22852285
EXPLAIN SELECT a, b, ARRAY_AGG(d ORDER BY d)
@@ -5004,7 +5004,7 @@ logical_plan
50045004
03)----TableScan: multiple_ordered_table projection=[a, b, c]
50055005
physical_plan
50065006
01)AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[array_agg(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST]], ordering_mode=Sorted
5007-
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], output_orderings=[[c@2 ASC NULLS LAST], [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST]], file_type=csv, has_header=true
5007+
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], output_orderings=[[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], [c@2 ASC NULLS LAST]], file_type=csv, has_header=true
50085008

50095009
query II?
50105010
SELECT a, b, ARRAY_AGG(c ORDER BY c DESC)

datafusion/sqllogictest/test_files/monotonic_projection_test.slt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ logical_plan
9797
01)Sort: a_big ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST
9898
02)--Projection: multiple_ordered_table.a, multiple_ordered_table.a AS a_big, multiple_ordered_table.b
9999
03)----TableScan: multiple_ordered_table projection=[a, b]
100-
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_orderings=[[a@0 ASC NULLS LAST, b@2 ASC NULLS LAST], [a_big@1 ASC NULLS LAST]], file_type=csv, has_header=true
100+
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_orderings=[[a_big@1 ASC NULLS LAST], [a@0 ASC NULLS LAST, b@2 ASC NULLS LAST]], file_type=csv, has_header=true
101101

102102
query TT
103103
EXPLAIN
@@ -109,7 +109,7 @@ logical_plan
109109
01)Sort: multiple_ordered_table.a ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST
110110
02)--Projection: multiple_ordered_table.a, multiple_ordered_table.a AS a_big, multiple_ordered_table.b
111111
03)----TableScan: multiple_ordered_table projection=[a, b]
112-
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_orderings=[[a@0 ASC NULLS LAST, b@2 ASC NULLS LAST], [a_big@1 ASC NULLS LAST]], file_type=csv, has_header=true
112+
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_orderings=[[a_big@1 ASC NULLS LAST], [a@0 ASC NULLS LAST, b@2 ASC NULLS LAST]], file_type=csv, has_header=true
113113

114114

115115
# test for cast Utf8

datafusion/sqllogictest/test_files/topk.slt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,7 @@ explain select number, letter, age, number as column4, letter as column5 from pa
371371
----
372372
physical_plan
373373
01)SortExec: TopK(fetch=3), expr=[number@0 DESC, letter@1 ASC NULLS LAST, age@2 DESC], preserve_partitioning=[false], sort_prefix=[number@0 DESC, letter@1 ASC NULLS LAST]
374-
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age, number@0 as column4, letter@1 as column5], output_orderings=[[number@0 DESC, column5@4 ASC NULLS LAST], [column4@3 DESC], [number@0 DESC, letter@1 ASC NULLS LAST]], file_type=parquet, predicate=DynamicFilter [ empty ]
374+
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age, number@0 as column4, letter@1 as column5], output_orderings=[[column4@3 DESC], [number@0 DESC, letter@1 ASC NULLS LAST], [number@0 DESC, column5@4 ASC NULLS LAST]], file_type=parquet, predicate=DynamicFilter [ empty ]
375375

376376
# Verify that the sort prefix is correctly computed over normalized, order-maintaining projections (number + 1, number, number + 1, age)
377377
query TT

datafusion/sqllogictest/test_files/window.slt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3145,7 +3145,7 @@ physical_plan
31453145
11)--------------------BoundedWindowAggExec: wdw=[sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING": nullable Int64 }, frame: ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW": nullable Int64 }, frame: ROWS BETWEEN 5 PRECEDING AND CURRENT ROW], mode=[Sorted]
31463146
12)----------------------SortExec: expr=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, d@4 ASC NULLS LAST, c@3 ASC NULLS LAST], preserve_partitioning=[false]
31473147
13)------------------------BoundedWindowAggExec: wdw=[sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING": nullable Int64 }, frame: ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING": nullable Int64 }, frame: ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING], mode=[Sorted]
3148-
14)--------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[CAST(c@3 AS Int64) as __common_expr_1, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, __common_expr_1@0 ASC NULLS LAST]], file_type=csv, has_header=true
3148+
14)--------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[CAST(c@3 AS Int64) as __common_expr_1, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, __common_expr_1@0 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST]], file_type=csv, has_header=true
31493149

31503150
query IIIIIIIIIIIIIII
31513151
SELECT a, b, c,
@@ -3508,7 +3508,7 @@ logical_plan
35083508
physical_plan
35093509
01)BoundedWindowAggExec: wdw=[sum(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
35103510
02)--FilterExec: b@2 = 0
3511-
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[c@3 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST]], file_type=csv, has_header=true
3511+
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST], [c@3 ASC NULLS LAST]], file_type=csv, has_header=true
35123512

35133513
# Since column b is constant after filter b=0,
35143514
# window requirement b ASC, d ASC can be satisfied
@@ -3526,7 +3526,7 @@ physical_plan
35263526
01)BoundedWindowAggExec: wdw=[sum(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.b ASC NULLS LAST, multiple_ordered_table.d ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.b ASC NULLS LAST, multiple_ordered_table.d ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
35273527
02)--SortExec: expr=[d@4 ASC NULLS LAST], preserve_partitioning=[false]
35283528
03)----FilterExec: b@2 = 0
3529-
04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[c@3 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST]], file_type=csv, has_header=true
3529+
04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST], [c@3 ASC NULLS LAST]], file_type=csv, has_header=true
35303530

35313531

35323532
# Create an unbounded source where there is multiple orderings.
@@ -3562,7 +3562,7 @@ physical_plan
35623562
02)--BoundedWindowAggExec: wdw=[min(multiple_ordered_table.d) ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "min(multiple_ordered_table.d) ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int32 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
35633563
03)----ProjectionExec: expr=[c@2 as c, d@3 as d, max(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.b, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as max(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.b, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]
35643564
04)------BoundedWindowAggExec: wdw=[max(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.b, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "max(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.b, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int32 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
3565-
05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_orderings=[[c@2 ASC NULLS LAST], [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST]], file_type=csv, has_header=true
3565+
05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_orderings=[[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], [c@2 ASC NULLS LAST]], file_type=csv, has_header=true
35663566

35673567
query TT
35683568
EXPLAIN SELECT MAX(c) OVER(PARTITION BY d ORDER BY c ASC) as max_c
@@ -3606,7 +3606,7 @@ logical_plan
36063606
physical_plan
36073607
01)ProjectionExec: expr=[sum(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]
36083608
02)--BoundedWindowAggExec: wdw=[sum(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
3609-
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_orderings=[[c@2 ASC NULLS LAST], [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST]], file_type=csv, has_header=true
3609+
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_orderings=[[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], [c@2 ASC NULLS LAST]], file_type=csv, has_header=true
36103610

36113611
query I
36123612
SELECT SUM(d) OVER(PARTITION BY c, a ORDER BY b ASC)

0 commit comments

Comments
 (0)