Skip to content

Commit 6f1040b

Browse files
authored
Fix PushdownSort dropping LIMIT when eliminating SortExec (#21744)
## Which issue does this PR close? When `PushdownSort` removes a `SortExec` because a source returns `Exact` (guaranteeing ordering), any `fetch` (LIMIT) on the `SortExec` is silently dropped if the underlying plan does not support `with_fetch()`. For example, `ProjectionExec` supports `try_pushdown_sort` (delegating to its child) but does not implement `with_fetch()`. A plan like `SortExec(fetch=10) → ProjectionExec → source` that gets sort-eliminated loses the limit. ## What changes are included in this PR? In the `Exact` branch of `PushdownSort`, when the eliminated `SortExec` carried a `fetch`: 1. Try `with_fetch()` on the pushed-down source first 2. If `with_fetch()` returns `None`, fall back to wrapping with `GlobalLimitExec` ## Are these changes tested? Yes. Three new unit tests: - `test_sort_pushdown_exact_no_fetch_no_limit` — Exact elimination without fetch: no limit wrapper added - `test_sort_pushdown_exact_preserves_fetch_with_global_limit` — Exact elimination with fetch, source does NOT support `with_fetch()`: `GlobalLimitExec` wrapper added - `test_sort_pushdown_exact_preserves_fetch_with_source_support` — Exact elimination with fetch, source supports `with_fetch()`: limit pushed into source directly ## Are there any user-facing changes? No.
1 parent aab4263 commit 6f1040b

3 files changed

Lines changed: 185 additions & 22 deletions

File tree

datafusion/core/tests/physical_optimizer/pushdown_sort.rs

Lines changed: 92 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,10 @@ use datafusion_physical_optimizer::pushdown_sort::PushdownSort;
3232
use std::sync::Arc;
3333

3434
use crate::physical_optimizer::test_utils::{
35-
OptimizationTest, coalesce_partitions_exec, parquet_exec, parquet_exec_with_sort,
36-
projection_exec, projection_exec_with_alias, repartition_exec, schema,
37-
simple_projection_exec, sort_exec, sort_exec_with_fetch, sort_expr, sort_expr_named,
38-
test_scan_with_ordering,
35+
OptimizationTest, TestScan, coalesce_partitions_exec, parquet_exec,
36+
parquet_exec_with_sort, projection_exec, projection_exec_with_alias,
37+
repartition_exec, schema, simple_projection_exec, sort_exec, sort_exec_with_fetch,
38+
sort_expr, sort_expr_named, test_scan_with_ordering,
3939
};
4040

4141
#[test]
@@ -996,3 +996,91 @@ fn test_sort_pushdown_with_test_scan_arbitrary_ordering() {
996996
"
997997
);
998998
}
999+
1000+
// ============================================================================
1001+
// EXACT PUSHDOWN TESTS (source guarantees ordering, SortExec removed)
1002+
// ============================================================================
1003+
1004+
#[test]
1005+
fn test_sort_pushdown_exact_no_fetch_no_limit() {
1006+
// When a source returns Exact (without fetch), the SortExec should be
1007+
// removed entirely with no GlobalLimitExec wrapper.
1008+
let schema = schema();
1009+
let a = sort_expr("a", &schema);
1010+
let b = sort_expr("b", &schema);
1011+
let source =
1012+
Arc::new(TestScan::new(schema.clone(), vec![]).with_exact_pushdown(true));
1013+
1014+
let ordering = LexOrdering::new(vec![a, b.reverse()]).unwrap();
1015+
let plan = sort_exec(ordering, source);
1016+
1017+
insta::assert_snapshot!(
1018+
OptimizationTest::new(plan, PushdownSort::new(), true),
1019+
@r"
1020+
OptimizationTest:
1021+
input:
1022+
- SortExec: expr=[a@0 ASC, b@1 DESC NULLS LAST], preserve_partitioning=[false]
1023+
- TestScan
1024+
output:
1025+
Ok:
1026+
- TestScan: requested_ordering=[a@0 ASC, b@1 DESC NULLS LAST]
1027+
"
1028+
);
1029+
}
1030+
1031+
#[test]
1032+
fn test_sort_pushdown_exact_preserves_fetch_with_global_limit() {
1033+
// When a source returns Exact but does NOT support with_fetch(),
1034+
// the optimizer must wrap the result with GlobalLimitExec to preserve
1035+
// the LIMIT from the eliminated SortExec.
1036+
let schema = schema();
1037+
let a = sort_expr("a", &schema);
1038+
let source =
1039+
Arc::new(TestScan::new(schema.clone(), vec![]).with_exact_pushdown(true));
1040+
1041+
let ordering = LexOrdering::new(vec![a]).unwrap();
1042+
let plan = sort_exec_with_fetch(ordering, Some(10), source);
1043+
1044+
insta::assert_snapshot!(
1045+
OptimizationTest::new(plan, PushdownSort::new(), true),
1046+
@r"
1047+
OptimizationTest:
1048+
input:
1049+
- SortExec: TopK(fetch=10), expr=[a@0 ASC], preserve_partitioning=[false]
1050+
- TestScan
1051+
output:
1052+
Ok:
1053+
- GlobalLimitExec: skip=0, fetch=10
1054+
- TestScan: requested_ordering=[a@0 ASC]
1055+
"
1056+
);
1057+
}
1058+
1059+
#[test]
1060+
fn test_sort_pushdown_exact_preserves_fetch_with_source_support() {
1061+
// When a source returns Exact AND supports with_fetch(),
1062+
// the limit should be pushed into the source directly (no GlobalLimitExec).
1063+
let schema = schema();
1064+
let a = sort_expr("a", &schema);
1065+
let source = Arc::new(
1066+
TestScan::new(schema.clone(), vec![])
1067+
.with_exact_pushdown(true)
1068+
.with_supports_fetch(true),
1069+
);
1070+
1071+
let ordering = LexOrdering::new(vec![a]).unwrap();
1072+
let plan = sort_exec_with_fetch(ordering, Some(10), source);
1073+
1074+
insta::assert_snapshot!(
1075+
OptimizationTest::new(plan, PushdownSort::new(), true),
1076+
@r"
1077+
OptimizationTest:
1078+
input:
1079+
- SortExec: TopK(fetch=10), expr=[a@0 ASC], preserve_partitioning=[false]
1080+
- TestScan
1081+
output:
1082+
Ok:
1083+
- TestScan: requested_ordering=[a@0 ASC], fetch=10
1084+
"
1085+
);
1086+
}

