Skip to content

Commit 42ecad5

Browse files
kosiewgabotechs
authored andcommitted
Skip probe-side consumption when hash join build side is empty (apache#21068)
## Which issue does this PR close? * Closes apache#20492. ## Rationale for this change `HashJoinExec` currently continues polling and consuming the probe side even after the build side has completed with zero rows. For join types whose output is guaranteed to be empty when the build side is empty, this work is unnecessary. In practice, it can trigger large avoidable scans and extra compute despite producing no output. This is especially costly for cases such as INNER, LEFT, LEFT SEMI, LEFT ANTI, LEFT MARK, and RIGHT SEMI joins. This change makes the stream state machine aware of that condition so execution can terminate as soon as the build side is known to be empty and no probe rows are needed to determine the final result. The change also preserves the existing behavior for join types that still require probe-side rows even when the build side is empty, such as RIGHT, FULL, RIGHT ANTI, and RIGHT MARK joins. ## What changes are included in this PR? * Added `JoinType::empty_build_side_produces_empty_result` to centralize logic determining when an empty build side guarantees empty output. * Updated `HashJoinStream` state transitions to: * Skip transitioning to `FetchProbeBatch` when the build side is empty and output is deterministically empty. * Immediately complete the stream in such cases. * Refactored logic in `build_batch_empty_build_side` to reuse the new helper method and simplify match branches. * Ensured probe-side consumption still occurs for join types that require probe rows (e.g., RIGHT, FULL). * Added helper `state_after_build_ready` to unify post-build decision logic. * Introduced reusable helper for constructing hash joins with dynamic filters in tests. ## Are these changes tested? Yes, comprehensive tests have been added: * Verified that probe side is **not consumed** when: * Build side is empty * Join type guarantees empty output * Verified that probe side **is still consumed** when required by join semantics (e.g., RIGHT, FULL joins) * Covered both filtered and non-filtered joins * Added tests ensuring correct behavior with dynamic filters * Added regression test ensuring correct behavior after partition bounds reporting These tests validate both correctness and the intended optimization behavior. ## Are there any user-facing changes? No API changes. However, this introduces a performance optimization: * Queries involving joins with empty build sides may complete significantly faster * Reduced unnecessary IO and compute No behavioral changes in query results. ## LLM-generated code disclosure This PR includes LLM-generated code and comments. All LLM-generated content has been manually reviewed and tested. (cherry picked from commit 6c5e241)
1 parent eae7bf4 commit 42ecad5

4 files changed

Lines changed: 260 additions & 83 deletions

File tree

datafusion/common/src/join_type.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,20 @@ impl JoinType {
113113
| JoinType::RightMark
114114
)
115115
}
116+
117+
/// Returns true when an empty build side necessarily produces an empty
118+
/// result for this join type.
119+
pub fn empty_build_side_produces_empty_result(self) -> bool {
120+
matches!(
121+
self,
122+
JoinType::Inner
123+
| JoinType::Left
124+
| JoinType::LeftSemi
125+
| JoinType::LeftAnti
126+
| JoinType::LeftMark
127+
| JoinType::RightSemi
128+
)
129+
}
116130
}
117131

118132
impl Display for JoinType {

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 195 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -2215,6 +2215,110 @@ mod tests {
22152215
)
22162216
}
22172217

