Skip to content

Commit 12a52ef

Browse files
committed
Simplify hash-join state machine and batch construction
Refactor stream.rs and utils.rs to streamline the hash-join state machine. Compute the post-build state directly from inputs, eliminating unnecessary indirection. Update the empty-build-side batch construction to utilize early returns and iterator-based collection for columns, replacing manual Vec setup and push logic.
1 parent 45ee6d1 commit 12a52ef

2 files changed

Lines changed: 41 additions & 36 deletions

File tree

datafusion/physical-plan/src/joins/hash_join/stream.rs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -408,13 +408,14 @@ impl HashJoinStream {
408408

409409
/// Returns the next state after the build side has been fully collected
410410
/// and any required build-side coordination has completed.
411-
fn next_state_after_build_ready(
412-
&self,
411+
fn state_after_build_ready(
412+
has_filter: bool,
413+
join_type: JoinType,
413414
left_data: &JoinLeftData,
414415
) -> HashJoinStreamState {
415-
if left_data.map().is_empty()
416-
&& self.filter.is_none()
417-
&& empty_build_side_produces_empty_result(self.join_type)
416+
if !has_filter
417+
&& left_data.map().is_empty()
418+
&& empty_build_side_produces_empty_result(join_type)
418419
{
419420
HashJoinStreamState::Completed
420421
} else {
@@ -486,7 +487,11 @@ impl HashJoinStream {
486487
ready!(fut.get_shared(cx))?;
487488
}
488489
let build_side = self.build_side.try_as_ready()?;
489-
self.state = self.next_state_after_build_ready(build_side.left_data.as_ref());
490+
self.state = Self::state_after_build_ready(
491+
self.filter.is_some(),
492+
self.join_type,
493+
build_side.left_data.as_ref(),
494+
);
490495
Poll::Ready(Ok(StatefulStreamResult::Continue))
491496
}
492497

@@ -557,7 +562,11 @@ impl HashJoinStream {
557562
}));
558563
self.state = HashJoinStreamState::WaitPartitionBoundsReport;
559564
} else {
560-
self.state = self.next_state_after_build_ready(left_data.as_ref());
565+
self.state = Self::state_after_build_ready(
566+
self.filter.is_some(),
567+
self.join_type,
568+
left_data.as_ref(),
569+
);
561570
}
562571

563572
self.build_side = BuildSide::Ready(BuildSideReadyState { left_data });

datafusion/physical-plan/src/joins/utils.rs

Lines changed: 25 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1078,37 +1078,33 @@ pub(crate) fn build_batch_empty_build_side(
10781078
) -> Result<RecordBatch> {
10791079
if empty_build_side_produces_empty_result(join_type) {
10801080
// These join types only return data if the left side is not empty.
1081-
Ok(RecordBatch::new_empty(Arc::new(schema.clone())))
1082-
} else {
1083-
// The remaining joins return right-side rows and nulls for the left side.
1084-
let num_rows = probe_batch.num_rows();
1085-
if schema.fields().is_empty() {
1086-
return new_empty_schema_batch(schema, num_rows);
1087-
}
1088-
let mut columns: Vec<Arc<dyn Array>> =
1089-
Vec::with_capacity(schema.fields().len());
1090-
1091-
for column_index in column_indices {
1092-
let array = match column_index.side {
1093-
// left -> null array
1094-
JoinSide::Left => new_null_array(
1095-
build_batch.column(column_index.index).data_type(),
1096-
num_rows,
1097-
),
1098-
// right -> respective right array
1099-
JoinSide::Right => Arc::clone(probe_batch.column(column_index.index)),
1100-
// right mark -> unset boolean array as there are no matches on the left side
1101-
JoinSide::None => Arc::new(BooleanArray::new(
1102-
BooleanBuffer::new_unset(num_rows),
1103-
None,
1104-
)),
1105-
};
1106-
1107-
columns.push(array);
1108-
}
1081+
return Ok(RecordBatch::new_empty(Arc::new(schema.clone())));
1082+
}
11091083

1110-
Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
1084+
// The remaining joins return right-side rows and nulls for the left side.
1085+
let num_rows = probe_batch.num_rows();
1086+
if schema.fields().is_empty() {
1087+
return new_empty_schema_batch(schema, num_rows);
11111088
}
1089+
1090+
let columns = column_indices
1091+
.iter()
1092+
.map(|column_index| match column_index.side {
1093+
// left -> null array
1094+
JoinSide::Left => new_null_array(
1095+
build_batch.column(column_index.index).data_type(),
1096+
num_rows,
1097+
),
1098+
// right -> respective right array
1099+
JoinSide::Right => Arc::clone(probe_batch.column(column_index.index)),
1100+
// right mark -> unset boolean array as there are no matches on the left side
1101+
JoinSide::None => {
1102+
Arc::new(BooleanArray::new(BooleanBuffer::new_unset(num_rows), None))
1103+
}
1104+
})
1105+
.collect();
1106+
1107+
Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
11121108
}
11131109

11141110
/// The input is the matched indices for left and right and

0 commit comments

Comments
 (0)