Skip to content

Commit 626f771

Browse files
committed
Consolidate empty-build probe tests and join setup
Refactor exec.rs by consolidating empty-build probe-behavior tests into `assert_empty_build_probe_behavior(...)` and repeated dynamic filter join setup into `hash_join_with_dynamic_filter(...)`. Maintain existing runtime logic while reducing duplicate test boilerplate and redundant local setup for improved clarity and maintainability.
1 parent 30f2a3e commit 626f771

2 files changed

Lines changed: 104 additions & 123 deletions

File tree

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 100 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -2275,8 +2275,10 @@ mod tests {
22752275
)
22762276
}
22772277

2278-
fn empty_build_with_probe_error_inputs() -> (Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>, JoinOn) {
2279-
let left_batch = build_table_i32(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![]));
2278+
fn empty_build_with_probe_error_inputs()
2279+
-> (Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>, JoinOn) {
2280+
let left_batch =
2281+
build_table_i32(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![]));
22802282
let left_schema = left_batch.schema();
22812283
let left: Arc<dyn ExecutionPlan> =
22822284
TestMemoryExec::try_new_exec(&[vec![left_batch]], left_schema.clone(), None)
@@ -2297,6 +2299,69 @@ mod tests {
22972299
(left, right, on)
22982300
}
22992301