2218+
fn empty_build_with_probe_error_inputs()
2219+
-> (Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>, JoinOn) {
2220+
let left_batch =
2221+
build_table_i32(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![]));
2222+
let left_schema = left_batch.schema();
2223+
let left: Arc<dyn ExecutionPlan> = TestMemoryExec::try_new_exec(
2224+
&[vec![left_batch]],
2225+
Arc::clone(&left_schema),
2226+
None,
2227+
)
2228+
.unwrap();
2229+
2230+
let err = exec_err!("bad data error");
2231+
let right_batch =
2232+
build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
2233+
let right_schema = right_batch.schema();
2234+
let on = vec![(
2235+
Arc::new(Column::new_with_schema("b1", &left_schema).unwrap()) as _,
2236+
Arc::new(Column::new_with_schema("b1", &right_schema).unwrap()) as _,
2237+
)];
2238+
let right: Arc<dyn ExecutionPlan> = Arc::new(
2239+
MockExec::new(vec![Ok(right_batch), err], right_schema).with_use_task(false),
2240+
);
2241+
2242+
(left, right, on)
2243+
}
2244+
2245+
async fn assert_empty_build_probe_behavior(
2246+
join_types: &[JoinType],
2247+
expect_probe_error: bool,
2248+
with_filter: bool,
2249+
) {
2250+
let (left, right, on) = empty_build_with_probe_error_inputs();
2251+
let filter = prepare_join_filter();
2252+
2253+
for join_type in join_types {
2254+
let join = if with_filter {
2255+
join_with_filter(
2256+
Arc::clone(&left),
2257+
Arc::clone(&right),
2258+
on.clone(),
2259+
filter.clone(),
2260+
join_type,
2261+
NullEquality::NullEqualsNothing,
2262+
)
2263+
.unwrap()
2264+
} else {
2265+
join(
2266+
Arc::clone(&left),
2267+
Arc::clone(&right),
2268+
on.clone(),
2269+
join_type,
2270+
NullEquality::NullEqualsNothing,
2271+
)
2272+
.unwrap()
2273+
};
2274+
2275+
let result = common::collect(
2276+
join.execute(0, Arc::new(TaskContext::default())).unwrap(),
2277+
)
2278+
.await;
2279+
2280+
if expect_probe_error {
2281+
let result_string = result.unwrap_err().to_string();
2282+
assert!(
2283+
result_string.contains("bad data error"),
2284+
"actual: {result_string}"
2285+
);
2286+
} else {
2287+
let batches = result.unwrap();
2288+
assert!(
2289+
batches.is_empty(),
2290+
"expected no output batches for {join_type}, got {batches:?}"
2291+
);
2292+
}
2293+
}
2294+
}
2295+
2296+
fn hash_join_with_dynamic_filter(
2297+
left: Arc<dyn ExecutionPlan>,
2298+
right: Arc<dyn ExecutionPlan>,
2299+
on: JoinOn,
2300+
join_type: JoinType,
2301+
) -> Result<(HashJoinExec, Arc<DynamicFilterPhysicalExpr>)> {
2302+
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
2303+
let mut join = HashJoinExec::try_new(
2304+
left,
2305+
right,
2306+
on,
2307+
None,
2308+
&join_type,
2309+
None,
2310+
PartitionMode::CollectLeft,
2311+
NullEquality::NullEqualsNothing,
2312+
false,
2313+
)?;
2314+
join.dynamic_filter = Some(HashJoinExecDynamicFilter {
2315+
filter: Arc::clone(&dynamic_filter),
2316+
build_accumulator: OnceLock::new(),
2317+
});
2318+
2319+
Ok((join, dynamic_filter))
2320+
}
2321+
22182322
async fn join_collect(
22192323
left: Arc<dyn ExecutionPlan>,
22202324
right: Arc<dyn ExecutionPlan>,
@@ -4923,6 +5027,70 @@ mod tests {
49235027
}
49245028
}
49255029