datafusion/core/tests/physical_optimizer/test_utils.rs

Lines changed: 75 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -830,15 +830,30 @@ pub fn sort_expr_named(name: &str, index: usize) -> PhysicalSortExpr {
830830
}
831831
}
832832

833-
/// A test data source that can display any requested ordering
834-
/// This is useful for testing sort pushdown behavior
833+
/// A test data source that can display any requested ordering.
834+
/// This is useful for testing sort pushdown behavior.
835+
///
836+
/// ## Configuration
837+
///
838+
/// - `exact_pushdown`: if `true`, `try_pushdown_sort` returns `Exact`
839+
/// (source guarantees ordering, SortExec can be removed); if `false`
840+
/// (default), returns `Inexact` (SortExec kept).
841+
/// - `supports_fetch`: if `true`, `with_fetch()` returns `Some` so the
842+
/// optimizer can push a LIMIT into the source; if `false` (default),
843+
/// `with_fetch()` returns `None`, forcing a `GlobalLimitExec` wrapper.
835844
#[derive(Debug, Clone)]
836845
pub struct TestScan {
837846
schema: SchemaRef,
838847
output_ordering: Vec<LexOrdering>,
839848
plan_properties: Arc<PlanProperties>,
840849
// Store the requested ordering for display
841850
requested_ordering: Option<LexOrdering>,
851+
/// If true, `try_pushdown_sort` returns `Exact` instead of `Inexact`.
852+
exact_pushdown: bool,
853+
/// If true, `with_fetch()` returns `Some(...)` (source absorbs the limit).
854+
supports_fetch: bool,
855+
/// The fetch (LIMIT) value pushed into this scan via `with_fetch()`.
856+
fetch: Option<usize>,
842857
}
843858

