Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1244,6 +1244,13 @@ config_namespace! {
/// `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`.
pub prefer_existing_sort: bool, default = false

/// When set to true, the physical plan optimizer will use `PartialSortExec`
/// instead of `SortExec` when the input is already sorted on a prefix of the
/// required ordering, even for bounded (finite) inputs. This reduces memory
/// usage by sorting data segment-by-segment instead of buffering the entire input.
/// When false (default), `PartialSortExec` is only used for unbounded (streaming) inputs.
pub prefer_partial_sort: bool, default = false

/// When set to true, the logical plan optimizer will produce warning
/// messages if any optimization rules produce errors and then proceed to the next
/// rule. When set to false, any rules that produce errors will cause the query to fail
Expand Down
63 changes: 63 additions & 0 deletions datafusion/core/tests/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,15 @@ fn csv_exec_sorted(
pub(crate) struct EnforceSortingTest {
plan: Arc<dyn ExecutionPlan>,
repartition_sorts: bool,
prefer_partial_sort: bool,
}

impl EnforceSortingTest {
pub(crate) fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
Self {
plan,
repartition_sorts: false,
prefer_partial_sort: false,
}
}

Expand All @@ -112,11 +114,18 @@ impl EnforceSortingTest {
self
}

/// Set whether to prefer partial sort for bounded inputs
pub(crate) fn with_prefer_partial_sort(mut self, prefer_partial_sort: bool) -> Self {
self.prefer_partial_sort = prefer_partial_sort;
self
}

/// Runs the enforce sorting test and returns a string with the input and
/// optimized plan as strings for snapshot comparison using insta
pub(crate) fn run(&self) -> String {
let mut config = ConfigOptions::new();
config.optimizer.repartition_sorts = self.repartition_sorts;
config.optimizer.prefer_partial_sort = self.prefer_partial_sort;

// This file has 4 rules that use tree node, apply these rules as in the
// EnforceSorting::optimize implementation
Expand Down Expand Up @@ -1808,6 +1817,60 @@ async fn test_not_replaced_with_partial_sort_for_bounded_input() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn test_replace_with_partial_sort_for_bounded_input_with_config() -> Result<()> {
let schema = create_test_schema3()?;
let parquet_ordering = [sort_expr("a", &schema)].into();
let parquet_input = parquet_exec_with_sort(schema.clone(), vec![parquet_ordering]);
let physical_plan = sort_exec(
[sort_expr("a", &schema), sort_expr("c", &schema)].into(),
parquet_input,
);
let test = EnforceSortingTest::new(physical_plan.clone())
.with_repartition_sorts(false)
.with_prefer_partial_sort(true);

assert_snapshot!(test.run(), @r"
Input Plan:
SortExec: expr=[a@0 ASC, c@2 ASC], preserve_partitioning=[false]
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet

Optimized Plan:
PartialSortExec: expr=[a@0 ASC, c@2 ASC], common_prefix_length=[1]
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
");

Ok(())
}

#[tokio::test]
async fn test_not_replaced_with_partial_sort_for_bounded_input_no_prefix() -> Result<()> {
let schema = create_test_schema3()?;
let parquet_ordering = [sort_expr("b", &schema), sort_expr("c", &schema)].into();
let parquet_input = parquet_exec_with_sort(schema.clone(), vec![parquet_ordering]);
let physical_plan = sort_exec(
[
sort_expr("a", &schema),
sort_expr("b", &schema),
sort_expr("c", &schema),
]
.into(),
parquet_input,
);
// Even with prefer_partial_sort = true, no common prefix → stays SortExec
let test = EnforceSortingTest::new(physical_plan.clone())
.with_repartition_sorts(false)
.with_prefer_partial_sort(true);

assert_snapshot!(test.run(), @r"
Input / Optimized Plan:
SortExec: expr=[a@0 ASC, b@1 ASC, c@2 ASC], preserve_partitioning=[false]
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[b@1 ASC, c@2 ASC], file_type=parquet
");

Ok(())
}

#[tokio::test]
async fn test_remove_unnecessary_sort() -> Result<()> {
let schema = create_test_schema()?;
Expand Down
17 changes: 11 additions & 6 deletions datafusion/physical-optimizer/src/enforce_sorting/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,9 @@ impl PhysicalOptimizerRule for EnforceSorting {
let adjusted = pushdown_sorts(sort_pushdown)?;
adjusted
.plan
.transform_up(|plan| Ok(Transformed::yes(replace_with_partial_sort(plan)?)))
.transform_up(|plan| {
Ok(Transformed::yes(replace_with_partial_sort(plan, config)?))
})
.data()
}

Expand All @@ -258,20 +260,23 @@ impl PhysicalOptimizerRule for EnforceSorting {
}
}

/// Only interested with [`SortExec`]s and their unbounded children.
/// If the plan is not a [`SortExec`] or its child is not unbounded, returns the original plan.
/// Otherwise, by checking the requirement satisfaction searches for a replacement chance.
/// If there's one replaces the [`SortExec`] plan with a [`PartialSortExec`]
/// Checks if a [`SortExec`] can be replaced with a [`PartialSortExec`] when
/// the child's existing ordering satisfies a prefix of the required sort ordering.
///
/// For unbounded inputs, this replacement is always attempted.
/// For bounded inputs, this replacement is only attempted when
/// `config.optimizer.prefer_partial_sort` is set to `true`.
fn replace_with_partial_sort(
plan: Arc<dyn ExecutionPlan>,
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
let Some(sort_plan) = plan.downcast_ref::<SortExec>() else {
return Ok(plan);
};

// It's safe to get first child of the SortExec
let child = Arc::clone(sort_plan.children()[0]);
if !child.boundedness().is_unbounded() {
if !child.boundedness().is_unbounded() && !config.optimizer.prefer_partial_sort {
return Ok(plan);
}

Expand Down
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ datafusion.optimizer.max_passes 3
datafusion.optimizer.prefer_existing_sort false
datafusion.optimizer.prefer_existing_union false
datafusion.optimizer.prefer_hash_join true
datafusion.optimizer.prefer_partial_sort false
datafusion.optimizer.preserve_file_partitions 0
datafusion.optimizer.repartition_aggregations true
datafusion.optimizer.repartition_file_min_size 10485760
Expand Down Expand Up @@ -468,6 +469,7 @@ datafusion.optimizer.max_passes 3 Number of times that the optimizer will attemp
datafusion.optimizer.prefer_existing_sort false When true, DataFusion will opportunistically remove sorts when the data is already sorted, (i.e. setting `preserve_order` to true on `RepartitionExec` and using `SortPreservingMergeExec`) When false, DataFusion will maximize plan parallelism using `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`.
datafusion.optimizer.prefer_existing_union false When set to true, the optimizer will not attempt to convert Union to Interleave
datafusion.optimizer.prefer_hash_join true When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory
datafusion.optimizer.prefer_partial_sort false When set to true, the physical plan optimizer will use `PartialSortExec` instead of `SortExec` when the input is already sorted on a prefix of the required ordering, even for bounded (finite) inputs. This reduces memory usage by sorting data segment-by-segment instead of buffering the entire input. When false (default), `PartialSortExec` is only used for unbounded (streaming) inputs.
datafusion.optimizer.preserve_file_partitions 0 Minimum number of distinct partition values required to group files by their Hive partition column values (enabling Hash partitioning declaration). How the option is used: - preserve_file_partitions=0: Disable it. - preserve_file_partitions=1: Always enable it. - preserve_file_partitions=N, actual file partitions=M: Only enable when M >= N. This threshold preserves I/O parallelism when file partitioning is below it. Note: This may reduce parallelism, rooting from the I/O level, if the number of distinct partitions is less than the target_partitions.
datafusion.optimizer.repartition_aggregations true Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level
datafusion.optimizer.repartition_file_min_size 10485760 Minimum total files size in bytes to perform file scan repartitioning.
Expand Down
218 changes: 218 additions & 0 deletions datafusion/sqllogictest/test_files/partial_sort_bounded.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Tests for PartialSortExec with bounded (finite) inputs
# when `prefer_partial_sort` config is enabled.

###############################################################
# Test 1: prefix_length=2, suffix=1 — sorted on (a,b), ORDER BY a,b,c
###############################################################

statement ok
COPY (
SELECT a, b, c FROM (VALUES
(1, 10, 'z'), (1, 10, 'a'), (1, 20, 'x'), (1, 20, 'b'),
(2, 10, 'y'), (2, 10, 'c'), (2, 30, 'w'), (2, 30, 'd')
) AS t(a, b, c)
ORDER BY a, b
) TO '/tmp/test_partial_sort_bounded.parquet';

statement ok
CREATE EXTERNAL TABLE sorted_ab
STORED AS PARQUET
WITH ORDER (a ASC, b ASC)
LOCATION '/tmp/test_partial_sort_bounded.parquet';

# Default: bounded input uses SortExec (not PartialSortExec)
query TT
EXPLAIN SELECT * FROM sorted_ab ORDER BY a, b, c;
----
logical_plan
01)Sort: sorted_ab.a ASC NULLS LAST, sorted_ab.b ASC NULLS LAST, sorted_ab.c ASC NULLS LAST
02)--TableScan: sorted_ab projection=[a, b, c]
physical_plan
01)SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], preserve_partitioning=[false]
02)--DataSourceExec: file_groups={1 group: [[tmp/test_partial_sort_bounded.parquet]]}, projection=[a, b, c], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], file_type=parquet

# Enable prefer_partial_sort
statement ok
set datafusion.optimizer.prefer_partial_sort = true;

# With config: PartialSortExec with common_prefix_length=2
query TT
EXPLAIN SELECT * FROM sorted_ab ORDER BY a, b, c;
----
logical_plan
01)Sort: sorted_ab.a ASC NULLS LAST, sorted_ab.b ASC NULLS LAST, sorted_ab.c ASC NULLS LAST
02)--TableScan: sorted_ab projection=[a, b, c]
physical_plan
01)PartialSortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], common_prefix_length=[2]
02)--DataSourceExec: file_groups={1 group: [[tmp/test_partial_sort_bounded.parquet]]}, projection=[a, b, c], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], file_type=parquet

