Skip to content

Commit 97b0203

Browse files
Dandandanclaude
andcommitted
Simplify equal_rows_arr: remove dyn Fn, hoist indices, use bitwise AND
- Remove left_index_to_row closure parameter (only single-batch uses element-wise path, where packed == row index) - Hoist fallback row_indices and il_indices outside the key loop (lazy-allocated, computed once, reused across keys) - Replace scalar bitmap AND loop with word-level byte-slice AND via and_bitmap_with_boolean_buffer helper Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent e2043ad commit 97b0203

1 file changed

Lines changed: 47 additions & 31 deletions

File tree

  • datafusion/physical-plan/src/joins

datafusion/physical-plan/src/joins/utils.rs

Lines changed: 47 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1137,7 +1137,7 @@ pub(crate) fn build_batch_from_indices(
11371137
/// The resulting batch has [Schema] `schema`.
11381138
pub(crate) fn build_batch_empty_build_side(
11391139
schema: &Schema,
1140-
build_batch: &RecordBatch,
1140+
left_schema: &Schema,
11411141
probe_batch: &RecordBatch,
11421142
column_indices: &[ColumnIndex],
11431143
join_type: JoinType,
@@ -1165,7 +1165,7 @@ pub(crate) fn build_batch_empty_build_side(
11651165
let array = match column_index.side {
11661166
// left -> null array
11671167
JoinSide::Left => new_null_array(
1168-
build_batch.column(column_index.index).data_type(),
1168+
left_schema.field(column_index.index).data_type(),
11691169
num_rows,
11701170
),
11711171
// right -> respective right array
@@ -1878,6 +1878,11 @@ pub(super) fn equal_rows_arr(
18781878

18791879
let single_batch = left_arrays_per_batch.len() == 1;
18801880

1881+
// Pre-compute fallback index arrays once (used across all keys that need fallback).
1882+
// Lazy: only allocated if a key actually needs the fallback path.
1883+
let mut fallback_row_indices: Option<UInt32Array> = None;
1884+
let mut fallback_il_indices: Option<Vec<(usize, usize)>> = None;
1885+
18811886
for key_idx in 0..num_keys {
18821887
let right_array = &right_arrays[key_idx];
18831888

@@ -1892,46 +1897,44 @@ pub(super) fn equal_rows_arr(
18921897
&left_arrays_per_batch[0][key_idx],
18931898
right_array,
18941899
null_equality,
1895-
|packed| packed as usize,
18961900
);
18971901

18981902
if !handled {
18991903
// Fallback: materialize via take/interleave, then eq
19001904
let arr_left = if single_batch {
1901-
let row_indices: UInt32Array =
1902-
left_indices.iter().map(|&v| v as u32).collect();
1905+
let row_indices = fallback_row_indices.get_or_insert_with(|| {
1906+
left_indices.iter().map(|&v| v as u32).collect()
1907+
});
19031908
take(
19041909
left_arrays_per_batch[0][key_idx].as_ref(),
1905-
&row_indices,
1910+
row_indices,
19061911
None,
19071912
)?
19081913
} else {
1914+
let il_indices = fallback_il_indices.get_or_insert_with(|| {
1915+
left_indices
1916+
.iter()
1917+
.map(|&packed| {
1918+
let batch_idx = (packed >> 32) as usize;
1919+
let row_idx = (packed & 0xFFFFFFFF) as usize;
1920+
(batch_idx, row_idx)
1921+
})
1922+
.collect()
1923+
});
19091924
let arrays: Vec<&dyn Array> = left_arrays_per_batch
19101925
.iter()
19111926
.map(|batch_keys| batch_keys[key_idx].as_ref())
19121927
.collect();
1913-
let il_indices: Vec<(usize, usize)> = left_indices
1914-
.iter()
1915-
.map(|&packed| {
1916-
let batch_idx = (packed >> 32) as usize;
1917-
let row_idx = (packed & 0xFFFFFFFF) as usize;
1918-
(batch_idx, row_idx)
1919-
})
1920-
.collect();
1921-
compute::interleave(&arrays, &il_indices)?
1928+
compute::interleave(&arrays, il_indices)?
19221929
};
19231930
let arr_right = take(right_array.as_ref(), indices_right, None)?;
19241931
let eq_result = eq_dyn_null(&arr_left, &arr_right, null_equality)?;
19251932
// AND the result into our mutable bitmap.
1926-
// Null positions in eq_result are treated as "not equal".
1927-
let eq_values = eq_result.values();
1928-
let eq_nulls = eq_result.nulls();
1929-
for i in 0..num_rows {
1930-
if equal_bits.get_bit(i)
1931-
&& (!eq_values.value(i) || eq_nulls.is_some_and(|n| !n.is_valid(i)))
1932-
{
1933-
equal_bits.set_bit(i, false);
1934-
}
1933+
// Null positions in eq_result are treated as "not equal":
1934+
// first AND with the values, then AND with the validity bitmap.
1935+
and_bitmap_with_boolean_buffer(&mut equal_bits, eq_result.values());
1936+
if let Some(eq_nulls) = eq_result.nulls() {
1937+
and_bitmap_with_boolean_buffer(&mut equal_bits, eq_nulls.inner());
19351938
}
19361939
}
19371940
}
@@ -1948,9 +1951,23 @@ pub(super) fn equal_rows_arr(
19481951
))
19491952
}
19501953

1954+
/// AND a `BooleanBuffer` into a `BooleanBufferBuilder` in-place using
1955+
/// word-level operations on the underlying byte slices.
1956+
fn and_bitmap_with_boolean_buffer(
1957+
builder: &mut BooleanBufferBuilder,
1958+
rhs: &BooleanBuffer,
1959+
) {
1960+
let lhs = builder.as_slice_mut();
1961+
let rhs = rhs.inner().as_slice();
1962+
for (l, r) in lhs.iter_mut().zip(rhs.iter()) {
1963+
*l &= r;
1964+
}
1965+
}
1966+
19511967
/// Compare rows element-wise without materializing intermediate arrays.
19521968
/// Returns `true` if the comparison was handled, `false` if fallback is needed.
19531969
///
1970+
/// Only works for single-batch builds where packed index == row index.
19541971
/// Clears bits in `equal_bits` where the left and right values at the
19551972
/// indexed positions are not equal (respecting `null_equality`).
19561973
fn compare_rows_elementwise(
@@ -1960,7 +1977,6 @@ fn compare_rows_elementwise(
19601977
left_array: &ArrayRef,
19611978
right_array: &ArrayRef,
19621979
null_equality: NullEquality,
1963-
left_index_to_row: impl Fn(u64) -> usize,
19641980
) -> bool {
19651981
// Nested types need special comparison logic, fall back
19661982
if left_array.data_type().is_nested() {
@@ -1978,7 +1994,6 @@ fn compare_rows_elementwise(
19781994
&left,
19791995
&right,
19801996
null_equality,
1981-
&left_index_to_row,
19821997
);
19831998
}};
19841999
}
@@ -2028,16 +2043,17 @@ fn compare_rows_elementwise(
20282043
}
20292044

20302045
/// Inner loop for element-wise comparison. Generic over array type via `ArrayAccessor`.
2031-
/// Compares `left.value(left_index_to_row(left_indices[i]))` against
2046+
/// Compares `left.value(left_indices[i] as usize)` against
20322047
/// `right.value(right_indices[i])` for each row, clearing bits that don't match.
2048+
///
2049+
/// Only valid for single-batch builds where packed index == row index.
20332050
fn do_compare_elementwise<A: ArrayAccessor>(
20342051
equal_bits: &mut BooleanBufferBuilder,
20352052
left_indices: &[u64],
20362053
right_indices: &[u32],
20372054
left: &A,
20382055
right: &A,
20392056
null_equality: NullEquality,
2040-
left_index_to_row: &dyn Fn(u64) -> usize,
20412057
) where
20422058
A::Item: PartialEq,
20432059
{
@@ -2053,7 +2069,7 @@ fn do_compare_elementwise<A: ArrayAccessor>(
20532069
if !equal_bits.get_bit(i) {
20542070
continue;
20552071
}
2056-
let l_idx = left_index_to_row(left_indices[i]);
2072+
let l_idx = left_indices[i] as usize;
20572073
let r_idx = right_indices[i] as usize;
20582074
if left.value(l_idx) != right.value(r_idx) {
20592075
equal_bits.set_bit(i, false);
@@ -2064,7 +2080,7 @@ fn do_compare_elementwise<A: ArrayAccessor>(
20642080
if !equal_bits.get_bit(i) {
20652081
continue;
20662082
}
2067-
let l_idx = left_index_to_row(left_indices[i]);
2083+
let l_idx = left_indices[i] as usize;
20682084
let r_idx = right_indices[i] as usize;
20692085
let l_null = left_nulls.is_some_and(|nulls| !nulls.is_valid(l_idx));
20702086
let r_null = right_nulls.is_some_and(|nulls| !nulls.is_valid(r_idx));
@@ -3205,7 +3221,7 @@ mod tests {
32053221

32063222
let result = build_batch_empty_build_side(
32073223
&empty_schema,
3208-
&build_batch,
3224+
&build_batch.schema(),
32093225
&probe_batch,
32103226
&[], // no column indices with empty projection
32113227
JoinType::Right,

0 commit comments

Comments
 (0)