1919//! into a single partition
2020
2121use std:: any:: Any ;
22+ use std:: pin:: Pin ;
2223use std:: sync:: Arc ;
24+ use std:: task:: { Context , Poll } ;
2325
2426use super :: metrics:: { BaselineMetrics , ExecutionPlanMetricsSet , MetricsSet } ;
2527use super :: stream:: { ObservedStream , RecordBatchReceiverStream } ;
2628use super :: {
27- DisplayAs , ExecutionPlanProperties , PlanProperties , SendableRecordBatchStream ,
28- Statistics ,
29+ DisplayAs , ExecutionPlanProperties , PlanProperties , RecordBatchStream ,
30+ SendableRecordBatchStream , Statistics ,
2931} ;
32+ use crate :: coalesce:: { LimitedBatchCoalescer , PushBatchStatus } ;
3033use crate :: execution_plan:: { CardinalityEffect , EvaluationType , SchedulingType } ;
3134use crate :: filter_pushdown:: { FilterDescription , FilterPushdownPhase } ;
3235use crate :: projection:: { ProjectionExec , make_with_child} ;
3336use crate :: sort_pushdown:: SortOrderPushdownResult ;
3437use crate :: { DisplayFormatType , ExecutionPlan , Partitioning , check_if_same_properties} ;
3538use datafusion_physical_expr_common:: sort_expr:: PhysicalSortExpr ;
3639
40+ use arrow:: datatypes:: SchemaRef ;
41+ use arrow:: record_batch:: RecordBatch ;
3742use datafusion_common:: config:: ConfigOptions ;
3843use datafusion_common:: tree_node:: TreeNodeRecursion ;
3944use datafusion_common:: { Result , assert_eq_or_internal_err, internal_err} ;
4045use datafusion_execution:: TaskContext ;
4146use datafusion_physical_expr:: PhysicalExpr ;
47+ use futures:: ready;
48+ use futures:: stream:: { Stream , StreamExt } ;
4249
4350/// Merge execution plan executes partitions in parallel and combines them into a single
4451/// partition. No guarantees are made about the order of the resulting partition.
@@ -214,6 +221,8 @@ impl ExecutionPlan for CoalescePartitionsExec {
214221 let elapsed_compute = baseline_metrics. elapsed_compute ( ) . clone ( ) ;
215222 let _timer = elapsed_compute. timer ( ) ;
216223
224+ let batch_size = context. session_config ( ) . batch_size ( ) ;
225+
217226 // use a stream that allows each sender to put in at
218227 // least one result in an attempt to maximize
219228 // parallelism.
@@ -231,11 +240,23 @@ impl ExecutionPlan for CoalescePartitionsExec {
231240 }
232241
233242 let stream = builder. build ( ) ;
234- Ok ( Box :: pin ( ObservedStream :: new (
235- stream,
236- baseline_metrics,
237- self . fetch ,
238- ) ) )
243+ // Coalesce small batches from multiple partitions into
244+ // larger batches of target_batch_size. This improves
245+ // downstream performance (e.g. hash join build side
246+ // benefits from fewer, larger batches).
247+ Ok ( Box :: pin ( CoalescedStream {
248+ input : Box :: pin ( ObservedStream :: new (
249+ stream,
250+ baseline_metrics,
251+ self . fetch ,
252+ ) ) ,
253+ coalescer : LimitedBatchCoalescer :: new (
254+ self . schema ( ) ,
255+ batch_size,
256+ None , // fetch is already handled by ObservedStream
257+ ) ,
258+ completed : false ,
259+ } ) )
239260 }
240261 }
241262 }
@@ -352,6 +373,55 @@ impl ExecutionPlan for CoalescePartitionsExec {
352373 }
353374}
354375
376+ /// Stream that coalesces small batches into larger ones using
377+ /// [`LimitedBatchCoalescer`].
378+ struct CoalescedStream {
379+ input : SendableRecordBatchStream ,
380+ coalescer : LimitedBatchCoalescer ,
381+ completed : bool ,
382+ }
383+
384+ impl Stream for CoalescedStream {
385+ type Item = Result < RecordBatch > ;
386+
387+ fn poll_next (
388+ mut self : Pin < & mut Self > ,
389+ cx : & mut Context < ' _ > ,
390+ ) -> Poll < Option < Self :: Item > > {
391+ loop {
392+ if let Some ( batch) = self . coalescer . next_completed_batch ( ) {
393+ return Poll :: Ready ( Some ( Ok ( batch) ) ) ;
394+ }
395+ if self . completed {
396+ return Poll :: Ready ( None ) ;
397+ }
398+ let input_batch = ready ! ( self . input. poll_next_unpin( cx) ) ;
399+ match input_batch {
400+ None => {
401+ self . completed = true ;
402+ self . coalescer . finish ( ) ?;
403+ }
404+ Some ( Ok ( batch) ) => {
405+ match self . coalescer . push_batch ( batch) ? {
406+ PushBatchStatus :: Continue => { }
407+ PushBatchStatus :: LimitReached => {
408+ self . completed = true ;
409+ self . coalescer . finish ( ) ?;
410+ }
411+ }
412+ }
413+ other => return Poll :: Ready ( other) ,
414+ }
415+ }
416+ }
417+ }
418+
419+ impl RecordBatchStream for CoalescedStream {
420+ fn schema ( & self ) -> SchemaRef {
421+ self . coalescer . schema ( )
422+ }
423+ }
424+
355425#[ cfg( test) ]
356426mod tests {
357427 use super :: * ;
@@ -383,10 +453,9 @@ mod tests {
383453 1
384454 ) ;
385455
386- // the result should contain 4 batches (one per input partition )
456+ // the result should contain all rows (coalesced into fewer batches )
387457 let iter = merge. execute ( 0 , task_ctx) ?;
388458 let batches = common:: collect ( iter) . await ?;
389- assert_eq ! ( batches. len( ) , num_partitions) ;
390459
391460 // there should be a total of 400 rows (100 per each partition)
392461 let row_count: usize = batches. iter ( ) . map ( |batch| batch. num_rows ( ) ) . sum ( ) ;
0 commit comments