@@ -576,65 +576,83 @@ impl SharedBuildAccumulator {
576576 lit ( ScalarValue :: UInt64 ( Some ( num_partitions as u64 ) ) ) ,
577577 ) ) as Arc < dyn PhysicalExpr > ;
578578
579- let when_then_branches = partitions
580- . iter ( )
581- . enumerate ( )
582- . map ( |( partition_id, partition) | -> Result < _ > {
583- let then_expr = match partition {
584- PartitionStatus :: Reported ( partition)
585- if matches ! (
586- partition. pushdown,
587- PushdownStrategy :: Empty
588- ) =>
589- {
590- lit ( false )
591- }
592- PartitionStatus :: Reported ( partition) => {
593- let membership_expr = create_membership_predicate (
594- & self . on_right ,
595- partition. pushdown . clone ( ) ,
596- & HASH_JOIN_SEED ,
597- self . probe_schema . as_ref ( ) ,
598- ) ?;
599- let bounds_expr = create_bounds_predicate (
600- & self . on_right ,
601- & partition. bounds ,
602- ) ;
603- match ( membership_expr, bounds_expr) {
604- ( Some ( membership) , Some ( bounds) ) => Arc :: new (
605- BinaryExpr :: new (
606- bounds,
607- Operator :: And ,
608- membership,
609- ) ,
610- )
611- as Arc < dyn PhysicalExpr > ,
612- ( Some ( membership) , None ) => membership,
613- ( None , Some ( bounds) ) => bounds,
614- ( None , None ) => lit ( true ) ,
615- }
616- }
617- PartitionStatus :: CanceledUnknown => lit ( true ) ,
618- PartitionStatus :: Pending => {
619- return datafusion_common:: internal_err!(
620- "attempted to finalize dynamic filter with pending partition"
579+ let mut real_branches = Vec :: new ( ) ;
580+ let mut empty_partition_ids = Vec :: new ( ) ;
581+ let mut has_canceled_unknown = false ;
582+
583+ for ( partition_id, partition) in partitions. iter ( ) . enumerate ( ) {
584+ match partition {
585+ PartitionStatus :: Reported ( partition)
586+ if matches ! ( partition. pushdown, PushdownStrategy :: Empty ) =>
587+ {
588+ empty_partition_ids. push ( partition_id) ;
589+ }
590+ PartitionStatus :: Reported ( partition) => {
591+ let membership_expr = create_membership_predicate (
592+ & self . on_right ,
593+ partition. pushdown . clone ( ) ,
594+ & HASH_JOIN_SEED ,
595+ self . probe_schema . as_ref ( ) ,
596+ ) ?;
597+ let bounds_expr = create_bounds_predicate (
598+ & self . on_right ,
599+ & partition. bounds ,
600+ ) ;
601+ let then_expr = match ( membership_expr, bounds_expr) {
602+ ( Some ( membership) , Some ( bounds) ) => Arc :: new (
603+ BinaryExpr :: new ( bounds, Operator :: And , membership) ,
621604 )
622- }
623- } ;
624- Ok ( (
625- lit ( ScalarValue :: UInt64 ( Some ( partition_id as u64 ) ) ) ,
626- then_expr,
627- ) )
628- } )
629- . collect :: < Result < Vec < _ > > > ( ) ?;
630-
631- let filter_expr = if when_then_branches. len ( ) == 1 {
632- Arc :: clone ( & when_then_branches[ 0 ] . 1 )
605+ as Arc < dyn PhysicalExpr > ,
606+ ( Some ( membership) , None ) => membership,
607+ ( None , Some ( bounds) ) => bounds,
608+ ( None , None ) => lit ( true ) ,
609+ } ;
610+ real_branches. push ( (
611+ lit ( ScalarValue :: UInt64 ( Some ( partition_id as u64 ) ) ) ,
612+ then_expr,
613+ ) ) ;
614+ }
615+ PartitionStatus :: CanceledUnknown => {
616+ has_canceled_unknown = true ;
617+ }
618+ PartitionStatus :: Pending => {
619+ return datafusion_common:: internal_err!(
620+ "attempted to finalize dynamic filter with pending partition"
621+ ) ;
622+ }
623+ }
624+ }
625+
626+ let filter_expr = if has_canceled_unknown {
627+ let mut when_then_branches = empty_partition_ids
628+ . into_iter ( )
629+ . map ( |partition_id| {
630+ (
631+ lit ( ScalarValue :: UInt64 ( Some ( partition_id as u64 ) ) ) ,
632+ lit ( false ) ,
633+ )
634+ } )
635+ . collect :: < Vec < _ > > ( ) ;
636+ when_then_branches. extend ( real_branches) ;
637+
638+ if when_then_branches. is_empty ( ) {
639+ lit ( true )
640+ } else {
641+ Arc :: new ( CaseExpr :: try_new (
642+ Some ( modulo_expr) ,
643+ when_then_branches,
644+ Some ( lit ( true ) ) ,
645+ ) ?) as Arc < dyn PhysicalExpr >
646+ }
647+ } else if real_branches. is_empty ( ) {
648+ lit ( false )
649+ } else if num_partitions == 1 {
650+ Arc :: clone ( & real_branches[ 0 ] . 1 )
633651 } else {
634652 Arc :: new ( CaseExpr :: try_new (
635653 Some ( modulo_expr) ,
636- when_then_branches ,
637- Some ( lit ( true ) ) ,
654+ real_branches ,
655+ Some ( lit ( false ) ) ,
638656 ) ?) as Arc < dyn PhysicalExpr >
639657 } ;
640658
0 commit comments