Skip to content

Commit 77308cc

Browse files
committed
support simplify order by deptno, total_sal, abs(deptno) to order by deptno, abs(deptno)
1 parent dc15b02 commit 77308cc

3 files changed

Lines changed: 72 additions & 48 deletions

File tree

datafusion/common/src/functional_dependencies.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -598,19 +598,19 @@ pub fn get_required_sort_exprs_indices(
598598
) -> Option<Vec<usize>> {
599599
let dependencies = schema.functional_dependencies();
600600
let field_names = schema.field_names();
601-
let sort_expr_indices = sort_expr_names
602-
.iter()
603-
.map(|sort_expr_name| {
604-
field_names
605-
.iter()
606-
.position(|field_name| field_name == sort_expr_name)
607-
})
608-
.collect::<Option<Vec<_>>>()?;
609601

610602
let mut known_field_indices = HashSet::new();
611603
let mut required_sort_expr_indices = Vec::new();
612604

613-
for (sort_expr_idx, field_idx) in sort_expr_indices.into_iter().enumerate() {
605+
for (sort_expr_idx, sort_expr_name) in sort_expr_names.iter().enumerate() {
606+
let Some(field_idx) = field_names
607+
.iter()
608+
.position(|field_name| field_name == sort_expr_name)
609+
else {
610+
required_sort_expr_indices.push(sort_expr_idx);
611+
continue;
612+
};
613+
614614
let removable = dependencies.deps.iter().any(|dependency| {
615615
dependency.target_indices.contains(&field_idx)
616616
&& dependency

datafusion/optimizer/src/eliminate_duplicated_expr.rs

Lines changed: 2 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,6 @@ mod tests {
160160
use crate::assert_optimized_plan_eq_snapshot;
161161
use crate::test::*;
162162
use datafusion_expr::{col, logical_plan::builder::LogicalPlanBuilder};
163-
use datafusion_functions_aggregate::sum::sum;
164163
use std::sync::Arc;
165164

166165
macro_rules! assert_optimized_plan_equal {
@@ -169,7 +168,8 @@ mod tests {
169168
@ $expected:literal $(,)?
170169
) => {{
171170
let optimizer_ctx = OptimizerContext::new().with_max_passes(1);
172-
let rules: Vec<Arc<dyn crate::OptimizerRule + Send + Sync>> = vec![Arc::new(EliminateDuplicatedExpr::new())];
171+
let rules: Vec<Arc<dyn crate::OptimizerRule + Send + Sync>> =
172+
vec![Arc::new(EliminateDuplicatedExpr::new())];
173173
assert_optimized_plan_eq_snapshot!(
174174
optimizer_ctx,
175175
rules,
@@ -214,40 +214,4 @@ mod tests {
214214
TableScan: test
215215
")
216216
}
217-
218-
#[test]
219-
fn eliminate_fd_redundant_sort_expr() -> Result<()> {
220-
let table_scan = test_table_scan().unwrap();
221-
let plan = LogicalPlanBuilder::from(table_scan)
222-
.aggregate(vec![col("a")], vec![sum(col("b")).alias("total_sal")])?
223-
.sort(vec![
224-
col("a").sort(true, true),
225-
col("total_sal").sort(true, true),
226-
])?
227-
.build()?;
228-
229-
assert_optimized_plan_equal!(plan, @r"
230-
Sort: test.a ASC NULLS FIRST
231-
Aggregate: groupBy=[[test.a]], aggr=[[sum(test.b) AS total_sal]]
232-
TableScan: test
233-
")
234-
}
235-
236-
#[test]
237-
fn keep_order_by_when_dependency_comes_later() -> Result<()> {
238-
let table_scan = test_table_scan().unwrap();
239-
let plan = LogicalPlanBuilder::from(table_scan)
240-
.aggregate(vec![col("a")], vec![sum(col("b")).alias("total_sal")])?
241-
.sort(vec![
242-
col("total_sal").sort(true, true),
243-
col("a").sort(true, true),
244-
])?
245-
.build()?;
246-
247-
assert_optimized_plan_equal!(plan, @r"
248-
Sort: total_sal ASC NULLS FIRST, test.a ASC NULLS FIRST
249-
Aggregate: groupBy=[[test.a]], aggr=[[sum(test.b) AS total_sal]]
250-
TableScan: test
251-
")
252-
}
253217
}

datafusion/sqllogictest/test_files/order.slt

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,66 @@ physical_plan
260260
02)--SortExec: expr=[c2@1 ASC NULLS LAST, c3@2 ASC NULLS LAST], preserve_partitioning=[false]
261261
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3], file_type=csv, has_header=true
262262

263+
264+
# eliminate redundant fd sort expr
265+
query TT
266+
explain SELECT c2, SUM(c3) AS total_sal FROM aggregate_test_100 GROUP BY c2 ORDER BY c2, total_sal
267+
----
268+
logical_plan
269+
01)Sort: aggregate_test_100.c2 ASC NULLS LAST
270+
02)--Projection: aggregate_test_100.c2, sum(aggregate_test_100.c3) AS total_sal
271+
03)----Aggregate: groupBy=[[aggregate_test_100.c2]], aggr=[[sum(CAST(aggregate_test_100.c3 AS Int64))]]
272+
04)------TableScan: aggregate_test_100 projection=[c2, c3]
273+
physical_plan
274+
01)SortPreservingMergeExec: [c2@0 ASC NULLS LAST]
275+
02)--SortExec: expr=[c2@0 ASC NULLS LAST], preserve_partitioning=[true]
276+
03)----ProjectionExec: expr=[c2@0 as c2, sum(aggregate_test_100.c3)@1 as total_sal]
277+
04)------AggregateExec: mode=FinalPartitioned, gby=[c2@0 as c2], aggr=[sum(aggregate_test_100.c3)]
278+
05)--------RepartitionExec: partitioning=Hash([c2@0], 4), input_partitions=4
279+
06)----------AggregateExec: mode=Partial, gby=[c2@0 as c2], aggr=[sum(aggregate_test_100.c3)]
280+
07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
281+
08)--------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], file_type=csv, has_header=true
282+
283+
# keep order by when dependency comes later
284+
query TT
285+
explain SELECT c2, SUM(c3) AS total_sal FROM aggregate_test_100 GROUP BY c2 ORDER BY total_sal, c2
286+
----
287+
logical_plan
288+
01)Sort: total_sal ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST
289+
02)--Projection: aggregate_test_100.c2, sum(aggregate_test_100.c3) AS total_sal
290+
03)----Aggregate: groupBy=[[aggregate_test_100.c2]], aggr=[[sum(CAST(aggregate_test_100.c3 AS Int64))]]
291+
04)------TableScan: aggregate_test_100 projection=[c2, c3]
292+
physical_plan
293+
01)SortPreservingMergeExec: [total_sal@1 ASC NULLS LAST, c2@0 ASC NULLS LAST]
294+
02)--SortExec: expr=[total_sal@1 ASC NULLS LAST, c2@0 ASC NULLS LAST], preserve_partitioning=[true]
295+
03)----ProjectionExec: expr=[c2@0 as c2, sum(aggregate_test_100.c3)@1 as total_sal]
296+
04)------AggregateExec: mode=FinalPartitioned, gby=[c2@0 as c2], aggr=[sum(aggregate_test_100.c3)]
297+
05)--------RepartitionExec: partitioning=Hash([c2@0], 4), input_partitions=4
298+
06)----------AggregateExec: mode=Partial, gby=[c2@0 as c2], aggr=[sum(aggregate_test_100.c3)]
299+
07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
300+
08)--------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], file_type=csv, has_header=true
301+
302+
# eliminate redundant sort expr even with non schema expr
303+
query TT
304+
explain SELECT c2, SUM(c3) AS total_sal FROM aggregate_test_100 GROUP BY c2 ORDER BY c2, total_sal, abs(c2)
305+
----
306+
logical_plan
307+
01)Sort: aggregate_test_100.c2 ASC NULLS LAST, abs(aggregate_test_100.c2) ASC NULLS LAST
308+
02)--Projection: aggregate_test_100.c2, sum(aggregate_test_100.c3) AS total_sal
309+
03)----Aggregate: groupBy=[[aggregate_test_100.c2]], aggr=[[sum(CAST(aggregate_test_100.c3 AS Int64))]]
310+
04)------TableScan: aggregate_test_100 projection=[c2, c3]
311+
physical_plan
312+
01)SortPreservingMergeExec: [c2@0 ASC NULLS LAST, abs(c2@0) ASC NULLS LAST]
313+
02)--SortExec: expr=[c2@0 ASC NULLS LAST, abs(c2@0) ASC NULLS LAST], preserve_partitioning=[true]
314+
03)----ProjectionExec: expr=[c2@0 as c2, sum(aggregate_test_100.c3)@1 as total_sal]
315+
04)------AggregateExec: mode=FinalPartitioned, gby=[c2@0 as c2], aggr=[sum(aggregate_test_100.c3)]
316+
05)--------RepartitionExec: partitioning=Hash([c2@0], 4), input_partitions=4
317+
06)----------AggregateExec: mode=Partial, gby=[c2@0 as c2], aggr=[sum(aggregate_test_100.c3)]
318+
07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
319+
08)--------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], file_type=csv, has_header=true
320+
321+
statement ok
322+
263323
query II
264324
SELECT c2, c3 FROM aggregate_test_100 ORDER BY c2, c3, c2
265325
----
@@ -1637,7 +1697,7 @@ physical_plan
16371697
statement ok
16381698
reset datafusion.catalog.information_schema;
16391699

1640-
# The SLT runner sets `target_partitions` to 4 instead of using the default, so
1700+
# The SLT runner sets `target_partitions` to 4 instead of using the default, so
16411701
# reset it explicitly.
16421702
statement ok
16431703
set datafusion.execution.target_partitions = 4;

0 commit comments

Comments
 (0)