Skip to content

Commit 5ad96d9

Browse files
adriangbclaude
andcommitted
Extract build data reporting into a state transition method
Replaces the manual PartitionBuildData construction + report_build_data call + build_reported flag set in collect_build_side with a single transition_after_build_collected method, making it impossible to forget to report build data when transitioning state. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 7011c5d commit 5ad96d9

1 file changed

Lines changed: 44 additions & 50 deletions

File tree

  • datafusion/physical-plan/src/joins/hash_join

datafusion/physical-plan/src/joins/hash_join/stream.rs

Lines changed: 44 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,49 @@ impl HashJoinStream {
424424
}
425425
}
426426

427+
/// Transitions state after build-side data has been collected, automatically
428+
/// reporting build data to the accumulator when one is present.
429+
///
430+
/// If a `build_accumulator` is configured, this method constructs the
431+
/// appropriate [`PartitionBuildData`], schedules the reporting future, and
432+
/// returns [`HashJoinStreamState::WaitPartitionBoundsReport`]. Otherwise it
433+
/// delegates to [`Self::state_after_build_ready`].
434+
fn transition_after_build_collected(
435+
&mut self,
436+
left_data: &Arc<JoinLeftData>,
437+
) -> HashJoinStreamState {
438+
let Some(build_accumulator) = self.build_accumulator.as_ref() else {
439+
return Self::state_after_build_ready(self.join_type, left_data.as_ref());
440+
};
441+
442+
let pushdown = left_data.membership().clone();
443+
let bounds = left_data
444+
.bounds
445+
.clone()
446+
.unwrap_or_else(|| PartitionBounds::new(vec![]));
447+
448+
let build_data = match self.mode {
449+
PartitionMode::Partitioned => PartitionBuildData::Partitioned {
450+
partition_id: self.partition,
451+
pushdown,
452+
bounds,
453+
},
454+
PartitionMode::CollectLeft => {
455+
PartitionBuildData::CollectLeft { pushdown, bounds }
456+
}
457+
PartitionMode::Auto => unreachable!(
458+
"PartitionMode::Auto should not be present at execution time. This is a bug in DataFusion, please report it!"
459+
),
460+
};
461+
462+
let acc = Arc::clone(build_accumulator);
463+
self.build_waiter = Some(OnceFut::new(async move {
464+
acc.report_build_data(build_data).await
465+
}));
466+
self.build_reported = true;
467+
HashJoinStreamState::WaitPartitionBoundsReport
468+
}
469+
427470
/// Separate implementation function that unpins the [`HashJoinStream`] so
428471
/// that partial borrows work correctly
429472
fn poll_next_impl(
@@ -514,56 +557,7 @@ impl HashJoinStream {
514557
// not the build side (left). The probe-side NULL check happens during process_probe_batch.
515558
// The probe_side_has_null flag will be set there if any probe batch contains NULL.
516559

517-
// Handle dynamic filter build-side information accumulation
518-
//
519-
// Dynamic filter coordination between partitions:
520-
// Report hash maps (Partitioned mode) or bounds (CollectLeft mode) to the accumulator
521-
// which will handle synchronization and filter updates
522-
if let Some(ref build_accumulator) = self.build_accumulator {
523-
let build_accumulator = Arc::clone(build_accumulator);
524-
525-
let left_side_partition_id = match self.mode {
526-
PartitionMode::Partitioned => self.partition,
527-
PartitionMode::CollectLeft => 0,
528-
PartitionMode::Auto => unreachable!(
529-
"PartitionMode::Auto should not be present at execution time. This is a bug in DataFusion, please report it!"
530-
),
531-
};
532-
533-
// Determine pushdown strategy based on availability of InList values
534-
let pushdown = left_data.membership().clone();
535-
536-
// Construct the appropriate build data enum variant based on partition mode
537-
let build_data = match self.mode {
538-
PartitionMode::Partitioned => PartitionBuildData::Partitioned {
539-
partition_id: left_side_partition_id,
540-
pushdown,
541-
bounds: left_data
542-
.bounds
543-
.clone()
544-
.unwrap_or_else(|| PartitionBounds::new(vec![])),
545-
},
546-
PartitionMode::CollectLeft => PartitionBuildData::CollectLeft {
547-
pushdown,
548-
bounds: left_data
549-
.bounds
550-
.clone()
551-
.unwrap_or_else(|| PartitionBounds::new(vec![])),
552-
},
553-
PartitionMode::Auto => unreachable!(
554-
"PartitionMode::Auto should not be present at execution time"
555-
),
556-
};
557-
558-
self.build_waiter = Some(OnceFut::new(async move {
559-
build_accumulator.report_build_data(build_data).await
560-
}));
561-
self.build_reported = true;
562-
self.state = HashJoinStreamState::WaitPartitionBoundsReport;
563-
} else {
564-
self.state =
565-
Self::state_after_build_ready(self.join_type, left_data.as_ref());
566-
}
560+
self.state = self.transition_after_build_collected(&left_data);
567561

568562
self.build_side = BuildSide::Ready(BuildSideReadyState { left_data });
569563
Poll::Ready(Ok(StatefulStreamResult::Continue))

0 commit comments

Comments
 (0)