# Verify correctness
query IIT
SELECT * FROM sorted_ab ORDER BY a, b, c;
----
1 10 a
1 10 z
1 20 b
1 20 x
2 10 c
2 10 y
2 30 d
2 30 w

# Verify with LIMIT
query TT
EXPLAIN SELECT * FROM sorted_ab ORDER BY a, b, c LIMIT 3;
----
logical_plan
01)Sort: sorted_ab.a ASC NULLS LAST, sorted_ab.b ASC NULLS LAST, sorted_ab.c ASC NULLS LAST, fetch=3
02)--TableScan: sorted_ab projection=[a, b, c]
physical_plan
01)PartialSortExec: TopK(fetch=3), expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], common_prefix_length=[2]
02)--DataSourceExec: file_groups={1 group: [[tmp/test_partial_sort_bounded.parquet]]}, projection=[a, b, c], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], file_type=parquet

query IIT
SELECT * FROM sorted_ab ORDER BY a, b, c LIMIT 3;
----
1 10 a
1 10 z
1 20 b

statement ok
set datafusion.optimizer.prefer_partial_sort = false;

statement ok
DROP TABLE sorted_ab;

###############################################################
# Test 2: prefix_length=1, suffix=2 — sorted on (a), ORDER BY a,b,c
###############################################################

