Commit fd882fb

xiedeyantu and alamb authored
feat: Optimize ORDER BY by Pruning Functionally Redundant Sort Keys (#21362)
## Which issue does this PR close?

- Closes #21361.

## Rationale for this change

This PR adds functional-dependency-based simplification for `ORDER BY` clauses. When an earlier sort key already functionally determines a later key, the later key is redundant and can be removed without changing query semantics. This reduces unnecessary sorting work and avoids carrying extra sort keys through planning and execution.

## What changes are included in this PR?

This PR extends the existing functional dependency utilities with a helper for pruning redundant sort keys, and wires that helper into `eliminate_duplicated_expr` so `Sort` nodes can be simplified during optimization. It also adds regression coverage for both the positive case, where a trailing sort key is removed, and the negative case, where sort order prevents pruning.

## Are these changes tested?

Yes. I added unit tests covering:

- removal of a functionally redundant trailing `ORDER BY` key
- preservation of ordering when the dependent column appears before its determinant

I also ran `cargo test -p datafusion-optimizer eliminate_duplicated_expr -- --nocapture` successfully, and `cargo fmt --all` passes.

## Are there any user-facing changes?

Yes, but only in query planning behavior. Some queries with redundant `ORDER BY` keys may produce simpler plans and run more efficiently. There are no public API changes.

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
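The core intuition behind the change can be illustrated outside of DataFusion. This is a toy sketch (hypothetical names, not DataFusion code): if the key in each row's first position uniquely determines the value in the second position, then sorting by `(key, value)` and sorting by `key` alone produce identical orderings, so the trailing sort key is redundant.

```rust
/// Sort (key, value) rows either by both fields or by the key alone.
/// When each key maps to exactly one value (a functional dependency),
/// the two orderings are identical.
fn sort_rows(rows: &[(i32, &'static str)], use_both_keys: bool) -> Vec<(i32, &'static str)> {
    let mut v = rows.to_vec();
    if use_both_keys {
        // ORDER BY key, value
        v.sort_by(|a, b| (a.0, a.1).cmp(&(b.0, b.1)));
    } else {
        // ORDER BY key
        v.sort_by_key(|&(k, _)| k);
    }
    v
}

fn main() {
    // Each key maps to exactly one value, mimicking a functional dependency.
    let rows = [(3, "c"), (1, "a"), (2, "b"), (1, "a")];
    assert_eq!(sort_rows(&rows, true), sort_rows(&rows, false));
    println!("orders match: pruning the value key is safe");
}
```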
1 parent 43d32a8 commit fd882fb

5 files changed

Lines changed: 141 additions & 13 deletions


datafusion/common/src/functional_dependencies.rs

Lines changed: 47 additions & 0 deletions
```diff
@@ -590,6 +590,53 @@ pub fn get_required_group_by_exprs_indices(
         .collect()
 }
 
+/// Returns indices for the minimal subset of ORDER BY expressions that are
+/// functionally equivalent to the original set of ORDER BY expressions.
+pub fn get_required_sort_exprs_indices(
+    schema: &DFSchema,
+    sort_expr_names: &[String],
+) -> Vec<usize> {
+    let dependencies = schema.functional_dependencies();
+    let field_names = schema.field_names();
+
+    let mut known_field_indices = HashSet::new();
+    let mut required_sort_expr_indices = Vec::new();
+
+    for (sort_expr_idx, sort_expr_name) in sort_expr_names.iter().enumerate() {
+        // If the sort expression doesn't correspond to a known schema field
+        // (e.g. a computed expression), we can't reason about it via functional
+        // dependencies, so conservatively keep it.
+        let Some(field_idx) = field_names
+            .iter()
+            .position(|field_name| field_name == sort_expr_name)
+        else {
+            required_sort_expr_indices.push(sort_expr_idx);
+            continue;
+        };
+
+        // A sort expression is removable if its value is functionally determined
+        // by fields that already appear earlier in the sort order: if the earlier
+        // fields are fixed, this one's value is fixed too, so it adds no ordering
+        // information.
+        let removable = dependencies.deps.iter().any(|dependency| {
+            dependency.target_indices.contains(&field_idx)
+                && dependency
+                    .source_indices
+                    .iter()
+                    .all(|source_idx| known_field_indices.contains(source_idx))
+        });
+
+        if removable {
+            continue;
+        }
+
+        known_field_indices.insert(field_idx);
+        required_sort_expr_indices.push(sort_expr_idx);
+    }
+
+    required_sort_expr_indices
+}
+
 /// Updates entries inside the `entries` vector with their corresponding
 /// indices inside the `proj_indices` vector.
 fn update_elements_with_matching_indices(
```
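The pruning logic above can be exercised in isolation. The following is a standalone sketch with a simplified `Dependency` type standing in for DataFusion's functional-dependency representation (the names `Dependency` and `required_sort_key_indices` are illustrative, not the library API): a sort key is dropped only when some dependency marks it as determined by keys already kept earlier in the order.

```rust
use std::collections::HashSet;

/// A functional dependency: the fields at `source_indices` jointly
/// determine the field at each index in `target_indices`.
struct Dependency {
    source_indices: Vec<usize>,
    target_indices: Vec<usize>,
}

/// Keep a sort key unless it is functionally determined by keys that
/// already appear earlier in the sort order. Returns positions to keep.
fn required_sort_key_indices(deps: &[Dependency], sort_field_indices: &[usize]) -> Vec<usize> {
    let mut known: HashSet<usize> = HashSet::new();
    let mut required = Vec::new();
    for (pos, &field_idx) in sort_field_indices.iter().enumerate() {
        let removable = deps.iter().any(|d| {
            d.target_indices.contains(&field_idx)
                && d.source_indices.iter().all(|s| known.contains(s))
        });
        if removable {
            continue; // determined by earlier keys: adds no ordering information
        }
        known.insert(field_idx);
        required.push(pos);
    }
    required
}

fn main() {
    // Field 0 (say, a primary key) determines field 1.
    let deps = vec![Dependency { source_indices: vec![0], target_indices: vec![1] }];

    // ORDER BY f0, f1: f1 is redundant once f0 is fixed.
    assert_eq!(required_sort_key_indices(&deps, &[0, 1]), vec![0]);

    // ORDER BY f1, f0: the dependent key comes first, so nothing is pruned.
    assert_eq!(required_sort_key_indices(&deps, &[1, 0]), vec![0, 1]);
}
```

Note how this mirrors the PR's two regression tests: the determinant-first order prunes, while the dependent-first order is left intact.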

datafusion/common/src/lib.rs

Lines changed: 1 addition & 1 deletion
```diff
@@ -82,7 +82,7 @@ pub use file_options::file_type::{
 pub use functional_dependencies::{
     Constraint, Constraints, Dependency, FunctionalDependence, FunctionalDependencies,
     aggregate_functional_dependencies, get_required_group_by_exprs_indices,
-    get_target_functional_dependencies,
+    get_required_sort_exprs_indices, get_target_functional_dependencies,
 };
 use hashbrown::DefaultHashBuilder;
 pub use join_type::{JoinConstraint, JoinSide, JoinType};
```

datafusion/optimizer/src/eliminate_duplicated_expr.rs

Lines changed: 27 additions & 2 deletions
```diff
@@ -19,8 +19,8 @@
 
 use crate::optimizer::ApplyOrder;
 use crate::{OptimizerConfig, OptimizerRule};
-use datafusion_common::Result;
 use datafusion_common::tree_node::Transformed;
+use datafusion_common::{Result, get_required_sort_exprs_indices, internal_err};
 use datafusion_expr::logical_plan::LogicalPlan;
 use datafusion_expr::{Aggregate, Expr, Sort, SortExpr};
 use std::hash::{Hash, Hasher};
@@ -76,12 +76,36 @@ impl OptimizerRule for EliminateDuplicatedExpr {
             .map(|wrapper| wrapper.0)
             .collect();
 
+        let sort_expr_names = unique_exprs
+            .iter()
+            .map(|sort_expr| sort_expr.expr.schema_name().to_string())
+            .collect::<Vec<_>>();
+        let required_indices = get_required_sort_exprs_indices(
+            sort.input.schema().as_ref(),
+            &sort_expr_names,
+        );
+
+        let unique_exprs = if required_indices.len() < unique_exprs.len() {
+            required_indices
+                .into_iter()
+                .map(|idx| unique_exprs[idx].clone())
+                .collect()
+        } else {
+            unique_exprs
+        };
+
         let transformed = if len != unique_exprs.len() {
             Transformed::yes
         } else {
             Transformed::no
         };
 
+        if unique_exprs.is_empty() {
+            return internal_err!(
+                "FD pruning unexpectedly removed all ORDER BY expressions"
+            );
+        }
+
         Ok(transformed(LogicalPlan::Sort(Sort {
             expr: unique_exprs,
             input: sort.input,
@@ -130,7 +154,8 @@ mod tests {
         @ $expected:literal $(,)?
     ) => {{
         let optimizer_ctx = OptimizerContext::new().with_max_passes(1);
-        let rules: Vec<Arc<dyn crate::OptimizerRule + Send + Sync>> = vec![Arc::new(EliminateDuplicatedExpr::new())];
+        let rules: Vec<Arc<dyn crate::OptimizerRule + Send + Sync>> =
+            vec![Arc::new(EliminateDuplicatedExpr::new())];
         assert_optimized_plan_eq_snapshot!(
             optimizer_ctx,
             rules,
```
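The rule rebuilds its sort list by cloning only the elements at the indices the helper kept. That "select by ascending index list" pattern, separated from the optimizer for clarity (`take_at` is an illustrative name, not a DataFusion API), looks like this:

```rust
/// Keep only the elements at `indices` (assumed ascending), cloning each.
/// Mirrors the pattern the optimizer rule uses to rebuild `unique_exprs`.
fn take_at<T: Clone>(items: &[T], indices: &[usize]) -> Vec<T> {
    indices.iter().map(|&i| items[i].clone()).collect()
}

fn main() {
    let exprs = vec!["c2", "total_sal", "abs(c2)"];
    // Suppose pruning decided only positions 0 and 2 are required.
    assert_eq!(take_at(&exprs, &[0, 2]), vec!["c2", "abs(c2)"]);
}
```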

datafusion/sqllogictest/test_files/order.slt

Lines changed: 59 additions & 1 deletion
```diff
@@ -260,6 +260,64 @@ physical_plan
 02)--SortExec: expr=[c2@1 ASC NULLS LAST, c3@2 ASC NULLS LAST], preserve_partitioning=[false]
 03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3], file_type=csv, has_header=true
 
+
+# eliminate redundant fd sort expr
+query TT
+explain SELECT c2, SUM(c3) AS total_sal FROM aggregate_test_100 GROUP BY c2 ORDER BY c2, total_sal
+----
+logical_plan
+01)Sort: aggregate_test_100.c2 ASC NULLS LAST
+02)--Projection: aggregate_test_100.c2, sum(aggregate_test_100.c3) AS total_sal
+03)----Aggregate: groupBy=[[aggregate_test_100.c2]], aggr=[[sum(CAST(aggregate_test_100.c3 AS Int64))]]
+04)------TableScan: aggregate_test_100 projection=[c2, c3]
+physical_plan
+01)SortPreservingMergeExec: [c2@0 ASC NULLS LAST]
+02)--SortExec: expr=[c2@0 ASC NULLS LAST], preserve_partitioning=[true]
+03)----ProjectionExec: expr=[c2@0 as c2, sum(aggregate_test_100.c3)@1 as total_sal]
+04)------AggregateExec: mode=FinalPartitioned, gby=[c2@0 as c2], aggr=[sum(aggregate_test_100.c3)]
+05)--------RepartitionExec: partitioning=Hash([c2@0], 4), input_partitions=4
+06)----------AggregateExec: mode=Partial, gby=[c2@0 as c2], aggr=[sum(aggregate_test_100.c3)]
+07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+08)--------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], file_type=csv, has_header=true
+
+# keep order by when dependency comes later
+query TT
+explain SELECT c2, SUM(c3) AS total_sal FROM aggregate_test_100 GROUP BY c2 ORDER BY total_sal, c2
+----
+logical_plan
+01)Sort: total_sal ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST
+02)--Projection: aggregate_test_100.c2, sum(aggregate_test_100.c3) AS total_sal
+03)----Aggregate: groupBy=[[aggregate_test_100.c2]], aggr=[[sum(CAST(aggregate_test_100.c3 AS Int64))]]
+04)------TableScan: aggregate_test_100 projection=[c2, c3]
+physical_plan
+01)SortPreservingMergeExec: [total_sal@1 ASC NULLS LAST, c2@0 ASC NULLS LAST]
+02)--SortExec: expr=[total_sal@1 ASC NULLS LAST, c2@0 ASC NULLS LAST], preserve_partitioning=[true]
+03)----ProjectionExec: expr=[c2@0 as c2, sum(aggregate_test_100.c3)@1 as total_sal]
+04)------AggregateExec: mode=FinalPartitioned, gby=[c2@0 as c2], aggr=[sum(aggregate_test_100.c3)]
+05)--------RepartitionExec: partitioning=Hash([c2@0], 4), input_partitions=4
+06)----------AggregateExec: mode=Partial, gby=[c2@0 as c2], aggr=[sum(aggregate_test_100.c3)]
+07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+08)--------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], file_type=csv, has_header=true
+
+# eliminate redundant sort expr even with non schema expr
+query TT
+explain SELECT c2, SUM(c3) AS total_sal FROM aggregate_test_100 GROUP BY c2 ORDER BY c2, total_sal, abs(c2)
+----
+logical_plan
+01)Sort: aggregate_test_100.c2 ASC NULLS LAST, abs(aggregate_test_100.c2) ASC NULLS LAST
+02)--Projection: aggregate_test_100.c2, sum(aggregate_test_100.c3) AS total_sal
+03)----Aggregate: groupBy=[[aggregate_test_100.c2]], aggr=[[sum(CAST(aggregate_test_100.c3 AS Int64))]]
+04)------TableScan: aggregate_test_100 projection=[c2, c3]
+physical_plan
+01)SortPreservingMergeExec: [c2@0 ASC NULLS LAST, abs(c2@0) ASC NULLS LAST]
+02)--SortExec: expr=[c2@0 ASC NULLS LAST, abs(c2@0) ASC NULLS LAST], preserve_partitioning=[true]
+03)----ProjectionExec: expr=[c2@0 as c2, sum(aggregate_test_100.c3)@1 as total_sal]
+04)------AggregateExec: mode=FinalPartitioned, gby=[c2@0 as c2], aggr=[sum(aggregate_test_100.c3)]
+05)--------RepartitionExec: partitioning=Hash([c2@0], 4), input_partitions=4
+06)----------AggregateExec: mode=Partial, gby=[c2@0 as c2], aggr=[sum(aggregate_test_100.c3)]
+07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+08)--------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], file_type=csv, has_header=true
+
 query II
 SELECT c2, c3 FROM aggregate_test_100 ORDER BY c2, c3, c2
 ----
@@ -1521,7 +1579,7 @@ query TT
 EXPLAIN SELECT c1, c2 FROM table_with_ordered_pk ORDER BY c1, c2;
 ----
 logical_plan
-01)Sort: table_with_ordered_pk.c1 ASC NULLS LAST, table_with_ordered_pk.c2 ASC NULLS LAST
+01)Sort: table_with_ordered_pk.c1 ASC NULLS LAST
 02)--TableScan: table_with_ordered_pk projection=[c1, c2]
 physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_agg_multi_order.csv]]}, projection=[c1, c2], output_ordering=[c1@0 ASC NULLS LAST], constraints=[PrimaryKey([0])], file_type=csv, has_header=true
```

datafusion/sqllogictest/test_files/window.slt

Lines changed: 7 additions & 9 deletions
```diff
@@ -2425,11 +2425,9 @@ SELECT c9, rn1 FROM (SELECT c9,
 145294611 96
 
 # test_c9_rn_ordering_alias_opposite_direction3
-# These test check for whether datafusion is aware of the ordering of the column generated by ROW_NUMBER() window function.
-# Physical plan should have a SortExec after BoundedWindowAggExec.
-# The reason is that ordering of the table after BoundedWindowAggExec can be described as rn1 ASC, and also c9 DESC.
-# However, the requirement is rn1 ASC, c9 ASC (lexicographical order). Hence existing ordering cannot satisfy requirement
-# (Requirement is finer than existing ordering)
+# These tests check whether DataFusion tracks the ordering of the column generated by ROW_NUMBER() window function.
+# The outer ORDER BY can be simplified by ordering equivalence, so the plan should not need an additional SortExec
+# beyond the one required to satisfy the window input order.
 query TT
 EXPLAIN SELECT c9, rn1 FROM (SELECT c9,
 ROW_NUMBER() OVER(ORDER BY c9 DESC) as rn1
@@ -2439,13 +2437,13 @@ EXPLAIN SELECT c9, rn1 FROM (SELECT c9,
 LIMIT 5
 ----
 logical_plan
-01)Sort: rn1 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST, fetch=5
+01)Sort: rn1 ASC NULLS LAST, fetch=5
 02)--Projection: aggregate_test_100.c9, row_number() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
 03)----WindowAggr: windowExpr=[[row_number() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
 04)------TableScan: aggregate_test_100 projection=[c9]
 physical_plan
-01)SortExec: TopK(fetch=5), expr=[rn1@1 ASC NULLS LAST, c9@0 ASC NULLS LAST], preserve_partitioning=[false], sort_prefix=[rn1@1 ASC NULLS LAST]
-02)--ProjectionExec: expr=[c9@0 as c9, row_number() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn1]
+01)ProjectionExec: expr=[c9@0 as c9, row_number() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn1]
+02)--GlobalLimitExec: skip=0, fetch=5
 03)----BoundedWindowAggExec: wdw=[row_number() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
 04)------SortExec: expr=[c9@0 DESC], preserve_partitioning=[false]
 05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], file_type=csv, has_header=true
@@ -2514,7 +2512,7 @@ EXPLAIN SELECT c9, rn1 FROM (SELECT c9,
 LIMIT 5
 ----
 logical_plan
-01)Sort: rn1 ASC NULLS LAST, aggregate_test_100.c9 DESC NULLS FIRST, fetch=5
+01)Sort: rn1 ASC NULLS LAST, fetch=5
 02)--Projection: aggregate_test_100.c9, row_number() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
 03)----WindowAggr: windowExpr=[[row_number() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
 04)------TableScan: aggregate_test_100 projection=[c9]
```

0 commit comments