2302+
async fn assert_empty_build_probe_behavior(
2303+
join_types: &[JoinType],
2304+
expect_probe_error: bool,
2305+
) {
2306+
let (left, right, on) = empty_build_with_probe_error_inputs();
2307+
2308+
for join_type in join_types {
2309+
let join = join(
2310+
Arc::clone(&left),
2311+
Arc::clone(&right),
2312+
on.clone(),
2313+
join_type,
2314+
NullEquality::NullEqualsNothing,
2315+
)
2316+
.unwrap();
2317+
2318+
let result = common::collect(
2319+
join.execute(0, Arc::new(TaskContext::default())).unwrap(),
2320+
)
2321+
.await;
2322+
2323+
if expect_probe_error {
2324+
let result_string = result.unwrap_err().to_string();
2325+
assert!(
2326+
result_string.contains("bad data error"),
2327+
"actual: {result_string}"
2328+
);
2329+
} else {
2330+
let batches = result.unwrap();
2331+
assert!(
2332+
batches.is_empty(),
2333+
"expected no output batches for {join_type}, got {batches:?}"
2334+
);
2335+
}
2336+
}
2337+
}
2338+
2339+
fn hash_join_with_dynamic_filter(
2340+
left: Arc<dyn ExecutionPlan>,
2341+
right: Arc<dyn ExecutionPlan>,
2342+
on: JoinOn,
2343+
join_type: JoinType,
2344+
) -> Result<(HashJoinExec, Arc<DynamicFilterPhysicalExpr>)> {
2345+
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
2346+
let mut join = HashJoinExec::try_new(
2347+
left,
2348+
right,
2349+
on,
2350+
None,
2351+
&join_type,
2352+
None,
2353+
PartitionMode::CollectLeft,
2354+
NullEquality::NullEqualsNothing,
2355+
false,
2356+
)?;
2357+
join.dynamic_filter = Some(HashJoinExecDynamicFilter {
2358+
filter: Arc::clone(&dynamic_filter),
2359+
build_accumulator: OnceLock::new(),
2360+
});
2361+
2362+
Ok((join, dynamic_filter))
2363+
}
2364+
23002365
async fn join_collect(
23012366
left: Arc<dyn ExecutionPlan>,
23022367
right: Arc<dyn ExecutionPlan>,
@@ -5007,67 +5072,32 @@ mod tests {
50075072

50085073
#[tokio::test]
50095074
async fn join_does_not_consume_probe_when_empty_build_fixes_output() {
5010-
let (left, right_input, on) = empty_build_with_probe_error_inputs();
5011-
5012-
let join_types = vec![
5013-
JoinType::Inner,
5014-
JoinType::Left,
5015-
JoinType::LeftSemi,
5016-
JoinType::LeftAnti,
5017-
JoinType::LeftMark,
5018-
JoinType::RightSemi,
5019-
];
5020-
5021-
for join_type in join_types {
5022-
let join = join(
5023-
Arc::clone(&left),
5024-
Arc::clone(&right_input),
5025-
on.clone(),
5026-
&join_type,
5027-
NullEquality::NullEqualsNothing,
5028-
)
5029-
.unwrap();
5030-
let task_ctx = Arc::new(TaskContext::default());
5031-
5032-
let stream = join.execute(0, task_ctx).unwrap();
5033-
let batches = common::collect(stream).await.unwrap();
5034-
5035-
assert!(
5036-
batches.is_empty(),
5037-
"expected no output batches for {join_type}, got {batches:?}"
5038-
);
5039-
}
5075+
assert_empty_build_probe_behavior(
5076+
&[
5077+
JoinType::Inner,
5078+
JoinType::Left,
5079+
JoinType::LeftSemi,
5080+
JoinType::LeftAnti,
5081+
JoinType::LeftMark,
5082+
JoinType::RightSemi,
5083+
],
5084+
false,
5085+
)
5086+
.await;
50405087
}
50415088

50425089
#[tokio::test]
50435090
async fn join_still_consumes_probe_when_empty_build_needs_probe_rows() {
5044-
let (left, right_input, on) = empty_build_with_probe_error_inputs();
5045-
5046-
let join_types = vec![
5047-
JoinType::Right,
5048-
JoinType::Full,
5049-
JoinType::RightAnti,
5050-
JoinType::RightMark,
5051-
];
5052-
5053-
for join_type in join_types {
5054-
let join = join(
5055-
Arc::clone(&left),
5056-
Arc::clone(&right_input),
5057-
on.clone(),
5058-
&join_type,
5059-
NullEquality::NullEqualsNothing,
5060-
)
5061-
.unwrap();
5062-
let task_ctx = Arc::new(TaskContext::default());
5063-
5064-
let stream = join.execute(0, task_ctx).unwrap();
5065-
let result_string = common::collect(stream).await.unwrap_err().to_string();
5066-
assert!(
5067-
result_string.contains("bad data error"),
5068-
"actual: {result_string}"
5069-
);
5070-
}
5091+
assert_empty_build_probe_behavior(
5092+
&[
5093+
JoinType::Right,
5094+
JoinType::Full,
5095+
JoinType::RightAnti,
5096+
JoinType::RightMark,
5097+
],
5098+
true,
5099+
)
5100+
.await;
50715101
}
50725102

50735103
#[tokio::test]
@@ -5513,34 +5543,16 @@ mod tests {
55135543
Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
55145544
)];
55155545

5516-
// Create a dynamic filter manually
5517-
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
5518-
let dynamic_filter_clone = Arc::clone(&dynamic_filter);
5519-
5520-
// Create HashJoinExec with the dynamic filter
5521-
let mut join = HashJoinExec::try_new(
5522-
left,
5523-
right,
5524-
on,
5525-
None,
5526-
&JoinType::Inner,
5527-
None,
5528-
PartitionMode::CollectLeft,
5529-
NullEquality::NullEqualsNothing,
5530-
false,
5531-
)?;
5532-
join.dynamic_filter = Some(HashJoinExecDynamicFilter {
5533-
filter: dynamic_filter,
5534-
build_accumulator: OnceLock::new(),
5535-
});
5546+
let (join, dynamic_filter) =
5547+
hash_join_with_dynamic_filter(left, right, on, JoinType::Inner)?;
55365548

55375549
// Execute the join
55385550
let stream = join.execute(0, task_ctx)?;
55395551
let _batches = common::collect(stream).await?;
55405552

55415553
// After the join completes, the dynamic filter should be marked as complete
55425554
// wait_complete() should return immediately
5543-
dynamic_filter_clone.wait_complete().await;
5555+
dynamic_filter.wait_complete().await;
55445556

55455557
Ok(())
55465558
}
@@ -5562,71 +5574,37 @@ mod tests {
55625574
Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
55635575
)];
55645576

