Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 147 additions & 42 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2275,6 +2275,96 @@ mod tests {
)
}

fn empty_build_with_probe_error_inputs()
-> (Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>, JoinOn) {
let left_batch =
build_table_i32(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![]));
let left_schema = left_batch.schema();
let left: Arc<dyn ExecutionPlan> = TestMemoryExec::try_new_exec(
&[vec![left_batch]],
Arc::clone(&left_schema),
None,
)
.unwrap();

let err = exec_err!("bad data error");
let right_batch =
build_table_i32(("a2", &vec![]), ("b1", &vec![]), ("c2", &vec![]));
let right_schema = right_batch.schema();
let on = vec![(
Arc::new(Column::new_with_schema("b1", &left_schema).unwrap()) as _,
Arc::new(Column::new_with_schema("b1", &right_schema).unwrap()) as _,
)];
let right: Arc<dyn ExecutionPlan> = Arc::new(
MockExec::new(vec![Ok(right_batch), err], right_schema).with_use_task(false),
);

(left, right, on)
}

async fn assert_empty_build_probe_behavior(
join_types: &[JoinType],
expect_probe_error: bool,
) {
let (left, right, on) = empty_build_with_probe_error_inputs();

for join_type in join_types {
let join = join(
Arc::clone(&left),
Arc::clone(&right),
on.clone(),
join_type,
NullEquality::NullEqualsNothing,
)
.unwrap();

let result = common::collect(
join.execute(0, Arc::new(TaskContext::default())).unwrap(),
)
.await;

if expect_probe_error {
let result_string = result.unwrap_err().to_string();
assert!(
result_string.contains("bad data error"),
"actual: {result_string}"
);
} else {
let batches = result.unwrap();
assert!(
batches.is_empty(),
"expected no output batches for {join_type}, got {batches:?}"
);
}
}
}

fn hash_join_with_dynamic_filter(
left: Arc<dyn ExecutionPlan>,
right: Arc<dyn ExecutionPlan>,
on: JoinOn,
join_type: JoinType,
) -> Result<(HashJoinExec, Arc<DynamicFilterPhysicalExpr>)> {
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
let mut join = HashJoinExec::try_new(
left,
right,
on,
None,
&join_type,
None,
PartitionMode::CollectLeft,
NullEquality::NullEqualsNothing,
false,
)?;
join.dynamic_filter = Some(HashJoinExecDynamicFilter {
filter: Arc::clone(&dynamic_filter),
build_accumulator: OnceLock::new(),
});

Ok((join, dynamic_filter))
}

async fn join_collect(
left: Arc<dyn ExecutionPlan>,
right: Arc<dyn ExecutionPlan>,
Expand Down Expand Up @@ -4983,6 +5073,36 @@ mod tests {
}
}

#[tokio::test]
async fn join_does_not_consume_probe_when_empty_build_fixes_output() {
assert_empty_build_probe_behavior(
&[
JoinType::Inner,
JoinType::Left,
JoinType::LeftSemi,
JoinType::LeftAnti,
JoinType::LeftMark,
JoinType::RightSemi,
],
false,
)
.await;
}

#[tokio::test]
async fn join_still_consumes_probe_when_empty_build_needs_probe_rows() {
assert_empty_build_probe_behavior(
&[
JoinType::Right,
JoinType::Full,
JoinType::RightAnti,
JoinType::RightMark,
],
true,
)
.await;
}

#[tokio::test]
async fn join_split_batch() {
let left = build_table(
Expand Down Expand Up @@ -5426,34 +5546,16 @@ mod tests {
Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
)];

// Create a dynamic filter manually
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
let dynamic_filter_clone = Arc::clone(&dynamic_filter);

// Create HashJoinExec with the dynamic filter
let mut join = HashJoinExec::try_new(
left,
right,
on,
None,
&JoinType::Inner,
None,
PartitionMode::CollectLeft,
NullEquality::NullEqualsNothing,
false,
)?;
join.dynamic_filter = Some(HashJoinExecDynamicFilter {
filter: dynamic_filter,
build_accumulator: OnceLock::new(),
});
let (join, dynamic_filter) =
hash_join_with_dynamic_filter(left, right, on, JoinType::Inner)?;

// Execute the join
let stream = join.execute(0, task_ctx)?;
let _batches = common::collect(stream).await?;

// After the join completes, the dynamic filter should be marked as complete
// wait_complete() should return immediately
dynamic_filter_clone.wait_complete().await;
dynamic_filter.wait_complete().await;

Ok(())
}
Expand All @@ -5475,34 +5577,37 @@ mod tests {
Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
)];

