From 8eb939fb42822b8d38a9be492000eacf813683b4 Mon Sep 17 00:00:00 2001 From: Subham Singhal Date: Thu, 23 Apr 2026 19:55:23 +0530 Subject: [PATCH 1/2] feat: add prefer_partial_sort config to enable PartialSortExec for bounded inputs --- datafusion/common/src/config.rs | 7 + .../physical_optimizer/enforce_sorting.rs | 62 +++++ .../src/enforce_sorting/mod.rs | 17 +- .../test_files/information_schema.slt | 2 + .../test_files/partial_sort_bounded.slt | 218 ++++++++++++++++++ docs/source/user-guide/configs.md | 1 + 6 files changed, 301 insertions(+), 6 deletions(-) create mode 100644 datafusion/sqllogictest/test_files/partial_sort_bounded.slt diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 85361ef5e17e1..1693cc6b3c69c 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1244,6 +1244,13 @@ config_namespace! { /// `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`. pub prefer_existing_sort: bool, default = false + /// When set to true, the physical plan optimizer will use `PartialSortExec` + /// instead of `SortExec` when the input is already sorted on a prefix of the + /// required ordering, even for bounded (finite) inputs. This reduces memory + /// usage by sorting data segment-by-segment instead of buffering the entire input. + /// When false (default), `PartialSortExec` is only used for unbounded (streaming) inputs. + pub prefer_partial_sort: bool, default = false + /// When set to true, the logical plan optimizer will produce warning /// messages if any optimization rules produce errors and then proceed to the next /// rule. When set to false, any rules that produce errors will cause the query to fail diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs index 40bcdbbd6efef..136efde8c2157 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs @@ -96,6 +96,7 @@ fn csv_exec_sorted( pub(crate) struct EnforceSortingTest { plan: Arc, repartition_sorts: bool, + prefer_partial_sort: bool, } impl EnforceSortingTest { @@ -103,6 +104,7 @@ impl EnforceSortingTest { Self { plan, repartition_sorts: false, + prefer_partial_sort: false, } } @@ -112,11 +114,18 @@ impl EnforceSortingTest { self } + /// Set whether to prefer partial sort for bounded inputs + pub(crate) fn with_prefer_partial_sort(mut self, prefer_partial_sort: bool) -> Self { + self.prefer_partial_sort = prefer_partial_sort; + self + } + /// Runs the enforce sorting test and returns a string with the input and /// optimized plan as strings for snapshot comparison using insta pub(crate) fn run(&self) -> String { let mut config = ConfigOptions::new(); config.optimizer.repartition_sorts = self.repartition_sorts; + config.optimizer.prefer_partial_sort = self.prefer_partial_sort; // This file has 4 rules that use tree node, apply these rules as in the // EnforceSorting::optimize implementation @@ -1808,6 +1817,59 @@ async fn test_not_replaced_with_partial_sort_for_bounded_input() -> Result<()> { Ok(()) } +#[tokio::test] +async fn test_replace_with_partial_sort_for_bounded_input_with_config() -> Result<()> { + let schema = create_test_schema3()?; + let parquet_ordering = [sort_expr("a", &schema)].into(); + let parquet_input = parquet_exec_with_sort(schema.clone(), vec![parquet_ordering]); + let physical_plan = sort_exec( + [sort_expr("a", &schema), sort_expr("c", &schema)].into(), + parquet_input, + ); + let test = EnforceSortingTest::new(physical_plan.clone()) + .with_repartition_sorts(false) + .with_prefer_partial_sort(true); + + assert_snapshot!(test.run(), @r" + Input Plan: + SortExec: expr=[a@0 ASC, c@2 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet + Optimized Plan: + PartialSortExec: expr=[a@0 ASC, c@2 ASC], common_prefix_length=[1] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet + "); + + Ok(()) +} + +#[tokio::test] +async fn test_not_replaced_with_partial_sort_for_bounded_input_no_prefix() -> Result<()> { + let schema = create_test_schema3()?; + let parquet_ordering = [sort_expr("b", &schema), sort_expr("c", &schema)].into(); + let parquet_input = parquet_exec_with_sort(schema.clone(), vec![parquet_ordering]); + let physical_plan = sort_exec( + [ + sort_expr("a", &schema), + sort_expr("b", &schema), + sort_expr("c", &schema), + ] + .into(), + parquet_input, + ); + // Even with prefer_partial_sort = true, no common prefix → stays SortExec + let test = EnforceSortingTest::new(physical_plan.clone()) + .with_repartition_sorts(false) + .with_prefer_partial_sort(true); + + assert_snapshot!(test.run(), @r" + Input / Optimized Plan: + SortExec: expr=[a@0 ASC, b@1 ASC, c@2 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[b@1 ASC, c@2 ASC], file_type=parquet + "); + + Ok(()) +} + #[tokio::test] async fn test_remove_unnecessary_sort() -> Result<()> { let schema = create_test_schema()?; diff --git a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs index 729a6b3121a83..6b4c7603f128a 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs @@ -245,7 +245,9 @@ impl PhysicalOptimizerRule for EnforceSorting { let adjusted = pushdown_sorts(sort_pushdown)?; adjusted .plan - .transform_up(|plan| Ok(Transformed::yes(replace_with_partial_sort(plan)?))) + .transform_up(|plan| { + Ok(Transformed::yes(replace_with_partial_sort(plan, config)?)) + }) .data() } @@ -258,12 +260,15 @@ impl PhysicalOptimizerRule for EnforceSorting { } } -/// Only interested with [`SortExec`]s and their unbounded children. -/// If the plan is not a [`SortExec`] or its child is not unbounded, returns the original plan. -/// Otherwise, by checking the requirement satisfaction searches for a replacement chance. -/// If there's one replaces the [`SortExec`] plan with a [`PartialSortExec`] +/// Checks if a [`SortExec`] can be replaced with a [`PartialSortExec`] when +/// the child's existing ordering satisfies a prefix of the required sort ordering. +/// +/// For unbounded inputs, this replacement is always attempted. +/// For bounded inputs, this replacement is only attempted when +/// `config.optimizer.prefer_partial_sort` is set to `true`. fn replace_with_partial_sort( plan: Arc, + config: &ConfigOptions, ) -> Result> { let Some(sort_plan) = plan.downcast_ref::() else { return Ok(plan); @@ -271,7 +276,7 @@ fn replace_with_partial_sort( // It's safe to get first child of the SortExec let child = Arc::clone(sort_plan.children()[0]); - if !child.boundedness().is_unbounded() { + if !child.boundedness().is_unbounded() && !config.optimizer.prefer_partial_sort { return Ok(plan); } diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index b04c78bd2774c..56336e0ee2d95 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -321,6 +321,7 @@ datafusion.optimizer.max_passes 3 datafusion.optimizer.prefer_existing_sort false datafusion.optimizer.prefer_existing_union false datafusion.optimizer.prefer_hash_join true +datafusion.optimizer.prefer_partial_sort false datafusion.optimizer.preserve_file_partitions 0 datafusion.optimizer.repartition_aggregations true datafusion.optimizer.repartition_file_min_size 10485760 @@ -468,6 +469,7 @@ datafusion.optimizer.max_passes 3 Number of times that the optimizer will attemp datafusion.optimizer.prefer_existing_sort false When true, DataFusion will opportunistically remove sorts when the data is already sorted, (i.e. setting `preserve_order` to true on `RepartitionExec` and using `SortPreservingMergeExec`) When false, DataFusion will maximize plan parallelism using `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`. datafusion.optimizer.prefer_existing_union false When set to true, the optimizer will not attempt to convert Union to Interleave datafusion.optimizer.prefer_hash_join true When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory +datafusion.optimizer.prefer_partial_sort false When set to true, the physical plan optimizer will use `PartialSortExec` instead of `SortExec` when the input is already sorted on a prefix of the required ordering, even for bounded (finite) inputs. This reduces memory usage by sorting data segment-by-segment instead of buffering the entire input. When false (default), `PartialSortExec` is only used for unbounded (streaming) inputs. datafusion.optimizer.preserve_file_partitions 0 Minimum number of distinct partition values required to group files by their Hive partition column values (enabling Hash partitioning declaration). How the option is used: - preserve_file_partitions=0: Disable it. - preserve_file_partitions=1: Always enable it. - preserve_file_partitions=N, actual file partitions=M: Only enable when M >= N. This threshold preserves I/O parallelism when file partitioning is below it. Note: This may reduce parallelism, rooting from the I/O level, if the number of distinct partitions is less than the target_partitions. datafusion.optimizer.repartition_aggregations true Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level datafusion.optimizer.repartition_file_min_size 10485760 Minimum total files size in bytes to perform file scan repartitioning. diff --git a/datafusion/sqllogictest/test_files/partial_sort_bounded.slt b/datafusion/sqllogictest/test_files/partial_sort_bounded.slt new file mode 100644 index 0000000000000..9a5d3a604537e --- /dev/null +++ b/datafusion/sqllogictest/test_files/partial_sort_bounded.slt @@ -0,0 +1,218 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Tests for PartialSortExec with bounded (finite) inputs +# when `prefer_partial_sort` config is enabled. + +############################################################### +# Test 1: prefix_length=2, suffix=1 — sorted on (a,b), ORDER BY a,b,c +############################################################### + +statement ok +COPY ( + SELECT a, b, c FROM (VALUES + (1, 10, 'z'), (1, 10, 'a'), (1, 20, 'x'), (1, 20, 'b'), + (2, 10, 'y'), (2, 10, 'c'), (2, 30, 'w'), (2, 30, 'd') + ) AS t(a, b, c) + ORDER BY a, b +) TO '/tmp/test_partial_sort_bounded.parquet'; + +statement ok +CREATE EXTERNAL TABLE sorted_ab +STORED AS PARQUET +WITH ORDER (a ASC, b ASC) +LOCATION '/tmp/test_partial_sort_bounded.parquet'; + +# Default: bounded input uses SortExec (not PartialSortExec) +query TT +EXPLAIN SELECT * FROM sorted_ab ORDER BY a, b, c; +---- +logical_plan +01)Sort: sorted_ab.a ASC NULLS LAST, sorted_ab.b ASC NULLS LAST, sorted_ab.c ASC NULLS LAST +02)--TableScan: sorted_ab projection=[a, b, c] +physical_plan +01)SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[tmp/test_partial_sort_bounded.parquet]]}, projection=[a, b, c], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], file_type=parquet + +# Enable prefer_partial_sort +statement ok +set datafusion.optimizer.prefer_partial_sort = true; + +# With config: PartialSortExec with common_prefix_length=2 +query TT +EXPLAIN SELECT * FROM sorted_ab ORDER BY a, b, c; +---- +logical_plan +01)Sort: sorted_ab.a ASC NULLS LAST, sorted_ab.b ASC NULLS LAST, sorted_ab.c ASC NULLS LAST +02)--TableScan: sorted_ab projection=[a, b, c] +physical_plan +01)PartialSortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], common_prefix_length=[2] +02)--DataSourceExec: file_groups={1 group: [[tmp/test_partial_sort_bounded.parquet]]}, projection=[a, b, c], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], file_type=parquet + +# Verify correctness +query IIT +SELECT * FROM sorted_ab ORDER BY a, b, c; +---- +1 10 a +1 10 z +1 20 b +1 20 x +2 10 c +2 10 y +2 30 d +2 30 w + +# Verify with LIMIT +query TT +EXPLAIN SELECT * FROM sorted_ab ORDER BY a, b, c LIMIT 3; +---- +logical_plan +01)Sort: sorted_ab.a ASC NULLS LAST, sorted_ab.b ASC NULLS LAST, sorted_ab.c ASC NULLS LAST, fetch=3 +02)--TableScan: sorted_ab projection=[a, b, c] +physical_plan +01)PartialSortExec: TopK(fetch=3), expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], common_prefix_length=[2] +02)--DataSourceExec: file_groups={1 group: [[tmp/test_partial_sort_bounded.parquet]]}, projection=[a, b, c], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], file_type=parquet + +query IIT +SELECT * FROM sorted_ab ORDER BY a, b, c LIMIT 3; +---- +1 10 a +1 10 z +1 20 b + +statement ok +set datafusion.optimizer.prefer_partial_sort = false; + +statement ok +DROP TABLE sorted_ab; + +############################################################### +# Test 2: prefix_length=1, suffix=2 — sorted on (a), ORDER BY a,b,c +############################################################### + +statement ok +COPY ( + SELECT a, b, c, d FROM (VALUES + (1, 3, 20, 'p'), (1, 1, 10, 'q'), (1, 2, 30, 'r'), + (2, 2, 10, 's'), (2, 3, 20, 't'), (2, 1, 30, 'u'), + (3, 1, 10, 'v'), (3, 1, 20, 'w') + ) AS t(a, b, c, d) + ORDER BY a +) TO '/tmp/test_partial_sort_prefix1.parquet'; + +statement ok +CREATE EXTERNAL TABLE sorted_a +STORED AS PARQUET +WITH ORDER (a ASC) +LOCATION '/tmp/test_partial_sort_prefix1.parquet'; + +statement ok +set datafusion.optimizer.prefer_partial_sort = true; + +# prefix=1, sorting b and c within each group of a +query TT +EXPLAIN SELECT * FROM sorted_a ORDER BY a, b, c; +---- +logical_plan +01)Sort: sorted_a.a ASC NULLS LAST, sorted_a.b ASC NULLS LAST, sorted_a.c ASC NULLS LAST +02)--TableScan: sorted_a projection=[a, b, c, d] +physical_plan +01)PartialSortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], common_prefix_length=[1] +02)--DataSourceExec: file_groups={1 group: [[tmp/test_partial_sort_prefix1.parquet]]}, projection=[a, b, c, d], output_ordering=[a@0 ASC NULLS LAST], file_type=parquet + +# Verify correctness — within each a group, (b,c) should be sorted +query IIIT +SELECT * FROM sorted_a ORDER BY a, b, c; +---- +1 1 10 q +1 2 30 r +1 3 20 p +2 1 30 u +2 2 10 s +2 3 20 t +3 1 10 v +3 1 20 w + +statement ok +set datafusion.optimizer.prefer_partial_sort = false; + +statement ok +DROP TABLE sorted_a; + +############################################################### +# Test 3: prefix_length=3, suffix=1 — sorted on (a,b,c), ORDER BY a,b,c,d +############################################################### + +statement ok +COPY ( + SELECT a, b, c, d FROM (VALUES + (1, 1, 1, 40), (1, 1, 1, 10), (1, 1, 2, 30), (1, 1, 2, 20), + (1, 2, 1, 50), (1, 2, 1, 5), + (2, 1, 1, 60), (2, 1, 1, 15) + ) AS t(a, b, c, d) + ORDER BY a, b, c +) TO '/tmp/test_partial_sort_prefix3.parquet'; + +statement ok +CREATE EXTERNAL TABLE sorted_abc +STORED AS PARQUET +WITH ORDER (a ASC, b ASC, c ASC) +LOCATION '/tmp/test_partial_sort_prefix3.parquet'; + +statement ok +set datafusion.optimizer.prefer_partial_sort = true; + +# prefix=3, only sorting d within each (a,b,c) group +query TT +EXPLAIN SELECT * FROM sorted_abc ORDER BY a, b, c, d; +---- +logical_plan +01)Sort: sorted_abc.a ASC NULLS LAST, sorted_abc.b ASC NULLS LAST, sorted_abc.c ASC NULLS LAST, sorted_abc.d ASC NULLS LAST +02)--TableScan: sorted_abc projection=[a, b, c, d] +physical_plan +01)PartialSortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, d@3 ASC NULLS LAST], common_prefix_length=[3] +02)--DataSourceExec: file_groups={1 group: [[tmp/test_partial_sort_prefix3.parquet]]}, projection=[a, b, c, d], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], file_type=parquet + +# Verify correctness — d sorted within each (a,b,c) group +query IIII +SELECT * FROM sorted_abc ORDER BY a, b, c, d; +---- +1 1 1 10 +1 1 1 40 +1 1 2 20 +1 1 2 30 +1 2 1 5 +1 2 1 50 +2 1 1 15 +2 1 1 60 + +# After reset: back to SortExec +statement ok +set datafusion.optimizer.prefer_partial_sort = false; + +query TT +EXPLAIN SELECT * FROM sorted_abc ORDER BY a, b, c, d; +---- +logical_plan +01)Sort: sorted_abc.a ASC NULLS LAST, sorted_abc.b ASC NULLS LAST, sorted_abc.c ASC NULLS LAST, sorted_abc.d ASC NULLS LAST +02)--TableScan: sorted_abc projection=[a, b, c, d] +physical_plan +01)SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, d@3 ASC NULLS LAST], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[tmp/test_partial_sort_prefix3.parquet]]}, projection=[a, b, c, d], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], file_type=parquet + +statement ok +DROP TABLE sorted_abc; \ No newline at end of file diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 46039f3c99c27..285e61cfd6514 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -157,6 +157,7 @@ The following configuration settings are available: | datafusion.optimizer.repartition_sorts | true | Should DataFusion execute sorts in a per-partition fashion and merge afterwards instead of coalescing first and sorting globally. With this flag is enabled, plans in the form below `text "SortExec: [a@0 ASC]", " CoalescePartitionsExec", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", ` would turn into the plan below which performs better in multithreaded environments `text "SortPreservingMergeExec: [a@0 ASC]", " SortExec: [a@0 ASC]", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", ` | | datafusion.optimizer.subset_repartition_threshold | 4 | Partition count threshold for subset satisfaction optimization. When the current partition count is >= this threshold, DataFusion will skip repartitioning if the required partitioning expression is a subset of the current partition expression such as Hash(a) satisfies Hash(a, b). When the current partition count is < this threshold, DataFusion will repartition to increase parallelism even when subset satisfaction applies. Set to 0 to always repartition (disable subset satisfaction optimization). Set to a high value to always use subset satisfaction. Example (subset_repartition_threshold = 4): `text Hash([a]) satisfies Hash([a, b]) because (Hash([a, b]) is subset of Hash([a]) If current partitions (3) < threshold (4), repartition: AggregateExec: mode=FinalPartitioned, gby=[a, b], aggr=[SUM(x)] RepartitionExec: partitioning=Hash([a, b], 8), input_partitions=3 AggregateExec: mode=Partial, gby=[a, b], aggr=[SUM(x)] DataSourceExec: file_groups={...}, output_partitioning=Hash([a], 3) If current partitions (8) >= threshold (4), use subset satisfaction: AggregateExec: mode=SinglePartitioned, gby=[a, b], aggr=[SUM(x)] DataSourceExec: file_groups={...}, output_partitioning=Hash([a], 8) ` | | datafusion.optimizer.prefer_existing_sort | false | When true, DataFusion will opportunistically remove sorts when the data is already sorted, (i.e. setting `preserve_order` to true on `RepartitionExec` and using `SortPreservingMergeExec`) When false, DataFusion will maximize plan parallelism using `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`. | +| datafusion.optimizer.prefer_partial_sort | false | When set to true, the physical plan optimizer will use `PartialSortExec` instead of `SortExec` when the input is already sorted on a prefix of the required ordering, even for bounded (finite) inputs. This reduces memory usage by sorting data segment-by-segment instead of buffering the entire input. When false (default), `PartialSortExec` is only used for unbounded (streaming) inputs. | | datafusion.optimizer.skip_failed_rules | false | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail | | datafusion.optimizer.max_passes | 3 | Number of times that the optimizer will attempt to optimize the plan | | datafusion.optimizer.top_down_join_key_reordering | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys | From d94d115a5da6c94093c25f6ea1be98f73f26ee7b Mon Sep 17 00:00:00 2001 From: Subham Singhal Date: Fri, 24 Apr 2026 11:01:24 +0530 Subject: [PATCH 2/2] Fix build failure --- datafusion/core/tests/physical_optimizer/enforce_sorting.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs index 136efde8c2157..f977ee20a11f5 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs @@ -1834,6 +1834,7 @@ async fn test_replace_with_partial_sort_for_bounded_input_with_config() -> Resul Input Plan: SortExec: expr=[a@0 ASC, c@2 ASC], preserve_partitioning=[false] DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet + Optimized Plan: PartialSortExec: expr=[a@0 ASC, c@2 ASC], common_prefix_length=[1] DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet