Skip to content

Commit 0bec631

Browse files
committed
Unify dynamic filter finalize terminal states
1 parent 1edf536 commit 0bec631

1 file changed

Lines changed: 18 additions & 8 deletions

File tree

datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ enum AccumulatedBuildData {
266266
completed_partitions: usize,
267267
},
268268
CollectLeft {
269-
data: Option<PartitionData>,
269+
data: PartitionStatus,
270270
reported_count: usize,
271271
expected_reports: usize,
272272
},
@@ -293,7 +293,7 @@ enum PartitionStatus {
293293
#[derive(Clone)]
294294
enum FinalizeInput {
295295
Partitioned(Vec<PartitionStatus>),
296-
CollectLeft(Option<PartitionData>),
296+
CollectLeft(PartitionStatus),
297297
}
298298

299299
impl SharedBuildAccumulator {
@@ -356,7 +356,7 @@ impl SharedBuildAccumulator {
356356
completed_partitions: 0,
357357
},
358358
PartitionMode::CollectLeft => AccumulatedBuildData::CollectLeft {
359-
data: None,
359+
data: PartitionStatus::Pending,
360360
reported_count: 0,
361361
expected_reports: expected_calls,
362362
},
@@ -447,8 +447,8 @@ impl SharedBuildAccumulator {
447447
..
448448
},
449449
) => {
450-
if data.is_none() {
451-
*data = Some(PartitionData { pushdown, bounds });
450+
if matches!(data, PartitionStatus::Pending) {
451+
*data = PartitionStatus::Reported(PartitionData { pushdown, bounds });
452452
}
453453
*reported_count += 1;
454454
}
@@ -538,8 +538,8 @@ impl SharedBuildAccumulator {
538538

539539
fn build_filter(&self, finalize_input: FinalizeInput) -> Result<()> {
540540
match finalize_input {
541-
FinalizeInput::CollectLeft(data) => {
542-
if let Some(partition_data) = data {
541+
FinalizeInput::CollectLeft(partition) => match partition {
542+
PartitionStatus::Reported(partition_data) => {
543543
let membership_expr = create_membership_predicate(
544544
&self.on_right,
545545
partition_data.pushdown.clone(),
@@ -561,7 +561,17 @@ impl SharedBuildAccumulator {
561561
self.dynamic_filter.update(filter_expr)?;
562562
}
563563
}
564-
}
564+
PartitionStatus::Pending => {
565+
return datafusion_common::internal_err!(
566+
"attempted to finalize collect-left dynamic filter without reported build data"
567+
);
568+
}
569+
PartitionStatus::CanceledUnknown => {
570+
return datafusion_common::internal_err!(
571+
"collect-left dynamic filter cannot finalize with canceled build data"
572+
);
573+
}
574+
},
565575
FinalizeInput::Partitioned(partitions) => {
566576
let num_partitions = partitions.len();
567577
let routing_hash_expr = Arc::new(HashExpr::new(

0 commit comments

Comments
 (0)