Skip to content

Commit 42aa7b2

Browse files
Dandandanclaude
andcommitted
Extend element-wise comparison to multi-batch builds in equal_rows_arr
The multi-batch path in equal_rows_arr was using interleave() to materialize intermediate arrays, which is slower than the old take()-on-concatenated-keys approach. This adds compare_rows_elementwise_multi which decodes packed indices (batch_idx << 32 | row_idx) and compares values directly from per-batch arrays, avoiding all intermediate allocation for common key types. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 9986fb0 commit 42aa7b2

1 file changed

Lines changed: 155 additions & 6 deletions

File tree

  • datafusion/physical-plan/src/joins

datafusion/physical-plan/src/joins/utils.rs

Lines changed: 155 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1886,18 +1886,29 @@ pub(super) fn equal_rows_arr(
18861886
for key_idx in 0..num_keys {
18871887
let right_array = &right_arrays[key_idx];
18881888

1889-
// For single-batch builds, try element-wise comparison which avoids
1890-
// allocating intermediate arrays via take+eq per key column.
1891-
// Falls back to take+eq for nested/unsupported types or multi-batch.
1892-
let handled = single_batch
1893-
&& compare_rows_elementwise(
1889+
// Try element-wise comparison which avoids allocating intermediate arrays.
1890+
// Works for both single-batch and multi-batch builds on common types.
1891+
// Falls back to take/interleave+eq for nested/unsupported types.
1892+
let handled = if single_batch {
1893+
compare_rows_elementwise(
18941894
&mut equal_bits,
18951895
left_indices,
18961896
right_indices,
18971897
&left_arrays_per_batch[0][key_idx],
18981898
right_array,
18991899
null_equality,
1900-
);
1900+
)
1901+
} else {
1902+
compare_rows_elementwise_multi(
1903+
&mut equal_bits,
1904+
left_indices,
1905+
right_indices,
1906+
left_arrays_per_batch,
1907+
key_idx,
1908+
right_array,
1909+
null_equality,
1910+
)
1911+
};
19011912

19021913
if !handled {
19031914
// Fallback: materialize via take/interleave, then eq
@@ -2097,6 +2108,144 @@ fn do_compare_elementwise<A: ArrayAccessor>(
20972108
}
20982109
}
20992110

2111+
/// Compare rows element-wise for multi-batch builds without materializing
2112+
/// intermediate arrays. Decodes packed indices to (batch_idx, row_idx) and
2113+
/// accesses the correct batch array directly.
2114+
/// Returns `true` if the comparison was handled, `false` if fallback is needed.
2115+
fn compare_rows_elementwise_multi(
2116+
equal_bits: &mut BooleanBufferBuilder,
2117+
left_indices: &[u64],
2118+
right_indices: &[u32],
2119+
left_arrays_per_batch: &[Vec<ArrayRef>],
2120+
key_idx: usize,
2121+
right_array: &ArrayRef,
2122+
null_equality: NullEquality,
2123+
) -> bool {
2124+
if right_array.data_type().is_nested() {
2125+
return false;
2126+
}
2127+
2128+
macro_rules! compare_multi {
2129+
($array_type:ty) => {{
2130+
let left_typed: Vec<&$array_type> = left_arrays_per_batch
2131+
.iter()
2132+
.map(|keys| keys[key_idx].as_any().downcast_ref::<$array_type>().unwrap())
2133+
.collect();
2134+
let right = right_array.as_any().downcast_ref::<$array_type>().unwrap();
2135+
do_compare_elementwise_multi(
2136+
equal_bits,
2137+
left_indices,
2138+
right_indices,
2139+
&left_typed,
2140+
&right,
2141+
null_equality,
2142+
);
2143+
}};
2144+
}
2145+
2146+
match right_array.data_type() {
2147+
DataType::Null => {
2148+
match null_equality {
2149+
NullEquality::NullEqualsNothing => {
2150+
for i in 0..left_indices.len() {
2151+
equal_bits.set_bit(i, false);
2152+
}
2153+
}
2154+
NullEquality::NullEqualsNull => {}
2155+
}
2156+
}
2157+
DataType::Boolean => compare_multi!(BooleanArray),
2158+
DataType::Int8 => compare_multi!(Int8Array),
2159+
DataType::Int16 => compare_multi!(Int16Array),
2160+
DataType::Int32 => compare_multi!(Int32Array),
2161+
DataType::Int64 => compare_multi!(Int64Array),
2162+
DataType::UInt8 => compare_multi!(UInt8Array),
2163+
DataType::UInt16 => compare_multi!(UInt16Array),
2164+
DataType::UInt32 => compare_multi!(UInt32Array),
2165+
DataType::UInt64 => compare_multi!(UInt64Array),
2166+
DataType::Float32 => compare_multi!(Float32Array),
2167+
DataType::Float64 => compare_multi!(Float64Array),
2168+
DataType::Binary => compare_multi!(BinaryArray),
2169+
DataType::BinaryView => compare_multi!(BinaryViewArray),
2170+
DataType::FixedSizeBinary(_) => compare_multi!(FixedSizeBinaryArray),
2171+
DataType::LargeBinary => compare_multi!(LargeBinaryArray),
2172+
DataType::Utf8 => compare_multi!(StringArray),
2173+
DataType::Utf8View => compare_multi!(StringViewArray),
2174+
DataType::LargeUtf8 => compare_multi!(LargeStringArray),
2175+
DataType::Decimal128(..) => compare_multi!(Decimal128Array),
2176+
DataType::Timestamp(time_unit, None) => match time_unit {
2177+
TimeUnit::Second => compare_multi!(TimestampSecondArray),
2178+
TimeUnit::Millisecond => compare_multi!(TimestampMillisecondArray),
2179+
TimeUnit::Microsecond => compare_multi!(TimestampMicrosecondArray),
2180+
TimeUnit::Nanosecond => compare_multi!(TimestampNanosecondArray),
2181+
},
2182+
DataType::Date32 => compare_multi!(Date32Array),
2183+
DataType::Date64 => compare_multi!(Date64Array),
2184+
_ => return false,
2185+
}
2186+
true
2187+
}
2188+
2189+
/// Inner loop for multi-batch element-wise comparison.
2190+
/// Decodes packed indices to access the correct batch array per row.
2191+
///
2192+
/// Takes `&[A]` where `A` is a reference type like `&Int32Array` that implements
2193+
/// `ArrayAccessor`. Null checking works through auto-deref to the `Array` trait.
2194+
fn do_compare_elementwise_multi<A: ArrayAccessor>(
2195+
equal_bits: &mut BooleanBufferBuilder,
2196+
left_indices: &[u64],
2197+
right_indices: &[u32],
2198+
left_arrays: &[A],
2199+
right: &A,
2200+
null_equality: NullEquality,
2201+
) where
2202+
A::Item: PartialEq,
2203+
{
2204+
let right_nulls = right.nulls();
2205+
let has_nulls = right.null_count() > 0
2206+
|| left_arrays
2207+
.iter()
2208+
.any(|a| a.null_count() > 0);
2209+
let num_rows = left_indices.len();
2210+
2211+
if !has_nulls {
2212+
for i in 0..num_rows {
2213+
if !equal_bits.get_bit(i) {
2214+
continue;
2215+
}
2216+
let packed = left_indices[i];
2217+
let batch_idx = (packed >> 32) as usize;
2218+
let row_idx = (packed & 0xFFFFFFFF) as usize;
2219+
let r_idx = right_indices[i] as usize;
2220+
if left_arrays[batch_idx].value(row_idx) != right.value(r_idx) {
2221+
equal_bits.set_bit(i, false);
2222+
}
2223+
}
2224+
} else {
2225+
for i in 0..num_rows {
2226+
if !equal_bits.get_bit(i) {
2227+
continue;
2228+
}
2229+
let packed = left_indices[i];
2230+
let batch_idx = (packed >> 32) as usize;
2231+
let row_idx = (packed & 0xFFFFFFFF) as usize;
2232+
let r_idx = right_indices[i] as usize;
2233+
let left = &left_arrays[batch_idx];
2234+
let l_null = left.nulls().is_some_and(|n| !n.is_valid(row_idx));
2235+
let r_null = right_nulls.is_some_and(|n| !n.is_valid(r_idx));
2236+
2237+
let is_equal = match (l_null, r_null) {
2238+
(true, true) => null_equality == NullEquality::NullEqualsNull,
2239+
(true, false) | (false, true) => false,
2240+
(false, false) => left.value(row_idx) == right.value(r_idx),
2241+
};
2242+
if !is_equal {
2243+
equal_bits.set_bit(i, false);
2244+
}
2245+
}
2246+
}
2247+
}
2248+
21002249
// version of eq_dyn supporting equality on null arrays
21012250
fn eq_dyn_null(
21022251
left: &dyn Array,

0 commit comments

Comments
 (0)