Skip to content

Commit 4d6367b

Browse files
committed
Refactor join type handling for empty builds
Move empty-build predicate to JoinType. Implement JoinType::empty_build_side_produces_empty_result method and update relevant call sites. Stream state machine now directly uses this method, simplifying logic. Remove old utility predicate and update tests, including new filtered regression tests.
1 parent 48cdea9 commit 4d6367b

4 files changed

Lines changed: 78 additions & 40 deletions

File tree

datafusion/common/src/join_type.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,20 @@ impl JoinType {
142142
| JoinType::RightMark
143143
)
144144
}
145+
146+
/// Returns true when an empty build side necessarily produces an empty
147+
/// result for this join type.
148+
pub fn empty_build_side_produces_empty_result(self) -> bool {
149+
matches!(
150+
self,
151+
JoinType::Inner
152+
| JoinType::Left
153+
| JoinType::LeftSemi
154+
| JoinType::LeftAnti
155+
| JoinType::LeftMark
156+
| JoinType::RightSemi
157+
)
158+
}
145159
}
146160

147161
impl Display for JoinType {

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 56 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2305,18 +2305,32 @@ mod tests {
23052305
async fn assert_empty_build_probe_behavior(
23062306
join_types: &[JoinType],
23072307
expect_probe_error: bool,
2308+
with_filter: bool,
23082309
) {
23092310
let (left, right, on) = empty_build_with_probe_error_inputs();
2311+
let filter = prepare_join_filter();
23102312

23112313
for join_type in join_types {
2312-
let join = join(
2313-
Arc::clone(&left),
2314-
Arc::clone(&right),
2315-
on.clone(),
2316-
join_type,
2317-
NullEquality::NullEqualsNothing,
2318-
)
2319-
.unwrap();
2314+
let join = if with_filter {
2315+
join_with_filter(
2316+
Arc::clone(&left),
2317+
Arc::clone(&right),
2318+
on.clone(),
2319+
filter.clone(),
2320+
join_type,
2321+
NullEquality::NullEqualsNothing,
2322+
)
2323+
.unwrap()
2324+
} else {
2325+
join(
2326+
Arc::clone(&left),
2327+
Arc::clone(&right),
2328+
on.clone(),
2329+
join_type,
2330+
NullEquality::NullEqualsNothing,
2331+
)
2332+
.unwrap()
2333+
};
23202334

23212335
let result = common::collect(
23222336
join.execute(0, Arc::new(TaskContext::default())).unwrap(),
@@ -5085,6 +5099,24 @@ mod tests {
50855099
JoinType::RightSemi,
50865100
],
50875101
false,
5102+
false,
5103+
)
5104+
.await;
5105+
}
5106+
5107+
#[tokio::test]
5108+
async fn join_does_not_consume_probe_when_empty_build_fixes_output_with_filter() {
5109+
assert_empty_build_probe_behavior(
5110+
&[
5111+
JoinType::Inner,
5112+
JoinType::Left,
5113+
JoinType::LeftSemi,
5114+
JoinType::LeftAnti,
5115+
JoinType::LeftMark,
5116+
JoinType::RightSemi,
5117+
],
5118+
false,
5119+
true,
50885120
)
50895121
.await;
50905122
}
@@ -5099,6 +5131,22 @@ mod tests {
50995131
JoinType::RightMark,
51005132
],
51015133
true,
5134+
false,
5135+
)
5136+
.await;
5137+
}
5138+
5139+
#[tokio::test]
5140+
async fn join_still_consumes_probe_when_empty_build_needs_probe_rows_with_filter() {
5141+
assert_empty_build_probe_behavior(
5142+
&[
5143+
JoinType::Right,
5144+
JoinType::Full,
5145+
JoinType::RightAnti,
5146+
JoinType::RightMark,
5147+
],
5148+
true,
5149+
true,
51025150
)
51035151
.await;
51045152
}

datafusion/physical-plan/src/joins/hash_join/stream.rs

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ use crate::{
4242
BuildProbeJoinMetrics, ColumnIndex, JoinFilter, JoinHashMapType,
4343
StatefulStreamResult, adjust_indices_by_join_type, apply_join_filter_to_indices,
4444
build_batch_empty_build_side, build_batch_from_indices,
45-
empty_build_side_produces_empty_result, need_produce_result_in_final,
45+
need_produce_result_in_final,
4646
},
4747
};
4848

@@ -409,13 +409,11 @@ impl HashJoinStream {
409409
/// Returns the next state after the build side has been fully collected
410410
/// and any required build-side coordination has completed.
411411
fn state_after_build_ready(
412-
has_filter: bool,
413412
join_type: JoinType,
414413
left_data: &JoinLeftData,
415414
) -> HashJoinStreamState {
416-
if !has_filter
417-
&& left_data.map().is_empty()
418-
&& empty_build_side_produces_empty_result(join_type)
415+
if left_data.map().is_empty()
416+
&& join_type.empty_build_side_produces_empty_result()
419417
{
420418
HashJoinStreamState::Completed
421419
} else {
@@ -487,11 +485,8 @@ impl HashJoinStream {
487485
ready!(fut.get_shared(cx))?;
488486
}
489487
let build_side = self.build_side.try_as_ready()?;
490-
self.state = Self::state_after_build_ready(
491-
self.filter.is_some(),
492-
self.join_type,
493-
build_side.left_data.as_ref(),
494-
);
488+
self.state =
489+
Self::state_after_build_ready(self.join_type, build_side.left_data.as_ref());
495490
Poll::Ready(Ok(StatefulStreamResult::Continue))
496491
}
497492

@@ -562,11 +557,8 @@ impl HashJoinStream {
562557
}));
563558
self.state = HashJoinStreamState::WaitPartitionBoundsReport;
564559
} else {
565-
self.state = Self::state_after_build_ready(
566-
self.filter.is_some(),
567-
self.join_type,
568-
left_data.as_ref(),
569-
);
560+
self.state =
561+
Self::state_after_build_ready(self.join_type, left_data.as_ref());
570562
}
571563

572564
self.build_side = BuildSide::Ready(BuildSideReadyState { left_data });

datafusion/physical-plan/src/joins/utils.rs

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -855,22 +855,6 @@ pub(crate) fn need_produce_result_in_final(join_type: JoinType) -> bool {
855855
)
856856
}
857857

858-
/// Returns true when an empty build side necessarily produces an empty result.
859-
///
860-
/// This is the shared source of truth for both state-machine short-circuiting
861-
/// and `build_batch_empty_build_side`.
862-
pub(crate) fn empty_build_side_produces_empty_result(join_type: JoinType) -> bool {
863-
matches!(
864-
join_type,
865-
JoinType::Inner
866-
| JoinType::Left
867-
| JoinType::LeftSemi
868-
| JoinType::LeftAnti
869-
| JoinType::LeftMark
870-
| JoinType::RightSemi
871-
)
872-
}
873-
874858
pub(crate) fn get_final_indices_from_shared_bitmap(
875859
shared_bitmap: &SharedBitmapBuilder,
876860
join_type: JoinType,
@@ -1076,7 +1060,7 @@ pub(crate) fn build_batch_empty_build_side(
10761060
column_indices: &[ColumnIndex],
10771061
join_type: JoinType,
10781062
) -> Result<RecordBatch> {
1079-
if empty_build_side_produces_empty_result(join_type) {
1063+
if join_type.empty_build_side_produces_empty_result() {
10801064
// These join types only return data if the left side is not empty.
10811065
return Ok(RecordBatch::new_empty(Arc::new(schema.clone())));
10821066
}

0 commit comments

Comments
 (0)