5030+
#[tokio::test]
5031+
async fn join_does_not_consume_probe_when_empty_build_fixes_output() {
5032+
assert_empty_build_probe_behavior(
5033+
&[
5034+
JoinType::Inner,
5035+
JoinType::Left,
5036+
JoinType::LeftSemi,
5037+
JoinType::LeftAnti,
5038+
JoinType::LeftMark,
5039+
JoinType::RightSemi,
5040+
],
5041+
false,
5042+
false,
5043+
)
5044+
.await;
5045+
}
5046+
5047+
#[tokio::test]
5048+
async fn join_does_not_consume_probe_when_empty_build_fixes_output_with_filter() {
5049+
assert_empty_build_probe_behavior(
5050+
&[
5051+
JoinType::Inner,
5052+
JoinType::Left,
5053+
JoinType::LeftSemi,
5054+
JoinType::LeftAnti,
5055+
JoinType::LeftMark,
5056+
JoinType::RightSemi,
5057+
],
5058+
false,
5059+
true,
5060+
)
5061+
.await;
5062+
}
5063+
5064+
#[tokio::test]
5065+
async fn join_still_consumes_probe_when_empty_build_needs_probe_rows() {
5066+
assert_empty_build_probe_behavior(
5067+
&[
5068+
JoinType::Right,
5069+
JoinType::Full,
5070+
JoinType::RightAnti,
5071+
JoinType::RightMark,
5072+
],
5073+
true,
5074+
false,
5075+
)
5076+
.await;
5077+
}
5078+
5079+
#[tokio::test]
5080+
async fn join_still_consumes_probe_when_empty_build_needs_probe_rows_with_filter() {
5081+
assert_empty_build_probe_behavior(
5082+
&[
5083+
JoinType::Right,
5084+
JoinType::Full,
5085+
JoinType::RightAnti,
5086+
JoinType::RightMark,
5087+
],
5088+
true,
5089+
true,
5090+
)
5091+
.await;
5092+
}
5093+
49265094
#[tokio::test]
49275095
async fn join_split_batch() {
49285096
let left = build_table(
@@ -5366,34 +5534,16 @@ mod tests {
53665534
Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
53675535
)];
53685536

5369-
// Create a dynamic filter manually
5370-
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
5371-
let dynamic_filter_clone = Arc::clone(&dynamic_filter);
5372-
5373-
// Create HashJoinExec with the dynamic filter
5374-
let mut join = HashJoinExec::try_new(
5375-
left,
5376-
right,
5377-
on,
5378-
None,
5379-
&JoinType::Inner,
5380-
None,
5381-
PartitionMode::CollectLeft,
5382-
NullEquality::NullEqualsNothing,
5383-
false,
5384-
)?;
5385-
join.dynamic_filter = Some(HashJoinExecDynamicFilter {
5386-
filter: dynamic_filter,
5387-
build_accumulator: OnceLock::new(),
5388-
});
5537+
let (join, dynamic_filter) =
5538+
hash_join_with_dynamic_filter(left, right, on, JoinType::Inner)?;
53895539

53905540
// Execute the join
53915541
let stream = join.execute(0, task_ctx)?;
53925542
let _batches = common::collect(stream).await?;
53935543

53945544
// After the join completes, the dynamic filter should be marked as complete
53955545
// wait_complete() should return immediately
5396-
dynamic_filter_clone.wait_complete().await;
5546+
dynamic_filter.wait_complete().await;
53975547

53985548
Ok(())
53995549
}
@@ -5415,34 +5565,37 @@ mod tests {
54155565
Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
54165566
)];
54175567

5418-
// Create a dynamic filter manually
5419-
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
5420-
let dynamic_filter_clone = Arc::clone(&dynamic_filter);
5421-
5422-
// Create HashJoinExec with the dynamic filter
5423-
let mut join = HashJoinExec::try_new(
5424-
left,
5425-
right,
5426-
on,
5427-
None,
5428-
&JoinType::Inner,
5429-
None,
5430-
PartitionMode::CollectLeft,
5431-
NullEquality::NullEqualsNothing,
5432-
false,
5433-
)?;
5434-
join.dynamic_filter = Some(HashJoinExecDynamicFilter {
5435-
filter: dynamic_filter,
5436-
build_accumulator: OnceLock::new(),
5437-
});
5568+
let (join, dynamic_filter) =
5569+
hash_join_with_dynamic_filter(left, right, on, JoinType::Inner)?;
54385570