844859
impl TestScan {
@@ -872,41 +887,60 @@ impl TestScan {
872887
output_ordering,
873888
plan_properties: Arc::new(plan_properties),
874889
requested_ordering: None,
890+
exact_pushdown: false,
891+
supports_fetch: false,
892+
fetch: None,
875893
}
876894
}
877895

878896
/// Create a TestScan with a single output ordering
879897
pub fn with_ordering(schema: SchemaRef, ordering: LexOrdering) -> Self {
880898
Self::new(schema, vec![ordering])
881899
}
900+
901+
/// Set whether `try_pushdown_sort` returns `Exact` (true) or `Inexact` (false).
902+
pub fn with_exact_pushdown(mut self, exact: bool) -> Self {
903+
self.exact_pushdown = exact;
904+
self
905+
}
906+
907+
/// Set whether `with_fetch()` returns `Some` (true) or `None` (false).
908+
pub fn with_supports_fetch(mut self, supports: bool) -> Self {
909+
self.supports_fetch = supports;
910+
self
911+
}
882912
}
883913

884914
impl DisplayAs for TestScan {
885915
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
886916
match t {
887917
DisplayFormatType::Default | DisplayFormatType::Verbose => {
888918
write!(f, "TestScan")?;
919+
let mut sep = ": ";
889920
if !self.output_ordering.is_empty() {
890-
write!(f, ": output_ordering=[")?;
891-
// Format the ordering in a readable way
921+
write!(f, "{sep}output_ordering=[")?;
892922
for (i, sort_expr) in self.output_ordering[0].iter().enumerate() {
893923
if i > 0 {
894924
write!(f, ", ")?;
895925
}
896926
write!(f, "{sort_expr}")?;
897927
}
898928
write!(f, "]")?;
929+
sep = ", ";
899930
}
900-
// This is the key part - show what ordering was requested
901931
if let Some(ref req) = self.requested_ordering {
902-
write!(f, ", requested_ordering=[")?;
932+
write!(f, "{sep}requested_ordering=[")?;
903933
for (i, sort_expr) in req.iter().enumerate() {
904934
if i > 0 {
905935
write!(f, ", ")?;
906936
}
907937
write!(f, "{sort_expr}")?;
908938
}
909939
write!(f, "]")?;
940+
sep = ", ";
941+
}
942+
if let Some(fetch) = self.fetch {
943+
write!(f, "{sep}fetch={fetch}")?;
910944
}
911945
Ok(())
912946
}
@@ -953,6 +987,20 @@ impl ExecutionPlan for TestScan {
953987
Ok(Arc::new(Statistics::new_unknown(&self.schema)))
954988
}
955989

990+
fn with_fetch(&self, fetch: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
991+
if self.supports_fetch {
992+
let mut new_scan = self.clone();
993+
new_scan.fetch = fetch;
994+
Some(Arc::new(new_scan))
995+
} else {
996+
None
997+
}
998+
}
999+
1000+
fn fetch(&self) -> Option<usize> {
1001+
self.fetch
1002+
}
1003+
9561004
// This is the key method - implement sort pushdown
9571005
fn try_pushdown_sort(
9581006
&self,
@@ -965,10 +1013,27 @@ impl ExecutionPlan for TestScan {
9651013
let mut new_scan = self.clone();
9661014
new_scan.requested_ordering = requested_ordering;
9671015

968-
// Always return Inexact to keep the Sort node (like Phase 1 behavior)
969-
Ok(SortOrderPushdownResult::Inexact {
970-
inner: Arc::new(new_scan),
971-
})
1016+
if self.exact_pushdown {
1017+
// Update plan properties to reflect the guaranteed ordering
1018+
let orderings: Vec<Vec<PhysicalSortExpr>> = vec![order.to_vec()];
1019+
let eq_properties = EquivalenceProperties::new_with_orderings(
1020+
Arc::clone(&self.schema),
1021+
orderings,
1022+
);
1023+
new_scan.plan_properties = Arc::new(PlanProperties::new(
1024+
eq_properties,
1025+
Partitioning::UnknownPartitioning(1),
1026+
EmissionType::Incremental,
1027+
Boundedness::Bounded,
1028+
));
1029+
Ok(SortOrderPushdownResult::Exact {
1030+
inner: Arc::new(new_scan),
1031+
})
1032+
} else {
1033+
Ok(SortOrderPushdownResult::Inexact {
1034+
inner: Arc::new(new_scan),
1035+
})
1036+
}
9721037
}
9731038

9741039
fn apply_expressions(

datafusion/physical-optimizer/src/pushdown_sort.rs

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
6161
use datafusion_physical_plan::ExecutionPlan;
6262
use datafusion_physical_plan::SortOrderPushdownResult;
6363
use datafusion_physical_plan::buffer::BufferExec;
64+
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
6465
use datafusion_physical_plan::sorts::sort::SortExec;
6566
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
6667
use std::sync::Arc;
@@ -104,8 +105,12 @@ impl PhysicalOptimizerRule for PushdownSort {
104105
let required_ordering = sort_child.expr();
105106
match sort_input.try_pushdown_sort(required_ordering)? {
106107
SortOrderPushdownResult::Exact { inner } => {
108+
// Preserve fetch (LIMIT) from the eliminated SortExec.
109+
// Use LocalLimitExec (not Global) since input is multi-partition.
107110
let inner = if let Some(fetch) = sort_child.fetch() {
108-
inner.with_fetch(Some(fetch)).unwrap_or(inner)
111+
inner.with_fetch(Some(fetch)).unwrap_or_else(|| {
112+
Arc::new(LocalLimitExec::new(inner, fetch))
113+
})
109114
} else {
110115
inner
111116
};
@@ -149,14 +154,19 @@ impl PhysicalOptimizerRule for PushdownSort {
149154
match sort_input.try_pushdown_sort(required_ordering)? {
150155
SortOrderPushdownResult::Exact { inner } => {
151156
// Data source guarantees perfect ordering - remove the Sort operator.
152-
// Preserve the fetch (LIMIT) from the original SortExec so the
153-
// data source can stop reading early.
154-
let inner = if let Some(fetch) = sort_exec.fetch() {
155-
inner.with_fetch(Some(fetch)).unwrap_or(inner)
157+
//
158+
// If the SortExec carried a fetch (LIMIT), we must preserve it.
159+
// First try pushing the limit into the source via `with_fetch()`.
160+
// If the source doesn't support `with_fetch`, fall back to
161+
// wrapping with GlobalLimitExec.
162+
if let Some(fetch) = sort_exec.fetch() {
163+
let inner = inner.with_fetch(Some(fetch)).unwrap_or_else(|| {
164+
Arc::new(GlobalLimitExec::new(inner, 0, Some(fetch)))
165+
});
166+
Ok(Transformed::yes(inner))
156167
} else {
157-
inner
158-
};
159-
Ok(Transformed::yes(inner))
168+
Ok(Transformed::yes(inner))
169+
}
160170
}
161171
SortOrderPushdownResult::Inexact { inner } => {
162172
// Data source is optimized for the ordering but not perfectly sorted

0 commit comments

Comments
 (0)