Skip to content

Commit 9766470

Browse files
adriangbclaude
andcommitted
fix: prefer removing later duplicates in remove_redundant_entries
Swap the order of resolve_overlap checks so that when two orderings are exact duplicates, the later occurrence is removed instead of the earlier one. This preserves the declaration order of orderings when the same ordering appears multiple times (e.g. from both Pass 1 and Pass 2 of projected_orderings). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 70a99a6 commit 9766470

4 files changed

Lines changed: 14 additions & 11 deletions

File tree

datafusion/physical-expr/src/equivalence/ordering.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -104,18 +104,21 @@ impl OrderingEquivalenceClass {
104104
'outer: while idx < self.orderings.len() {
105105
let mut ordering_idx = idx + 1;
106106
while ordering_idx < self.orderings.len() {
107-
if let Some(remove) = self.resolve_overlap(idx, ordering_idx) {
107+
// Check the later index first so that for exact
108+
// duplicates we remove the later occurrence and
109+
// preserve the declaration order of orderings:
110+
if let Some(remove) = self.resolve_overlap(ordering_idx, idx) {
108111
work = true;
109112
if remove {
110-
self.orderings.remove(idx);
111-
continue 'outer;
113+
self.orderings.remove(ordering_idx);
114+
continue;
112115
}
113116
}
114-
if let Some(remove) = self.resolve_overlap(ordering_idx, idx) {
117+
if let Some(remove) = self.resolve_overlap(idx, ordering_idx) {
115118
work = true;
116119
if remove {
117-
self.orderings.remove(ordering_idx);
118-
continue;
120+
self.orderings.remove(idx);
121+
continue 'outer;
119122
}
120123
}
121124
ordering_idx += 1;

datafusion/sqllogictest/test_files/monotonic_projection_test.slt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ logical_plan
9797
01)Sort: a_big ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST
9898
02)--Projection: multiple_ordered_table.a, multiple_ordered_table.a AS a_big, multiple_ordered_table.b
9999
03)----TableScan: multiple_ordered_table projection=[a, b]
100-
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_orderings=[[a_big@1 ASC NULLS LAST], [a@0 ASC NULLS LAST, b@2 ASC NULLS LAST]], file_type=csv, has_header=true
100+
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_orderings=[[a@0 ASC NULLS LAST, b@2 ASC NULLS LAST], [a_big@1 ASC NULLS LAST]], file_type=csv, has_header=true
101101

102102
query TT
103103
EXPLAIN
@@ -109,7 +109,7 @@ logical_plan
109109
01)Sort: multiple_ordered_table.a ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST
110110
02)--Projection: multiple_ordered_table.a, multiple_ordered_table.a AS a_big, multiple_ordered_table.b
111111
03)----TableScan: multiple_ordered_table projection=[a, b]
112-
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_orderings=[[a_big@1 ASC NULLS LAST], [a@0 ASC NULLS LAST, b@2 ASC NULLS LAST]], file_type=csv, has_header=true
112+
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_orderings=[[a@0 ASC NULLS LAST, b@2 ASC NULLS LAST], [a_big@1 ASC NULLS LAST]], file_type=csv, has_header=true
113113

114114

115115
# test for cast Utf8

datafusion/sqllogictest/test_files/topk.slt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,7 @@ explain select number, letter, age, number as column4, letter as column5 from pa
371371
----
372372
physical_plan
373373
01)SortExec: TopK(fetch=3), expr=[number@0 DESC, letter@1 ASC NULLS LAST, age@2 DESC], preserve_partitioning=[false], sort_prefix=[number@0 DESC, letter@1 ASC NULLS LAST]
374-
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age, number@0 as column4, letter@1 as column5], output_orderings=[[column4@3 DESC], [number@0 DESC, letter@1 ASC NULLS LAST], [number@0 DESC, column5@4 ASC NULLS LAST]], file_type=parquet, predicate=DynamicFilter [ empty ]
374+
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age, number@0 as column4, letter@1 as column5], output_orderings=[[number@0 DESC, letter@1 ASC NULLS LAST], [column4@3 DESC], [number@0 DESC, column5@4 ASC NULLS LAST]], file_type=parquet, predicate=DynamicFilter [ empty ]
375375

376376
# Verify that the sort prefix is correctly computed over normalized, order-maintaining projections (number + 1, number, number + 1, age)
377377
query TT

datafusion/sqllogictest/test_files/window.slt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3145,7 +3145,7 @@ physical_plan
31453145
11)--------------------BoundedWindowAggExec: wdw=[sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING": nullable Int64 }, frame: ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW": nullable Int64 }, frame: ROWS BETWEEN 5 PRECEDING AND CURRENT ROW], mode=[Sorted]
31463146
12)----------------------SortExec: expr=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, d@4 ASC NULLS LAST, c@3 ASC NULLS LAST], preserve_partitioning=[false]
31473147
13)------------------------BoundedWindowAggExec: wdw=[sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING": nullable Int64 }, frame: ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING": nullable Int64 }, frame: ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING], mode=[Sorted]
3148-
14)--------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[CAST(c@3 AS Int64) as __common_expr_1, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, __common_expr_1@0 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST]], file_type=csv, has_header=true
3148+
14)--------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[CAST(c@3 AS Int64) as __common_expr_1, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, __common_expr_1@0 ASC NULLS LAST]], file_type=csv, has_header=true
31493149

31503150
query IIIIIIIIIIIIIII
31513151
SELECT a, b, c,
@@ -3345,7 +3345,7 @@ logical_plan
33453345
physical_plan
33463346
01)ProjectionExec: expr=[sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum1, sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as sum2, sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum3, sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@6 as sum4]
33473347
02)--BoundedWindowAggExec: wdw=[sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Linear]
3348-
03)----RepartitionExec: partitioning=Hash([d@2], 2), input_partitions=2, preserve_order=true, sort_exprs=__common_expr_1@0 ASC NULLS LAST, a@1 ASC NULLS LAST
3348+
03)----RepartitionExec: partitioning=Hash([d@2], 2), input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST, __common_expr_1@0 ASC NULLS LAST
33493349
04)------ProjectionExec: expr=[__common_expr_1@0 as __common_expr_1, a@1 as a, d@4 as d, sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@6 as sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@7 as sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]
33503350
05)--------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
33513351
06)----------RepartitionExec: partitioning=Hash([b@2, a@1], 2), input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST, __common_expr_1@0 ASC NULLS LAST

0 commit comments

Comments
 (0)