54395571
// Execute the join
54405572
let stream = join.execute(0, task_ctx)?;
54415573
let _batches = common::collect(stream).await?;
54425574

54435575
// Even with empty build side, the dynamic filter should be marked as complete
54445576
// wait_complete() should return immediately
5445-
dynamic_filter_clone.wait_complete().await;
5577+
dynamic_filter.wait_complete().await;
5578+
5579+
Ok(())
5580+
}
5581+
5582+
#[tokio::test]
5583+
async fn test_hash_join_skips_probe_on_empty_build_after_partition_bounds_report()
5584+
-> Result<()> {
5585+
let task_ctx = Arc::new(TaskContext::default());
5586+
let (left, right, on) = empty_build_with_probe_error_inputs();
5587+
5588+
// Keep an extra consumer reference so execute() enables dynamic filter pushdown
5589+
// and enters the WaitPartitionBoundsReport path before deciding whether to poll
5590+
// the probe side.
5591+
let (join, dynamic_filter) =
5592+
hash_join_with_dynamic_filter(left, right, on, JoinType::Inner)?;
5593+
5594+
let stream = join.execute(0, task_ctx)?;
5595+
let batches = common::collect(stream).await?;
5596+
assert!(batches.is_empty());
5597+
5598+
dynamic_filter.wait_complete().await;
54465599

54475600
Ok(())
54485601
}

datafusion/physical-plan/src/joins/hash_join/stream.rs

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,21 @@ impl HashJoinStream {
406406
}
407407
}
408408

409+
/// Returns the next state after the build side has been fully collected
410+
/// and any required build-side coordination has completed.
411+
fn state_after_build_ready(
412+
join_type: JoinType,
413+
left_data: &JoinLeftData,
414+
) -> HashJoinStreamState {
415+
if left_data.map().is_empty()
416+
&& join_type.empty_build_side_produces_empty_result()
417+
{
418+
HashJoinStreamState::Completed
419+
} else {
420+
HashJoinStreamState::FetchProbeBatch
421+
}
422+
}
423+
409424
/// Separate implementation function that unpins the [`HashJoinStream`] so
410425
/// that partial borrows work correctly
411426
fn poll_next_impl(
@@ -469,7 +484,9 @@ impl HashJoinStream {
469484
if let Some(ref mut fut) = self.build_waiter {
470485
ready!(fut.get_shared(cx))?;
471486
}
472-
self.state = HashJoinStreamState::FetchProbeBatch;
487+
let build_side = self.build_side.try_as_ready()?;
488+
self.state =
489+
Self::state_after_build_ready(self.join_type, build_side.left_data.as_ref());
473490
Poll::Ready(Ok(StatefulStreamResult::Continue))
474491
}
475492

@@ -540,7 +557,8 @@ impl HashJoinStream {
540557
}));
541558
self.state = HashJoinStreamState::WaitPartitionBoundsReport;
542559
} else {
543-
self.state = HashJoinStreamState::FetchProbeBatch;
560+
self.state =
561+
Self::state_after_build_ready(self.join_type, left_data.as_ref());
544562
}
545563

546564
self.build_side = BuildSide::Ready(BuildSideReadyState { left_data });
@@ -643,10 +661,14 @@ impl HashJoinStream {
643661
}
644662
}
645663

646-
// if the left side is empty, we can skip the (potentially expensive) join operation
664+
// If the build side is empty, this stream only reaches ProcessProbeBatch for
665+
// join types whose output still depends on probe rows.
647666
let is_empty = build_side.left_data.map().is_empty();
648667

649-
if is_empty && self.filter.is_none() {
668+
if is_empty {
669+
// Invariant: state_after_build_ready should have already completed
670+
// join types whose result is fixed to empty when the build side is empty.
671+
debug_assert!(!self.join_type.empty_build_side_produces_empty_result());
650672
let result = build_batch_empty_build_side(
651673
&self.schema,
652674
build_side.left_data.batch(),

0 commit comments

Comments
 (0)