Skip to content

Commit 0e83891

Browse files
committed
Add fetch method to LogicalPlan
1 parent 475de9f commit 0e83891

2 files changed

Lines changed: 40 additions & 14 deletions

File tree

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1392,6 +1392,40 @@ impl LogicalPlan {
13921392
}
13931393
}
13941394

1395+
/// Returns the fetch (limit) of this plan node, if it has one.
1396+
///
1397+
/// Only [`LogicalPlan::Sort`] and [`LogicalPlan::TableScan`] carry a fetch
1398+
/// value directly; all other variants return `None`.
1399+
pub fn fetch(&self) -> Option<usize> {
1400+
match self {
1401+
LogicalPlan::Sort(Sort { fetch, .. }) => *fetch,
1402+
LogicalPlan::TableScan(TableScan { fetch, .. }) => *fetch,
1403+
LogicalPlan::Projection(_) => None,
1404+
LogicalPlan::Filter(_) => None,
1405+
LogicalPlan::Window(_) => None,
1406+
LogicalPlan::Aggregate(_) => None,
1407+
LogicalPlan::Join(_) => None,
1408+
LogicalPlan::Repartition(_) => None,
1409+
LogicalPlan::Union(_) => None,
1410+
LogicalPlan::EmptyRelation(_) => None,
1411+
LogicalPlan::Subquery(_) => None,
1412+
LogicalPlan::SubqueryAlias(_) => None,
1413+
LogicalPlan::Limit(_) => None,
1414+
LogicalPlan::Statement(_) => None,
1415+
LogicalPlan::Values(_) => None,
1416+
LogicalPlan::Explain(_) => None,
1417+
LogicalPlan::Analyze(_) => None,
1418+
LogicalPlan::Extension(_) => None,
1419+
LogicalPlan::Distinct(_) => None,
1420+
LogicalPlan::Dml(_) => None,
1421+
LogicalPlan::Ddl(_) => None,
1422+
LogicalPlan::Copy(_) => None,
1423+
LogicalPlan::DescribeTable(_) => None,
1424+
LogicalPlan::Unnest(_) => None,
1425+
LogicalPlan::RecursiveQuery(_) => None,
1426+
}
1427+
}
1428+
13951429
/// If this node's expressions contains any references to an outer subquery
13961430
pub fn contains_outer_reference(&self) -> bool {
13971431
let mut contains = false;

datafusion/optimizer/src/push_down_filter.rs

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -796,6 +796,12 @@ impl OptimizerRule for PushDownFilter {
796796
filter.predicate = new_predicate;
797797
}
798798

799+
// If the child has a fetch (limit), pushing a filter below it would
800+
// change semantics: the limit should apply before the filter, not after.
801+
if filter.input.fetch().is_some() {
802+
return Ok(Transformed::no(LogicalPlan::Filter(filter)));
803+
}
804+
799805
match Arc::unwrap_or_clone(filter.input) {
800806
LogicalPlan::Filter(child_filter) => {
801807
let parents_predicates = split_conjunction_owned(filter.predicate);
@@ -832,13 +838,6 @@ impl OptimizerRule for PushDownFilter {
832838
insert_below(LogicalPlan::Distinct(distinct), new_filter)
833839
}
834840
LogicalPlan::Sort(sort) => {
835-
// If the sort has a fetch (limit), pushing a filter below
836-
// it would change semantics: the limit should apply before
837-
// the filter, not after.
838-
if sort.fetch.is_some() {
839-
filter.input = Arc::new(LogicalPlan::Sort(sort));
840-
return Ok(Transformed::no(LogicalPlan::Filter(filter)));
841-
}
842841
let new_filter =
843842
Filter::try_new(filter.predicate, Arc::clone(&sort.input))
844843
.map(LogicalPlan::Filter)?;
@@ -1137,13 +1136,6 @@ impl OptimizerRule for PushDownFilter {
11371136
}
11381137
LogicalPlan::Join(join) => push_down_join(join, Some(&filter.predicate)),
11391138
LogicalPlan::TableScan(scan) => {
1140-
// If the scan has a fetch (limit), pushing filters into it
1141-
// would change semantics: the limit should apply before the
1142-
// filter, not after.
1143-
if scan.fetch.is_some() {
1144-
filter.input = Arc::new(LogicalPlan::TableScan(scan));
1145-
return Ok(Transformed::no(LogicalPlan::Filter(filter)));
1146-
}
11471139
let filter_predicates = split_conjunction(&filter.predicate);
11481140

11491141
let (volatile_filters, non_volatile_filters): (Vec<&Expr>, Vec<&Expr>) =

0 commit comments

Comments
 (0)