// Create a dynamic filter manually
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
let dynamic_filter_clone = Arc::clone(&dynamic_filter);

// Create HashJoinExec with the dynamic filter
let mut join = HashJoinExec::try_new(
left,
right,
on,
None,
&JoinType::Inner,
None,
PartitionMode::CollectLeft,
NullEquality::NullEqualsNothing,
false,
)?;
join.dynamic_filter = Some(HashJoinExecDynamicFilter {
filter: dynamic_filter,
build_accumulator: OnceLock::new(),
});
let (join, dynamic_filter) =
hash_join_with_dynamic_filter(left, right, on, JoinType::Inner)?;

// Execute the join
let stream = join.execute(0, task_ctx)?;
let _batches = common::collect(stream).await?;

// Even with empty build side, the dynamic filter should be marked as complete
// wait_complete() should return immediately
dynamic_filter_clone.wait_complete().await;
dynamic_filter.wait_complete().await;

Ok(())
}

#[tokio::test]
async fn test_hash_join_skips_probe_on_empty_build_after_partition_bounds_report()
-> Result<()> {
let task_ctx = Arc::new(TaskContext::default());
let (left, right, on) = empty_build_with_probe_error_inputs();

// Keep an extra consumer reference so execute() enables dynamic filter pushdown
// and enters the WaitPartitionBoundsReport path before deciding whether to poll
// the probe side.
let (join, dynamic_filter) =
hash_join_with_dynamic_filter(left, right, on, JoinType::Inner)?;

let stream = join.execute(0, task_ctx)?;
let batches = common::collect(stream).await?;
assert!(batches.is_empty());

dynamic_filter.wait_complete().await;

Ok(())
}
Expand Down
32 changes: 29 additions & 3 deletions datafusion/physical-plan/src/joins/hash_join/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use crate::{
BuildProbeJoinMetrics, ColumnIndex, JoinFilter, JoinHashMapType,
StatefulStreamResult, adjust_indices_by_join_type, apply_join_filter_to_indices,
build_batch_empty_build_side, build_batch_from_indices,
need_produce_result_in_final,
empty_build_side_produces_empty_result, need_produce_result_in_final,
},
};

Expand Down Expand Up @@ -406,6 +406,23 @@ impl HashJoinStream {
}
}

/// Returns the next state after the build side has been fully collected
/// and any required build-side coordination has completed.
fn state_after_build_ready(
has_filter: bool,
join_type: JoinType,
left_data: &JoinLeftData,
) -> HashJoinStreamState {
if !has_filter
&& left_data.map().is_empty()
&& empty_build_side_produces_empty_result(join_type)
{
HashJoinStreamState::Completed
} else {
HashJoinStreamState::FetchProbeBatch
}
}

/// Separate implementation function that unpins the [`HashJoinStream`] so
/// that partial borrows work correctly
fn poll_next_impl(
Expand Down Expand Up @@ -469,7 +486,12 @@ impl HashJoinStream {
if let Some(ref mut fut) = self.build_waiter {
ready!(fut.get_shared(cx))?;
}
self.state = HashJoinStreamState::FetchProbeBatch;
let build_side = self.build_side.try_as_ready()?;
self.state = Self::state_after_build_ready(
self.filter.is_some(),
self.join_type,
build_side.left_data.as_ref(),
);
Poll::Ready(Ok(StatefulStreamResult::Continue))
}

Expand Down Expand Up @@ -540,7 +562,11 @@ impl HashJoinStream {
}));
self.state = HashJoinStreamState::WaitPartitionBoundsReport;
} else {
self.state = HashJoinStreamState::FetchProbeBatch;
self.state = Self::state_after_build_ready(
self.filter.is_some(),
self.join_type,
left_data.as_ref(),
);
}

