@@ -25,7 +25,7 @@ use datafusion_physical_expr::window::{
2525 StandardWindowFunctionExpr , WindowExpr ,
2626} ;
2727use datafusion_physical_plan:: execution_plan:: CardinalityEffect ;
28- use datafusion_physical_plan:: limit:: GlobalLimitExec ;
28+ use datafusion_physical_plan:: limit:: { GlobalLimitExec , LocalLimitExec } ;
2929use datafusion_physical_plan:: repartition:: RepartitionExec ;
3030use datafusion_physical_plan:: sorts:: sort:: SortExec ;
3131use datafusion_physical_plan:: sorts:: sort_preserving_merge:: SortPreservingMergeExec ;
@@ -206,6 +206,12 @@ fn get_limit(node: &Arc<dyn ExecutionPlan>, ctx: &mut TraverseState) -> bool {
206206 ctx. reset_limit ( limit. fetch ( ) . map ( |fetch| fetch + limit. skip ( ) ) ) ;
207207 return true ;
208208 }
209+ // In distributed execution, GlobalLimitExec becomes LocalLimitExec
210+ // per partition. Handle it the same way (LocalLimitExec has no skip).
211+ if let Some ( limit) = node. as_any ( ) . downcast_ref :: < LocalLimitExec > ( ) {
212+ ctx. reset_limit ( Some ( limit. fetch ( ) ) ) ;
213+ return true ;
214+ }
209215 if let Some ( limit) = node. as_any ( ) . downcast_ref :: < SortPreservingMergeExec > ( ) {
210216 ctx. reset_limit ( limit. fetch ( ) ) ;
211217 return true ;
@@ -254,3 +260,114 @@ fn bound_to_usize(bound: &WindowFrameBound) -> Option<usize> {
254260 _ => None ,
255261 }
256262}
263+
264+ #[ cfg( test) ]
265+ mod tests {
266+ use super :: * ;
267+ use arrow:: datatypes:: { DataType , Field , Schema } ;
268+ use datafusion_common:: ScalarValue ;
269+ use datafusion_expr:: { WindowFrame , WindowFrameBound , WindowFrameUnits } ;
270+ use datafusion_functions_window:: row_number:: row_number_udwf;
271+ use datafusion_physical_expr:: expressions:: col;
272+ use datafusion_physical_expr:: window:: StandardWindowExpr ;
273+ use datafusion_physical_expr_common:: sort_expr:: { LexOrdering , PhysicalSortExpr } ;
274+ use datafusion_physical_plan:: InputOrderMode ;
275+ use datafusion_physical_plan:: displayable;
276+ use datafusion_physical_plan:: limit:: LocalLimitExec ;
277+ use datafusion_physical_plan:: placeholder_row:: PlaceholderRowExec ;
278+ use datafusion_physical_plan:: windows:: {
279+ BoundedWindowAggExec , create_udwf_window_expr,
280+ } ;
281+ use insta:: assert_snapshot;
282+ use std:: sync:: Arc ;
283+
284+ fn plan_str ( plan : & dyn ExecutionPlan ) -> String {
285+ displayable ( plan) . indent ( true ) . to_string ( )
286+ }
287+
288+ fn schema ( ) -> Arc < Schema > {
289+ Arc :: new ( Schema :: new ( vec ! [ Field :: new( "a" , DataType :: Int64 , false ) ] ) )
290+ }
291+
292+ /// Build: LocalLimitExec or GlobalLimitExec → BoundedWindowAggExec(row_number) → SortExec
293+ fn build_window_plan (
294+ use_local_limit : bool ,
295+ ) -> datafusion_common:: Result < Arc < dyn ExecutionPlan > > {
296+ let s = schema ( ) ;
297+ let input: Arc < dyn ExecutionPlan > =
298+ Arc :: new ( PlaceholderRowExec :: new ( Arc :: clone ( & s) ) ) ;
299+
300+ let ordering =
301+ LexOrdering :: new ( vec ! [ PhysicalSortExpr :: new_default( col( "a" , & s) ?) . asc( ) ] )
302+ . unwrap ( ) ;
303+
304+ let sort: Arc < dyn ExecutionPlan > = Arc :: new (
305+ SortExec :: new ( ordering. clone ( ) , input) . with_preserve_partitioning ( true ) ,
306+ ) ;
307+
308+ let window_expr = Arc :: new ( StandardWindowExpr :: new (
309+ create_udwf_window_expr (
310+ & row_number_udwf ( ) ,
311+ & [ ] ,
312+ & s,
313+ "row_number" . to_string ( ) ,
314+ false ,
315+ ) ?,
316+ & [ ] ,
317+ ordering. as_ref ( ) ,
318+ Arc :: new ( WindowFrame :: new_bounds (
319+ WindowFrameUnits :: Rows ,
320+ WindowFrameBound :: Preceding ( ScalarValue :: UInt64 ( None ) ) ,
321+ WindowFrameBound :: CurrentRow ,
322+ ) ) ,
323+ ) ) ;
324+
325+ let window: Arc < dyn ExecutionPlan > = Arc :: new ( BoundedWindowAggExec :: try_new (
326+ vec ! [ window_expr] ,
327+ sort,
328+ InputOrderMode :: Sorted ,
329+ true ,
330+ ) ?) ;
331+
332+ let limit: Arc < dyn ExecutionPlan > = if use_local_limit {
333+ Arc :: new ( LocalLimitExec :: new ( window, 100 ) )
334+ } else {
335+ Arc :: new ( GlobalLimitExec :: new ( window, 0 , Some ( 100 ) ) )
336+ } ;
337+
338+ Ok ( limit)
339+ }
340+
341+ fn optimize ( plan : Arc < dyn ExecutionPlan > ) -> Arc < dyn ExecutionPlan > {
342+ let mut config = ConfigOptions :: new ( ) ;
343+ config. optimizer . enable_window_limits = true ;
344+ LimitPushPastWindows :: new ( ) . optimize ( plan, & config) . unwrap ( )
345+ }
346+
347+ /// GlobalLimitExec above a windowed sort should push fetch into the SortExec.
348+ #[ test]
349+ fn global_limit_pushes_past_window ( ) {
350+ let plan = build_window_plan ( false ) . unwrap ( ) ;
351+ let optimized = optimize ( plan) ;
352+ assert_snapshot ! ( plan_str( optimized. as_ref( ) ) , @r#"
353+ GlobalLimitExec: skip=0, fetch=100
354+ BoundedWindowAggExec: wdw=[row_number: Field { "row_number": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
355+ SortExec: TopK(fetch=100), expr=[a@0 ASC], preserve_partitioning=[true]
356+ PlaceholderRowExec
357+ "# ) ;
358+ }
359+
360+ /// LocalLimitExec above a windowed sort should also push fetch into the SortExec.
361+ /// This is the case in distributed execution where GlobalLimitExec becomes LocalLimitExec.
362+ #[ test]
363+ fn local_limit_pushes_past_window ( ) {
364+ let plan = build_window_plan ( true ) . unwrap ( ) ;
365+ let optimized = optimize ( plan) ;
366+ assert_snapshot ! ( plan_str( optimized. as_ref( ) ) , @r#"
367+ LocalLimitExec: fetch=100
368+ BoundedWindowAggExec: wdw=[row_number: Field { "row_number": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
369+ SortExec: TopK(fetch=100), expr=[a@0 ASC], preserve_partitioning=[true]
370+ PlaceholderRowExec
371+ "# ) ;
372+ }
373+ }
0 commit comments