Skip to content

Commit 83043e7

Browse files
Dandandan and claude committed
Coalesce build-side batches in hash join and fix BatchCoalescer min size
Coalesce small build-side batches up to `target_batch_size` rows in `collect_left_input`, so that `build_batch_from_indices` hits the fast `SingleBatch`/`take` path more often instead of the slower `MultiBatch`/`interleave` path.

Also add `.max(8)` guards to all `LimitedBatchCoalescer` call sites to avoid a `debug_assert` panic in the arrow-select `BatchCoalescer` when `target_batch_size` is smaller than the array capacity (upstream arrow bug).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 37531f6 commit 83043e7

5 files changed

Lines changed: 54 additions & 28 deletions

File tree

datafusion/physical-plan/src/coalesce_partitions.rs

Lines changed: 7 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -252,7 +252,7 @@ impl ExecutionPlan for CoalescePartitionsExec {
252252
)),
253253
coalescer: LimitedBatchCoalescer::new(
254254
self.schema(),
255-
batch_size,
255+
batch_size.max(8),
256256
None, // fetch is already handled by ObservedStream
257257
),
258258
completed: false,
@@ -401,15 +401,13 @@ impl Stream for CoalescedStream {
401401
self.completed = true;
402402
self.coalescer.finish()?;
403403
}
404-
Some(Ok(batch)) => {
405-
match self.coalescer.push_batch(batch)? {
406-
PushBatchStatus::Continue => {}
407-
PushBatchStatus::LimitReached => {
408-
self.completed = true;
409-
self.coalescer.finish()?;
410-
}
404+
Some(Ok(batch)) => match self.coalescer.push_batch(batch)? {
405+
PushBatchStatus::Continue => {}
406+
PushBatchStatus::LimitReached => {
407+
self.completed = true;
408+
self.coalescer.finish()?;
411409
}
412-
}
410+
},
413411
other => return Poll::Ready(other),
414412
}
415413
}

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ use crate::{
6666
};
6767

6868
use arrow::array::{ArrayRef, BooleanBufferBuilder, UInt64Array};
69+
use arrow::compute::BatchCoalescer;
6970
use arrow::datatypes::SchemaRef;
7071
use arrow::record_batch::RecordBatch;
7172
use arrow::util::bit_util;
@@ -2001,6 +2002,25 @@ async fn collect_left_input(
20012002
bounds_accumulators,
20022003
} = state;
20032004

