Skip to content

Commit 45ee6d1

Browse files
committed
Refactor empty build side handling
Remove duplication by introducing a shared helper, `empty_build_side_produces_empty_result`, in utils.rs; it replaces `can_skip_probe_on_empty_build_side`. Update `build_batch_empty_build_side` to call this helper directly, so that the short-circuit in the hash join state transition (stream.rs) and the batch-construction logic (utils.rs) always agree on which join types produce an empty result when the build side is empty.
1 parent 626f771 commit 45ee6d1

2 files changed

Lines changed: 37 additions & 43 deletions

File tree

datafusion/physical-plan/src/joins/hash_join/stream.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ use crate::{
4242
BuildProbeJoinMetrics, ColumnIndex, JoinFilter, JoinHashMapType,
4343
StatefulStreamResult, adjust_indices_by_join_type, apply_join_filter_to_indices,
4444
build_batch_empty_build_side, build_batch_from_indices,
45-
can_skip_probe_on_empty_build_side, need_produce_result_in_final,
45+
empty_build_side_produces_empty_result, need_produce_result_in_final,
4646
},
4747
};
4848

@@ -414,7 +414,7 @@ impl HashJoinStream {
414414
) -> HashJoinStreamState {
415415
if left_data.map().is_empty()
416416
&& self.filter.is_none()
417-
&& can_skip_probe_on_empty_build_side(self.join_type)
417+
&& empty_build_side_produces_empty_result(self.join_type)
418418
{
419419
HashJoinStreamState::Completed
420420
} else {

datafusion/physical-plan/src/joins/utils.rs

Lines changed: 35 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -855,9 +855,11 @@ pub(crate) fn need_produce_result_in_final(join_type: JoinType) -> bool {
855855
)
856856
}
857857

858-
/// Returns true when an empty build side fully determines the join result,
859-
/// so the probe side does not need to be consumed.
860-
pub(crate) fn can_skip_probe_on_empty_build_side(join_type: JoinType) -> bool {
858+
/// Returns true when an empty build side necessarily produces an empty result.
859+
///
860+
/// This is the shared source of truth for both state-machine short-circuiting
861+
/// and `build_batch_empty_build_side`.
862+
pub(crate) fn empty_build_side_produces_empty_result(join_type: JoinType) -> bool {
861863
matches!(
862864
join_type,
863865
JoinType::Inner
@@ -1074,46 +1076,38 @@ pub(crate) fn build_batch_empty_build_side(
10741076
column_indices: &[ColumnIndex],
10751077
join_type: JoinType,
10761078
) -> Result<RecordBatch> {
1077-
match join_type {
1078-
// these join types only return data if the left side is not empty, so we return an
1079-
// empty RecordBatch
1080-
JoinType::Inner
1081-
| JoinType::Left
1082-
| JoinType::LeftSemi
1083-
| JoinType::RightSemi
1084-
| JoinType::LeftAnti
1085-
| JoinType::LeftMark => Ok(RecordBatch::new_empty(Arc::new(schema.clone()))),
1086-
1087-
// the remaining joins will return data for the right columns and null for the left ones
1088-
JoinType::Right | JoinType::Full | JoinType::RightAnti | JoinType::RightMark => {
1089-
let num_rows = probe_batch.num_rows();
1090-
if schema.fields().is_empty() {
1091-
return new_empty_schema_batch(schema, num_rows);
1092-
}
1093-
let mut columns: Vec<Arc<dyn Array>> =
1094-
Vec::with_capacity(schema.fields().len());
1095-
1096-
for column_index in column_indices {
1097-
let array = match column_index.side {
1098-
// left -> null array
1099-
JoinSide::Left => new_null_array(
1100-
build_batch.column(column_index.index).data_type(),
1101-
num_rows,
1102-
),
1103-
// right -> respective right array
1104-
JoinSide::Right => Arc::clone(probe_batch.column(column_index.index)),
1105-
// right mark -> unset boolean array as there are no matches on the left side
1106-
JoinSide::None => Arc::new(BooleanArray::new(
1107-
BooleanBuffer::new_unset(num_rows),
1108-
None,
1109-
)),
1110-
};
1111-
1112-
columns.push(array);
1113-
}
1079+
if empty_build_side_produces_empty_result(join_type) {
1080+
// These join types only return data if the left side is not empty.
1081+
Ok(RecordBatch::new_empty(Arc::new(schema.clone())))
1082+
} else {
1083+
// The remaining joins return right-side rows and nulls for the left side.
1084+
let num_rows = probe_batch.num_rows();
1085+
if schema.fields().is_empty() {
1086+
return new_empty_schema_batch(schema, num_rows);
1087+
}
1088+
let mut columns: Vec<Arc<dyn Array>> =
1089+
Vec::with_capacity(schema.fields().len());
1090+
1091+
for column_index in column_indices {
1092+
let array = match column_index.side {
1093+
// left -> null array
1094+
JoinSide::Left => new_null_array(
1095+
build_batch.column(column_index.index).data_type(),
1096+
num_rows,
1097+
),
1098+
// right -> respective right array
1099+
JoinSide::Right => Arc::clone(probe_batch.column(column_index.index)),
1100+
// right mark -> unset boolean array as there are no matches on the left side
1101+
JoinSide::None => Arc::new(BooleanArray::new(
1102+
BooleanBuffer::new_unset(num_rows),
1103+
None,
1104+
)),
1105+
};
11141106

1115-
Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
1107+
columns.push(array);
11161108
}
1109+
1110+
Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
11171111
}
11181112
}
11191113

0 commit comments

Comments
 (0)