self.build_side = BuildSide::Ready(BuildSideReadyState { left_data });
Expand Down
78 changes: 41 additions & 37 deletions datafusion/physical-plan/src/joins/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,22 @@ pub(crate) fn need_produce_result_in_final(join_type: JoinType) -> bool {
)
}

/// Returns true when an empty build side necessarily produces an empty result.
///
/// This is the shared source of truth for both state-machine short-circuiting
/// and `build_batch_empty_build_side`.
pub(crate) fn empty_build_side_produces_empty_result(join_type: JoinType) -> bool {
matches!(
join_type,
JoinType::Inner
| JoinType::Left
| JoinType::LeftSemi
| JoinType::LeftAnti
| JoinType::LeftMark
| JoinType::RightSemi
)
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a method on JoinType?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree this reads more like join semantics than a hash-join utility. Moving it onto JoinType as something like JoinType::empty_build_side_produces_empty_result() would make the call sites a bit clearer and keep the rule next to the other join-type predicates.

pub(crate) fn get_final_indices_from_shared_bitmap(
shared_bitmap: &SharedBitmapBuilder,
join_type: JoinType,
Expand Down Expand Up @@ -1060,47 +1076,35 @@ pub(crate) fn build_batch_empty_build_side(
column_indices: &[ColumnIndex],
join_type: JoinType,
) -> Result<RecordBatch> {
match join_type {
// these join types only return data if the left side is not empty, so we return an
// empty RecordBatch
JoinType::Inner
| JoinType::Left
| JoinType::LeftSemi
| JoinType::RightSemi
| JoinType::LeftAnti
| JoinType::LeftMark => Ok(RecordBatch::new_empty(Arc::new(schema.clone()))),
if empty_build_side_produces_empty_result(join_type) {
// These join types only return data if the left side is not empty.
return Ok(RecordBatch::new_empty(Arc::new(schema.clone())));
}
Comment on lines +1063 to +1066
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: should we keep this? this is technically unreachable no? 🤔

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@LiaCastaneda
I see your point - in the current hash-join control flow, that branch is effectively unreachable from the main runtime path because stream.rs moves empty-build + empty-result join types directly to Completed, and stream.rs now asserts that invariant.

But pub(crate) fn build_batch_empty_build_side is a shared helper with its own contract, and this guard is relevant because:

  1. It keeps the helper semantically correct even if called from another path later.
  2. It is a safe fallback in release builds if the upstream invariant is ever accidentally violated.
  3. It makes the helper behavior explicit and self-contained, rather than relying entirely on one caller’s state machine.


// the remaining joins will return data for the right columns and null for the left ones
JoinType::Right | JoinType::Full | JoinType::RightAnti | JoinType::RightMark => {
let num_rows = probe_batch.num_rows();
if schema.fields().is_empty() {
return new_empty_schema_batch(schema, num_rows);
}
let mut columns: Vec<Arc<dyn Array>> =
Vec::with_capacity(schema.fields().len());

for column_index in column_indices {
let array = match column_index.side {
// left -> null array
JoinSide::Left => new_null_array(
build_batch.column(column_index.index).data_type(),
num_rows,
),
// right -> respective right array
JoinSide::Right => Arc::clone(probe_batch.column(column_index.index)),
// right mark -> unset boolean array as there are no matches on the left side
JoinSide::None => Arc::new(BooleanArray::new(
BooleanBuffer::new_unset(num_rows),
None,
)),
};
// The remaining joins return right-side rows and nulls for the left side.
let num_rows = probe_batch.num_rows();
if schema.fields().is_empty() {
return new_empty_schema_batch(schema, num_rows);
}

columns.push(array);
let columns = column_indices
.iter()
.map(|column_index| match column_index.side {
// left -> null array
JoinSide::Left => new_null_array(
build_batch.column(column_index.index).data_type(),
num_rows,
),
// right -> respective right array
JoinSide::Right => Arc::clone(probe_batch.column(column_index.index)),
// right mark -> unset boolean array as there are no matches on the left side
JoinSide::None => {
Arc::new(BooleanArray::new(BooleanBuffer::new_unset(num_rows), None))
}
})
.collect();

Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
}
}
Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
}

/// The input is the matched indices for left and right and
Expand Down
Loading