Skip to content

Commit 8e4c6eb

Browse files
committed
Refactor post-build transition handling
Extract duplicate post-build transition logic into next_state_after_build_ready in stream.rs. This centralizes the decision between Completed and FetchProbeBatch in one location and streamlines both collect_build_side and wait_for_partition_bounds_report to use the new helper function.
1 parent 39c22f5 commit 8e4c6eb

1 file changed

Lines changed: 12 additions & 14 deletions

File tree

  • datafusion/physical-plan/src/joins/hash_join

datafusion/physical-plan/src/joins/hash_join/stream.rs

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,16 @@ impl HashJoinStream {
421421
)
422422
}
423423

424+
/// Returns the next state after the build side has been fully collected
425+
/// and any required build-side coordination has completed.
426+
fn next_state_after_build_ready(&self, left_data: &JoinLeftData) -> HashJoinStreamState {
427+
if left_data.map().is_empty() && self.can_skip_probe_on_empty_build_side() {
428+
HashJoinStreamState::Completed
429+
} else {
430+
HashJoinStreamState::FetchProbeBatch
431+
}
432+
}
433+
424434
/// Separate implementation function that unpins the [`HashJoinStream`] so
425435
/// that partial borrows work correctly
426436
fn poll_next_impl(
@@ -485,13 +495,7 @@ impl HashJoinStream {
485495
ready!(fut.get_shared(cx))?;
486496
}
487497
let build_side = self.build_side.try_as_ready()?;
488-
self.state = if build_side.left_data.map().is_empty()
489-
&& self.can_skip_probe_on_empty_build_side()
490-
{
491-
HashJoinStreamState::Completed
492-
} else {
493-
HashJoinStreamState::FetchProbeBatch
494-
};
498+
self.state = self.next_state_after_build_ready(build_side.left_data.as_ref());
495499
Poll::Ready(Ok(StatefulStreamResult::Continue))
496500
}
497501

@@ -562,13 +566,7 @@ impl HashJoinStream {
562566
}));
563567
self.state = HashJoinStreamState::WaitPartitionBoundsReport;
564568
} else {
565-
self.state = if left_data.map().is_empty()
566-
&& self.can_skip_probe_on_empty_build_side()
567-
{
568-
HashJoinStreamState::Completed
569-
} else {
570-
HashJoinStreamState::FetchProbeBatch
571-
};
569+
self.state = self.next_state_after_build_ready(left_data.as_ref());
572570
}
573571

574572
self.build_side = BuildSide::Ready(BuildSideReadyState { left_data });

0 commit comments

Comments
 (0)