feat: Optimize ORDER BY by Pruning Functionally Redundant Sort Keys #21362
Changes from 9 commits: 3a57d0b, dc15b02, 77308cc, f998b1a, 8725d5c, 17b1bd4, fb3c0b9, e6e430f, c50dc1b, c1e58db
```diff
@@ -260,6 +260,64 @@ physical_plan
 02)--SortExec: expr=[c2@1 ASC NULLS LAST, c3@2 ASC NULLS LAST], preserve_partitioning=[false]
 03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3], file_type=csv, has_header=true
+
+# eliminate redundant fd sort expr
+query TT
+explain SELECT c2, SUM(c3) AS total_sal FROM aggregate_test_100 GROUP BY c2 ORDER BY c2, total_sal
+----
+logical_plan
+01)Sort: aggregate_test_100.c2 ASC NULLS LAST
+02)--Projection: aggregate_test_100.c2, sum(aggregate_test_100.c3) AS total_sal
+03)----Aggregate: groupBy=[[aggregate_test_100.c2]], aggr=[[sum(CAST(aggregate_test_100.c3 AS Int64))]]
+04)------TableScan: aggregate_test_100 projection=[c2, c3]
+physical_plan
+01)SortPreservingMergeExec: [c2@0 ASC NULLS LAST]
+02)--SortExec: expr=[c2@0 ASC NULLS LAST], preserve_partitioning=[true]
+03)----ProjectionExec: expr=[c2@0 as c2, sum(aggregate_test_100.c3)@1 as total_sal]
+04)------AggregateExec: mode=FinalPartitioned, gby=[c2@0 as c2], aggr=[sum(aggregate_test_100.c3)]
+05)--------RepartitionExec: partitioning=Hash([c2@0], 4), input_partitions=4
+06)----------AggregateExec: mode=Partial, gby=[c2@0 as c2], aggr=[sum(aggregate_test_100.c3)]
+07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+08)--------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], file_type=csv, has_header=true
+
+# keep order by when dependency comes later
+query TT
+explain SELECT c2, SUM(c3) AS total_sal FROM aggregate_test_100 GROUP BY c2 ORDER BY total_sal, c2
+----
+logical_plan
+01)Sort: total_sal ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST
+02)--Projection: aggregate_test_100.c2, sum(aggregate_test_100.c3) AS total_sal
+03)----Aggregate: groupBy=[[aggregate_test_100.c2]], aggr=[[sum(CAST(aggregate_test_100.c3 AS Int64))]]
+04)------TableScan: aggregate_test_100 projection=[c2, c3]
+physical_plan
+01)SortPreservingMergeExec: [total_sal@1 ASC NULLS LAST, c2@0 ASC NULLS LAST]
+02)--SortExec: expr=[total_sal@1 ASC NULLS LAST, c2@0 ASC NULLS LAST], preserve_partitioning=[true]
+03)----ProjectionExec: expr=[c2@0 as c2, sum(aggregate_test_100.c3)@1 as total_sal]
+04)------AggregateExec: mode=FinalPartitioned, gby=[c2@0 as c2], aggr=[sum(aggregate_test_100.c3)]
+05)--------RepartitionExec: partitioning=Hash([c2@0], 4), input_partitions=4
+06)----------AggregateExec: mode=Partial, gby=[c2@0 as c2], aggr=[sum(aggregate_test_100.c3)]
+07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+08)--------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], file_type=csv, has_header=true
+
+# eliminate redundant sort expr even with non schema expr
+query TT
+explain SELECT c2, SUM(c3) AS total_sal FROM aggregate_test_100 GROUP BY c2 ORDER BY c2, total_sal, abs(c2)
+----
+logical_plan
+01)Sort: aggregate_test_100.c2 ASC NULLS LAST, abs(aggregate_test_100.c2) ASC NULLS LAST
+02)--Projection: aggregate_test_100.c2, sum(aggregate_test_100.c3) AS total_sal
+03)----Aggregate: groupBy=[[aggregate_test_100.c2]], aggr=[[sum(CAST(aggregate_test_100.c3 AS Int64))]]
+04)------TableScan: aggregate_test_100 projection=[c2, c3]
+physical_plan
+01)SortPreservingMergeExec: [c2@0 ASC NULLS LAST, abs(c2@0) ASC NULLS LAST]
+02)--SortExec: expr=[c2@0 ASC NULLS LAST, abs(c2@0) ASC NULLS LAST], preserve_partitioning=[true]
+03)----ProjectionExec: expr=[c2@0 as c2, sum(aggregate_test_100.c3)@1 as total_sal]
+04)------AggregateExec: mode=FinalPartitioned, gby=[c2@0 as c2], aggr=[sum(aggregate_test_100.c3)]
+05)--------RepartitionExec: partitioning=Hash([c2@0], 4), input_partitions=4
+06)----------AggregateExec: mode=Partial, gby=[c2@0 as c2], aggr=[sum(aggregate_test_100.c3)]
+07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+08)--------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], file_type=csv, has_header=true
+
+query II
+SELECT c2, c3 FROM aggregate_test_100 ORDER BY c2, c3, c2
+----
```
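The tests above pin down when a sort key may be pruned: `total_sal` is an aggregate over the group key `c2`, so once the plan is sorted by `c2` there can never be a tie for `total_sal` to break; but the dependency only helps when the determinant (`c2`) appears *earlier* in the ORDER BY list, and in these plans a non-schema expression like `abs(c2)` is left in place. A minimal Python sketch of that pruning rule (hypothetical helper names, not DataFusion's actual optimizer API):

```python
def prune_sort_keys(sort_exprs, determines):
    """Drop sort keys that are functionally determined by a key seen earlier.

    sort_exprs: ordered list of sort-key names from the ORDER BY clause.
    determines: dict mapping a key to the set of expressions it
                functionally determines (e.g. a GROUP BY key determines
                every aggregate computed over its groups).
    """
    kept = []
    determined = set()  # expressions fixed by the keys kept so far
    for expr in sort_exprs:
        if expr in determined:
            continue  # redundant: an earlier key already fixes this value
        kept.append(expr)
        determined |= determines.get(expr, set())
    return kept

# GROUP BY c2 makes c2 a key of the aggregate output, so it determines
# total_sal. (Matching the plans above, abs(c2) is not registered here.)
fd = {"c2": {"total_sal"}}

print(prune_sort_keys(["c2", "total_sal", "abs(c2)"], fd))  # ['c2', 'abs(c2)']
print(prune_sort_keys(["total_sal", "c2"], fd))             # unchanged: c2 comes later
```

The second call mirrors the "keep order by when dependency comes later" test: `total_sal` determines nothing, so `c2` survives as a secondary key.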
|
|
```diff
@@ -1521,7 +1579,7 @@ query TT
 EXPLAIN SELECT c1, c2 FROM table_with_ordered_pk ORDER BY c1, c2;
 ----
 logical_plan
-01)Sort: table_with_ordered_pk.c1 ASC NULLS LAST, table_with_ordered_pk.c2 ASC NULLS LAST
+01)Sort: table_with_ordered_pk.c1 ASC NULLS LAST
```
|
Contributor:

I don't understand this change -- the query requires sorting on both c1 and c2. For example, what about

    INSERT INTO table VALUES (1,2)
    INSERT INTO table VALUES (2,1)

That is still ordered by c1, but if you don't also sort on c2, you'll end up with the wrong sort order.

Contributor:

Hmm, the optimization seems sound to me. If c1 functionally determines c2, we know that each distinct c1 value is associated with exactly one c2 value. So sorting by c1 is sufficient; adding c2 as a tiebreaker / secondary sort key is never useful. In the example, if [...]

Member (Author):

@alamb I fully agree with Neil's explanation: functional dependency serves precisely to guarantee this property; for instance, an injective function inherently exhibits functional dependency. In the context of [...]

Contributor:

Oh, I see -- that makes sense. The fact that there is a PK means there will never be a tie on c1. Thank you for the clarification.
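The point settled in the review thread can be checked mechanically: when c1 is unique (a primary key), sorting by c1 alone and sorting by (c1, c2) always yield the same row order, because there is never a tie on c1 for c2 to break. A small illustrative check using the two INSERTs from the example:

```python
rows = [(2, 1), (1, 2)]  # the (c1, c2) rows from the INSERTs above

# c1 is a primary key here: every c1 value appears at most once.
assert len({c1 for c1, _ in rows}) == len(rows)

by_c1 = sorted(rows, key=lambda r: r[0])
by_c1_c2 = sorted(rows, key=lambda r: (r[0], r[1]))

# With no ties on c1, the secondary key c2 never influences the order.
print(by_c1 == by_c1_c2)  # True
```

This is exactly why the optimized logical plan sorts only on `table_with_ordered_pk.c1`: the `PrimaryKey([0])` constraint guarantees the tiebreaker is dead weight.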
```diff
 02)--TableScan: table_with_ordered_pk projection=[c1, c2]
 physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_agg_multi_order.csv]]}, projection=[c1, c2], output_ordering=[c1@0 ASC NULLS LAST], constraints=[PrimaryKey([0])], file_type=csv, has_header=true
```