2005+
// Coalesce small batches into larger ones to reduce the number of
2006+
// build-side batches. Fewer (larger) batches mean that
2007+
// `build_batch_from_indices` is more likely to take the fast
2008+
// `SingleBatch`/`take` path instead of the slower
2009+
// `MultiBatch`/`interleave` path.
2010+
let batches = {
2011+
let target_batch_size = config.execution.batch_size.max(8);
2012+
let mut coalescer = BatchCoalescer::new(Arc::clone(&schema), target_batch_size);
2013+
for batch in batches {
2014+
coalescer.push_batch(batch)?;
2015+
}
2016+
coalescer.finish_buffered_batch()?;
2017+
let mut coalesced = Vec::new();
2018+
while let Some(batch) = coalescer.next_completed_batch() {
2019+
coalesced.push(batch);
2020+
}
2021+
coalesced
2022+
};
2023+
20042024
// Compute bounds
20052025
let mut bounds = match bounds_accumulators {
20062026
Some(accumulators) if num_rows > 0 => {

datafusion/physical-plan/src/joins/hash_join/stream.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,7 @@ impl HashJoinStream {
382382
) -> Self {
383383
// Create output buffer with coalescing and optional fetch limit.
384384
let output_buffer =
385-
LimitedBatchCoalescer::new(Arc::clone(&schema), batch_size, fetch);
385+
LimitedBatchCoalescer::new(Arc::clone(&schema), batch_size.max(8), fetch);
386386

387387
Self {
388388
partition,

datafusion/physical-plan/src/joins/utils.rs

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1984,16 +1984,14 @@ fn and_bitmap_with_boolean_buffer(
19841984
macro_rules! dispatch_elementwise {
19851985
($data_type:expr, $equal_bits:expr, $left_indices:expr, $null_equality:expr, $action:ident) => {
19861986
match $data_type {
1987-
DataType::Null => {
1988-
match $null_equality {
1989-
NullEquality::NullEqualsNothing => {
1990-
for i in 0..$left_indices.len() {
1991-
$equal_bits.set_bit(i, false);
1992-
}
1987+
DataType::Null => match $null_equality {
1988+
NullEquality::NullEqualsNothing => {
1989+
for i in 0..$left_indices.len() {
1990+
$equal_bits.set_bit(i, false);
19931991
}
1994-
NullEquality::NullEqualsNull => {}
19951992
}
1996-
}
1993+
NullEquality::NullEqualsNull => {}
1994+
},
19971995
DataType::Boolean => $action!(BooleanArray),
19981996
DataType::Int8 => $action!(Int8Array),
19991997
DataType::Int16 => $action!(Int16Array),
@@ -2059,7 +2057,13 @@ fn compare_rows_elementwise(
20592057
}};
20602058
}
20612059

2062-
dispatch_elementwise!(left_array.data_type(), equal_bits, left_indices, null_equality, compare_elementwise);
2060+
dispatch_elementwise!(
2061+
left_array.data_type(),
2062+
equal_bits,
2063+
left_indices,
2064+
null_equality,
2065+
compare_elementwise
2066+
);
20632067
true
20642068
}
20652069

@@ -2152,7 +2156,13 @@ fn compare_rows_elementwise_multi(
21522156
}};
21532157
}
21542158

2155-
dispatch_elementwise!(right_array.data_type(), equal_bits, left_indices, null_equality, compare_multi);
2159+
dispatch_elementwise!(
2160+
right_array.data_type(),
2161+
equal_bits,
2162+
left_indices,
2163+
null_equality,
2164+
compare_multi
2165+
);
21562166
true
21572167
}
21582168

@@ -2172,10 +2182,8 @@ fn do_compare_elementwise_multi<A: ArrayAccessor>(
21722182
A::Item: PartialEq,
21732183
{
21742184
let right_nulls = right.nulls();
2175-
let has_nulls = right.null_count() > 0
2176-
|| left_arrays
2177-
.iter()
2178-
.any(|a| a.null_count() > 0);
2185+
let has_nulls =
2186+
right.null_count() > 0 || left_arrays.iter().any(|a| a.null_count() > 0);
21792187
let num_rows = left_indices.len();
21802188

21812189
if !has_nulls {
@@ -2204,8 +2212,8 @@ fn do_compare_elementwise_multi<A: ArrayAccessor>(
22042212
let batch_idx = (packed >> 32) as usize;
22052213
let row_idx = (packed & 0xFFFFFFFF) as usize;
22062214
let r_idx = right_indices[i] as usize;
2207-
let l_null = left_nulls_per_batch[batch_idx]
2208-
.is_some_and(|n| !n.is_valid(row_idx));
2215+
let l_null =
2216+
left_nulls_per_batch[batch_idx].is_some_and(|n| !n.is_valid(row_idx));
22092217
let r_null = right_nulls.is_some_and(|n| !n.is_valid(r_idx));
22102218

22112219
let is_equal = match (l_null, r_null) {

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1595,8 +1595,8 @@ impl PerPartitionStream {
15951595
baseline_metrics: BaselineMetrics,
15961596
batch_size: Option<usize>,
15971597
) -> Self {
1598-
let batch_coalescer =
1599-
batch_size.map(|s| LimitedBatchCoalescer::new(Arc::clone(&schema), s, None));
1598+
let batch_coalescer = batch_size
1599+
.map(|s| LimitedBatchCoalescer::new(Arc::clone(&schema), s.max(8), None));
16001600
Self {
16011601
schema,
16021602
receiver,

0 commit comments

Comments
 (0)