statement ok
COPY (
SELECT a, b, c, d FROM (VALUES
(1, 3, 20, 'p'), (1, 1, 10, 'q'), (1, 2, 30, 'r'),
(2, 2, 10, 's'), (2, 3, 20, 't'), (2, 1, 30, 'u'),
(3, 1, 10, 'v'), (3, 1, 20, 'w')
) AS t(a, b, c, d)
ORDER BY a
) TO '/tmp/test_partial_sort_prefix1.parquet';

statement ok
CREATE EXTERNAL TABLE sorted_a
STORED AS PARQUET
WITH ORDER (a ASC)
LOCATION '/tmp/test_partial_sort_prefix1.parquet';

statement ok
set datafusion.optimizer.prefer_partial_sort = true;

# prefix=1, sorting b and c within each group of a
query TT
EXPLAIN SELECT * FROM sorted_a ORDER BY a, b, c;
----
logical_plan
01)Sort: sorted_a.a ASC NULLS LAST, sorted_a.b ASC NULLS LAST, sorted_a.c ASC NULLS LAST
02)--TableScan: sorted_a projection=[a, b, c, d]
physical_plan
01)PartialSortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], common_prefix_length=[1]
02)--DataSourceExec: file_groups={1 group: [[tmp/test_partial_sort_prefix1.parquet]]}, projection=[a, b, c, d], output_ordering=[a@0 ASC NULLS LAST], file_type=parquet

