Skip to content

Commit 39c22f5

Browse files
committed
Optimize HashJoinStream for empty build side
Implement a staged mini-plan for HashJoinStream: once the build side is ready, the stream transitions directly to Completed instead of FetchProbeBatch when the build side is empty and the join type's result is fully determined. This avoids unnecessarily entering FetchProbeBatch for Inner, Left, LeftSemi, LeftAnti, LeftMark, and RightSemi joins without filters. Add two tests to verify join behavior with an empty build side: `join_does_not_consume_probe_when_empty_build_fixes_output` and `join_still_consumes_probe_when_empty_build_needs_probe_rows`. Both use a MockExec probe input that yields an error, which distinguishes short-circuiting from necessary probe-row consumption.
1 parent d138c36 commit 39c22f5

2 files changed

Lines changed: 133 additions & 2 deletions

File tree

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4983,6 +4983,109 @@ mod tests {
49834983
}
49844984
}
49854985

4986+
#[tokio::test]
4987+
async fn join_does_not_consume_probe_when_empty_build_fixes_output() {
4988+
let left_batch =
4989+
build_table_i32(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![]));
4990+
let left_schema = left_batch.schema();
4991+
4992+
let err = exec_err!("bad data error");
4993+
let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
4994+
4995+
let on = vec![(
4996+
Arc::new(Column::new_with_schema("b1", &left_schema).unwrap()) as _,
4997+
Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
4998+
)];
4999+
let schema = right.schema();
5000+
let right_input =
5001+
Arc::new(MockExec::new(vec![Ok(right), err], schema).with_use_task(false));
5002+
let left: Arc<dyn ExecutionPlan> = TestMemoryExec::try_new_exec(
5003+
&[vec![left_batch]],
5004+
Arc::clone(&left_schema),
5005+
None,
5006+
)
5007+
.unwrap();
5008+
5009+
let join_types = vec![
5010+
JoinType::Inner,
5011+
JoinType::Left,
5012+
JoinType::LeftSemi,
5013+
JoinType::LeftAnti,
5014+
JoinType::LeftMark,
5015+
JoinType::RightSemi,
5016+
];
5017+
5018+
for join_type in join_types {
5019+
let join = join(
5020+
Arc::clone(&left),
5021+
Arc::clone(&right_input) as Arc<dyn ExecutionPlan>,
5022+
on.clone(),
5023+
&join_type,
5024+
NullEquality::NullEqualsNothing,
5025+
)
5026+
.unwrap();
5027+
let task_ctx = Arc::new(TaskContext::default());
5028+
5029+
let stream = join.execute(0, task_ctx).unwrap();
5030+
let batches = common::collect(stream).await.unwrap();
5031+
5032+
assert!(
5033+
batches.is_empty(),
5034+
"expected no output batches for {join_type}, got {batches:?}"
5035+
);
5036+
}
5037+
}
5038+
5039+
#[tokio::test]
5040+
async fn join_still_consumes_probe_when_empty_build_needs_probe_rows() {
5041+
let left_batch =
5042+
build_table_i32(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![]));
5043+
let left_schema = left_batch.schema();
5044+
5045+
let err = exec_err!("bad data error");
5046+
let right = build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
5047+
5048+
let on = vec![(
5049+
Arc::new(Column::new_with_schema("b1", &left_schema).unwrap()) as _,
5050+
Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
5051+
)];
5052+
let schema = right.schema();
5053+
let right_input =
5054+
Arc::new(MockExec::new(vec![Ok(right), err], schema).with_use_task(false));
5055+
let left: Arc<dyn ExecutionPlan> = TestMemoryExec::try_new_exec(
5056+
&[vec![left_batch]],
5057+
Arc::clone(&left_schema),
5058+
None,
5059+
)
5060+
.unwrap();
5061+
5062+
let join_types = vec![
5063+
JoinType::Right,
5064+
JoinType::Full,
5065+
JoinType::RightAnti,
5066+
JoinType::RightMark,
5067+
];
5068+
5069+
for join_type in join_types {
5070+
let join = join(
5071+
Arc::clone(&left),
5072+
Arc::clone(&right_input) as Arc<dyn ExecutionPlan>,
5073+
on.clone(),
5074+
&join_type,
5075+
NullEquality::NullEqualsNothing,
5076+
)
5077+
.unwrap();
5078+
let task_ctx = Arc::new(TaskContext::default());
5079+
5080+
let stream = join.execute(0, task_ctx).unwrap();
5081+
let result_string = common::collect(stream).await.unwrap_err().to_string();
5082+
assert!(
5083+
result_string.contains("bad data error"),
5084+
"actual: {result_string}"
5085+
);
5086+
}
5087+
}
5088+
49865089
#[tokio::test]
49875090
async fn join_split_batch() {
49885091
let left = build_table(

datafusion/physical-plan/src/joins/hash_join/stream.rs

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,21 @@ impl HashJoinStream {
406406
}
407407
}
408408

409+
/// Returns true when an empty build side fully determines the join result,
410+
/// so the probe side does not need to be consumed.
411+
fn can_skip_probe_on_empty_build_side(&self) -> bool {
412+
self.filter.is_none()
413+
&& matches!(
414+
self.join_type,
415+
JoinType::Inner
416+
| JoinType::Left
417+
| JoinType::LeftSemi
418+
| JoinType::LeftAnti
419+
| JoinType::LeftMark
420+
| JoinType::RightSemi
421+
)
422+
}
423+
409424
/// Separate implementation function that unpins the [`HashJoinStream`] so
410425
/// that partial borrows work correctly
411426
fn poll_next_impl(
@@ -469,7 +484,14 @@ impl HashJoinStream {
469484
if let Some(ref mut fut) = self.build_waiter {
470485
ready!(fut.get_shared(cx))?;
471486
}
472-
self.state = HashJoinStreamState::FetchProbeBatch;
487+
let build_side = self.build_side.try_as_ready()?;
488+
self.state = if build_side.left_data.map().is_empty()
489+
&& self.can_skip_probe_on_empty_build_side()
490+
{
491+
HashJoinStreamState::Completed
492+
} else {
493+
HashJoinStreamState::FetchProbeBatch
494+
};
473495
Poll::Ready(Ok(StatefulStreamResult::Continue))
474496
}
475497

@@ -540,7 +562,13 @@ impl HashJoinStream {
540562
}));
541563
self.state = HashJoinStreamState::WaitPartitionBoundsReport;
542564
} else {
543-
self.state = HashJoinStreamState::FetchProbeBatch;
565+
self.state = if left_data.map().is_empty()
566+
&& self.can_skip_probe_on_empty_build_side()
567+
{
568+
HashJoinStreamState::Completed
569+
} else {
570+
HashJoinStreamState::FetchProbeBatch
571+
};
544572
}
545573

546574
self.build_side = BuildSide::Ready(BuildSideReadyState { left_data });

0 commit comments

Comments
 (0)