Skip to content

Commit d075e5f

Browse files
committed
fix: resolve HashJoin deadlock with dynamic filtering and empty partitions
Defer short-circuiting to Completed state for empty partitions when dynamic filtering is enabled. This ensures all partitions participate in the SharedBuildAccumulator barrier synchronization before finishing. Made-with: Cursor
1 parent 2818abb commit d075e5f

1 file changed

Lines changed: 4 additions & 6 deletions

File tree

  • datafusion/physical-plan/src/joins/hash_join

datafusion/physical-plan/src/joins/hash_join/stream.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -409,11 +409,11 @@ impl HashJoinStream {
409409
/// Returns the next state after the build side has been fully collected
410410
/// and any required build-side coordination has completed.
411411
fn state_after_build_ready(
412-
join_type: JoinType,
412+
&self,
413413
left_data: &JoinLeftData,
414414
) -> HashJoinStreamState {
415415
if left_data.map().is_empty()
416-
&& join_type.empty_build_side_produces_empty_result()
416+
&& self.join_type.empty_build_side_produces_empty_result()
417417
{
418418
HashJoinStreamState::Completed
419419
} else {
@@ -485,8 +485,7 @@ impl HashJoinStream {
485485
ready!(fut.get_shared(cx))?;
486486
}
487487
let build_side = self.build_side.try_as_ready()?;
488-
self.state =
489-
Self::state_after_build_ready(self.join_type, build_side.left_data.as_ref());
488+
self.state = self.state_after_build_ready(build_side.left_data.as_ref());
490489
Poll::Ready(Ok(StatefulStreamResult::Continue))
491490
}
492491

@@ -557,8 +556,7 @@ impl HashJoinStream {
557556
}));
558557
self.state = HashJoinStreamState::WaitPartitionBoundsReport;
559558
} else {
560-
self.state =
561-
Self::state_after_build_ready(self.join_type, left_data.as_ref());
559+
self.state = self.state_after_build_ready(left_data.as_ref());
562560
}
563561

564562
self.build_side = BuildSide::Ready(BuildSideReadyState { left_data });

0 commit comments

Comments
 (0)