5565-
// Create a dynamic filter manually
5566-
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
5567-
let dynamic_filter_clone = Arc::clone(&dynamic_filter);
5568-
5569-
// Create HashJoinExec with the dynamic filter
5570-
let mut join = HashJoinExec::try_new(
5571-
left,
5572-
right,
5573-
on,
5574-
None,
5575-
&JoinType::Inner,
5576-
None,
5577-
PartitionMode::CollectLeft,
5578-
NullEquality::NullEqualsNothing,
5579-
false,
5580-
)?;
5581-
join.dynamic_filter = Some(HashJoinExecDynamicFilter {
5582-
filter: dynamic_filter,
5583-
build_accumulator: OnceLock::new(),
5584-
});
5577+
let (join, dynamic_filter) =
5578+
hash_join_with_dynamic_filter(left, right, on, JoinType::Inner)?;
55855579

55865580
// Execute the join
55875581
let stream = join.execute(0, task_ctx)?;
55885582
let _batches = common::collect(stream).await?;
55895583

55905584
// Even with empty build side, the dynamic filter should be marked as complete
55915585
// wait_complete() should return immediately
5592-
dynamic_filter_clone.wait_complete().await;
5586+
dynamic_filter.wait_complete().await;
55935587

55945588
Ok(())
55955589
}
55965590

55975591
#[tokio::test]
5598-
async fn test_hash_join_skips_probe_on_empty_build_after_partition_bounds_report(
5599-
) -> Result<()> {
5592+
async fn test_hash_join_skips_probe_on_empty_build_after_partition_bounds_report()
5593+
-> Result<()> {
56005594
let task_ctx = Arc::new(TaskContext::default());
56015595
let (left, right, on) = empty_build_with_probe_error_inputs();
56025596

56035597
// Keep an extra consumer reference so execute() enables dynamic filter pushdown
56045598
// and enters the WaitPartitionBoundsReport path before deciding whether to poll
56055599
// the probe side.
5606-
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
5607-
let dynamic_filter_clone = Arc::clone(&dynamic_filter);
5608-
5609-
let mut join = HashJoinExec::try_new(
5610-
left,
5611-
right,
5612-
on,
5613-
None,
5614-
&JoinType::Inner,
5615-
None,
5616-
PartitionMode::CollectLeft,
5617-
NullEquality::NullEqualsNothing,
5618-
false,
5619-
)?;
5620-
join.dynamic_filter = Some(HashJoinExecDynamicFilter {
5621-
filter: dynamic_filter,
5622-
build_accumulator: OnceLock::new(),
5623-
});
5600+
let (join, dynamic_filter) =
5601+
hash_join_with_dynamic_filter(left, right, on, JoinType::Inner)?;
56245602

56255603
let stream = join.execute(0, task_ctx)?;
56265604
let batches = common::collect(stream).await?;
56275605
assert!(batches.is_empty());
56285606

5629-
dynamic_filter_clone.wait_complete().await;
5607+
dynamic_filter.wait_complete().await;
56305608

56315609
Ok(())
56325610
}

datafusion/physical-plan/src/joins/hash_join/stream.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -408,7 +408,10 @@ impl HashJoinStream {
408408

409409
/// Returns the next state after the build side has been fully collected
410410
/// and any required build-side coordination has completed.
411-
fn next_state_after_build_ready(&self, left_data: &JoinLeftData) -> HashJoinStreamState {
411+
fn next_state_after_build_ready(
412+
&self,
413+
left_data: &JoinLeftData,
414+
) -> HashJoinStreamState {
412415
if left_data.map().is_empty()
413416
&& self.filter.is_none()
414417
&& can_skip_probe_on_empty_build_side(self.join_type)

0 commit comments

Comments
 (0)