@@ -266,7 +266,7 @@ enum AccumulatedBuildData {
266266 completed_partitions : usize ,
267267 } ,
268268 CollectLeft {
269- data : Option < PartitionData > ,
269+ data : PartitionStatus ,
270270 reported_count : usize ,
271271 expected_reports : usize ,
272272 } ,
@@ -293,7 +293,7 @@ enum PartitionStatus {
293293#[ derive( Clone ) ]
294294enum FinalizeInput {
295295 Partitioned ( Vec < PartitionStatus > ) ,
296- CollectLeft ( Option < PartitionData > ) ,
296+ CollectLeft ( PartitionStatus ) ,
297297}
298298
299299impl SharedBuildAccumulator {
@@ -356,7 +356,7 @@ impl SharedBuildAccumulator {
356356 completed_partitions : 0 ,
357357 } ,
358358 PartitionMode :: CollectLeft => AccumulatedBuildData :: CollectLeft {
359- data : None ,
359+ data : PartitionStatus :: Pending ,
360360 reported_count : 0 ,
361361 expected_reports : expected_calls,
362362 } ,
@@ -447,8 +447,8 @@ impl SharedBuildAccumulator {
447447 ..
448448 } ,
449449 ) => {
450- if data . is_none ( ) {
451- * data = Some ( PartitionData { pushdown, bounds } ) ;
450+ if matches ! ( data , PartitionStatus :: Pending ) {
451+ * data = PartitionStatus :: Reported ( PartitionData { pushdown, bounds } ) ;
452452 }
453453 * reported_count += 1 ;
454454 }
@@ -538,8 +538,8 @@ impl SharedBuildAccumulator {
538538
539539 fn build_filter ( & self , finalize_input : FinalizeInput ) -> Result < ( ) > {
540540 match finalize_input {
541- FinalizeInput :: CollectLeft ( data ) => {
542- if let Some ( partition_data) = data {
541+ FinalizeInput :: CollectLeft ( partition ) => match partition {
542+ PartitionStatus :: Reported ( partition_data) => {
543543 let membership_expr = create_membership_predicate (
544544 & self . on_right ,
545545 partition_data. pushdown . clone ( ) ,
@@ -561,7 +561,17 @@ impl SharedBuildAccumulator {
561561 self . dynamic_filter . update ( filter_expr) ?;
562562 }
563563 }
564- }
564+ PartitionStatus :: Pending => {
565+ return datafusion_common:: internal_err!(
566+ "attempted to finalize collect-left dynamic filter without reported build data"
567+ ) ;
568+ }
569+ PartitionStatus :: CanceledUnknown => {
570+ return datafusion_common:: internal_err!(
571+ "collect-left dynamic filter cannot finalize with canceled build data"
572+ ) ;
573+ }
574+ } ,
565575 FinalizeInput :: Partitioned ( partitions) => {
566576 let num_partitions = partitions. len ( ) ;
567577 let routing_hash_expr = Arc :: new ( HashExpr :: new (
0 commit comments