# Verify correctness — within each a group, (b,c) should be sorted
query IIIT
SELECT * FROM sorted_a ORDER BY a, b, c;
----
1 1 10 q
1 2 30 r
1 3 20 p
2 1 30 u
2 2 10 s
2 3 20 t
3 1 10 v
3 1 20 w

statement ok
set datafusion.optimizer.prefer_partial_sort = false;

statement ok
DROP TABLE sorted_a;

###############################################################
# Test 3: prefix_length=3, suffix=1 — sorted on (a,b,c), ORDER BY a,b,c,d
###############################################################

statement ok
COPY (
SELECT a, b, c, d FROM (VALUES
(1, 1, 1, 40), (1, 1, 1, 10), (1, 1, 2, 30), (1, 1, 2, 20),
(1, 2, 1, 50), (1, 2, 1, 5),
(2, 1, 1, 60), (2, 1, 1, 15)
) AS t(a, b, c, d)
ORDER BY a, b, c
) TO '/tmp/test_partial_sort_prefix3.parquet';

statement ok
CREATE EXTERNAL TABLE sorted_abc
STORED AS PARQUET
WITH ORDER (a ASC, b ASC, c ASC)
LOCATION '/tmp/test_partial_sort_prefix3.parquet';

statement ok
set datafusion.optimizer.prefer_partial_sort = true;

# prefix=3, only sorting d within each (a,b,c) group
query TT
EXPLAIN SELECT * FROM sorted_abc ORDER BY a, b, c, d;
----
logical_plan
01)Sort: sorted_abc.a ASC NULLS LAST, sorted_abc.b ASC NULLS LAST, sorted_abc.c ASC NULLS LAST, sorted_abc.d ASC NULLS LAST
02)--TableScan: sorted_abc projection=[a, b, c, d]
physical_plan
01)PartialSortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, d@3 ASC NULLS LAST], common_prefix_length=[3]
02)--DataSourceExec: file_groups={1 group: [[tmp/test_partial_sort_prefix3.parquet]]}, projection=[a, b, c, d], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], file_type=parquet

# Verify correctness — d sorted within each (a,b,c) group
query IIII
SELECT * FROM sorted_abc ORDER BY a, b, c, d;
----
1 1 1 10
1 1 1 40
1 1 2 20
1 1 2 30
1 2 1 5
1 2 1 50
2 1 1 15
2 1 1 60

# After reset: back to SortExec
statement ok
set datafusion.optimizer.prefer_partial_sort = false;

query TT
EXPLAIN SELECT * FROM sorted_abc ORDER BY a, b, c, d;
----
logical_plan
01)Sort: sorted_abc.a ASC NULLS LAST, sorted_abc.b ASC NULLS LAST, sorted_abc.c ASC NULLS LAST, sorted_abc.d ASC NULLS LAST
02)--TableScan: sorted_abc projection=[a, b, c, d]
physical_plan
01)SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST, d@3 ASC NULLS LAST], preserve_partitioning=[false]
02)--DataSourceExec: file_groups={1 group: [[tmp/test_partial_sort_prefix3.parquet]]}, projection=[a, b, c, d], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], file_type=parquet

statement ok
DROP TABLE sorted_abc;
Loading
Loading