Skip to content

Commit e2043ad

Browse files
Dandandanclaude
andcommitted
Optimize equal_rows_arr with element-wise comparison
Avoid allocating intermediate arrays (take+eq+and) per key column in hash collision resolution. For common types (primitives, strings), compare values element-wise using a mutable BooleanBufferBuilder. Falls back to take+eq for nested/unsupported types. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent acde8fe commit e2043ad

1 file changed

Lines changed: 201 additions & 45 deletions

File tree

  • datafusion/physical-plan/src/joins

datafusion/physical-plan/src/joins/utils.rs

Lines changed: 201 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,9 @@ pub use crate::joins::{JoinOn, JoinOnRef};
4141

4242
use ahash::RandomState;
4343
use arrow::array::{
44-
Array, ArrowPrimitiveType, BooleanBufferBuilder, NativeAdapter, PrimitiveArray,
45-
RecordBatch, RecordBatchOptions, UInt32Array, UInt32Builder, UInt64Array,
46-
builder::UInt64Builder, downcast_array, new_null_array,
44+
Array, ArrayAccessor, ArrowPrimitiveType, BooleanBufferBuilder, NativeAdapter,
45+
PrimitiveArray, RecordBatch, RecordBatchOptions, UInt32Array, UInt32Builder,
46+
UInt64Array, builder::UInt64Builder, downcast_array, new_null_array,
4747
};
4848
use arrow::array::{
4949
ArrayRef, BinaryArray, BinaryViewArray, BooleanArray, Date32Array, Date64Array,
@@ -54,7 +54,7 @@ use arrow::array::{
5454
};
5555
use arrow::buffer::{BooleanBuffer, NullBuffer};
5656
use arrow::compute::kernels::cmp::eq;
57-
use arrow::compute::{self, FilterBuilder, and, take};
57+
use arrow::compute::{self, FilterBuilder, take};
5858
use arrow::datatypes::{
5959
ArrowNativeType, Field, Schema, SchemaBuilder, UInt32Type, UInt64Type,
6060
};
@@ -1866,54 +1866,77 @@ pub(super) fn equal_rows_arr(
18661866
return Ok((Vec::<u64>::new().into(), Vec::<u32>::new().into()));
18671867
}
18681868

1869+
let num_rows = indices_left.len();
18691870
let num_keys = right_arrays.len();
1871+
let left_indices = indices_left.values();
1872+
let right_indices = indices_right.values();
18701873

1871-
// Gather left-side key values using packed indices
1872-
let gather_left_key = |key_idx: usize| -> Result<ArrayRef> {
1873-
if left_arrays_per_batch.len() == 1 {
1874-
// Single batch: packed value == row index (batch_idx is 0),
1875-
// so we can use take directly with UInt32 indices.
1876-
let row_indices: UInt32Array =
1877-
indices_left.values().iter().map(|&v| v as u32).collect();
1878-
Ok(take(
1879-
left_arrays_per_batch[0][key_idx].as_ref(),
1880-
&row_indices,
1881-
None,
1882-
)?)
1883-
} else {
1884-
// Multiple batches: decode packed indices and use interleave
1885-
let arrays: Vec<&dyn Array> = left_arrays_per_batch
1886-
.iter()
1887-
.map(|batch_keys| batch_keys[key_idx].as_ref())
1888-
.collect();
1889-
let il_indices: Vec<(usize, usize)> = indices_left
1890-
.values()
1891-
.iter()
1892-
.map(|&packed| {
1893-
let batch_idx = (packed >> 32) as usize;
1894-
let row_idx = (packed & 0xFFFFFFFF) as usize;
1895-
(batch_idx, row_idx)
1896-
})
1897-
.collect();
1898-
Ok(compute::interleave(&arrays, &il_indices)?)
1899-
}
1900-
};
1874+
// Build a mutable bitmap: start all-true, clear bits where keys don't match.
1875+
// This avoids allocating intermediate arrays via take+eq+and per key column.
1876+
let mut equal_bits = BooleanBufferBuilder::new(num_rows);
1877+
equal_bits.append_n(num_rows, true);
19011878

1902-
let arr_left = gather_left_key(0)?;
1903-
let arr_right = take(right_arrays[0].as_ref(), indices_right, None)?;
1879+
let single_batch = left_arrays_per_batch.len() == 1;
19041880

1905-
let mut equal: BooleanArray = eq_dyn_null(&arr_left, &arr_right, null_equality)?;
1881+
for key_idx in 0..num_keys {
1882+
let right_array = &right_arrays[key_idx];
19061883

1907-
for (key_idx, right_key_array) in
1908-
right_arrays.iter().enumerate().take(num_keys).skip(1)
1909-
{
1910-
let arr_left = gather_left_key(key_idx)?;
1911-
let arr_right = take(right_key_array.as_ref(), indices_right, None)?;
1912-
let eq_result =
1913-
eq_dyn_null(arr_left.as_ref(), arr_right.as_ref(), null_equality)?;
1914-
equal = and(&equal, &eq_result)?;
1884+
// For single-batch builds, try element-wise comparison which avoids
1885+
// allocating intermediate arrays via take+eq per key column.
1886+
// Falls back to take+eq for nested/unsupported types or multi-batch.
1887+
let handled = single_batch
1888+
&& compare_rows_elementwise(
1889+
&mut equal_bits,
1890+
left_indices,
1891+
right_indices,
1892+
&left_arrays_per_batch[0][key_idx],
1893+
right_array,
1894+
null_equality,
1895+
|packed| packed as usize,
1896+
);
1897+
1898+
if !handled {
1899+
// Fallback: materialize via take/interleave, then eq
1900+
let arr_left = if single_batch {
1901+
let row_indices: UInt32Array =
1902+
left_indices.iter().map(|&v| v as u32).collect();
1903+
take(
1904+
left_arrays_per_batch[0][key_idx].as_ref(),
1905+
&row_indices,
1906+
None,
1907+
)?
1908+
} else {
1909+
let arrays: Vec<&dyn Array> = left_arrays_per_batch
1910+
.iter()
1911+
.map(|batch_keys| batch_keys[key_idx].as_ref())
1912+
.collect();
1913+
let il_indices: Vec<(usize, usize)> = left_indices
1914+
.iter()
1915+
.map(|&packed| {
1916+
let batch_idx = (packed >> 32) as usize;
1917+
let row_idx = (packed & 0xFFFFFFFF) as usize;
1918+
(batch_idx, row_idx)
1919+
})
1920+
.collect();
1921+
compute::interleave(&arrays, &il_indices)?
1922+
};
1923+
let arr_right = take(right_array.as_ref(), indices_right, None)?;
1924+
let eq_result = eq_dyn_null(&arr_left, &arr_right, null_equality)?;
1925+
// AND the result into our mutable bitmap.
1926+
// Null positions in eq_result are treated as "not equal".
1927+
let eq_values = eq_result.values();
1928+
let eq_nulls = eq_result.nulls();
1929+
for i in 0..num_rows {
1930+
if equal_bits.get_bit(i)
1931+
&& (!eq_values.value(i) || eq_nulls.is_some_and(|n| !n.is_valid(i)))
1932+
{
1933+
equal_bits.set_bit(i, false);
1934+
}
1935+
}
1936+
}
19151937
}
19161938

1939+
let equal = BooleanArray::new(equal_bits.finish(), None);
19171940
let filter_builder = FilterBuilder::new(&equal).optimize().build();
19181941

19191942
let left_filtered = filter_builder.filter(indices_left)?;
@@ -1925,6 +1948,139 @@ pub(super) fn equal_rows_arr(
19251948
))
19261949
}
19271950

1951+
/// Compare rows element-wise without materializing intermediate arrays.
1952+
/// Returns `true` if the comparison was handled, `false` if fallback is needed.
1953+
///
1954+
/// Clears bits in `equal_bits` where the left and right values at the
1955+
/// indexed positions are not equal (respecting `null_equality`).
1956+
fn compare_rows_elementwise(
1957+
equal_bits: &mut BooleanBufferBuilder,
1958+
left_indices: &[u64],
1959+
right_indices: &[u32],
1960+
left_array: &ArrayRef,
1961+
right_array: &ArrayRef,
1962+
null_equality: NullEquality,
1963+
left_index_to_row: impl Fn(u64) -> usize,
1964+
) -> bool {
1965+
// Nested types need special comparison logic, fall back
1966+
if left_array.data_type().is_nested() {
1967+
return false;
1968+
}
1969+
1970+
macro_rules! compare_elementwise {
1971+
($array_type:ty) => {{
1972+
let left = left_array.as_any().downcast_ref::<$array_type>().unwrap();
1973+
let right = right_array.as_any().downcast_ref::<$array_type>().unwrap();
1974+
do_compare_elementwise(
1975+
equal_bits,
1976+
left_indices,
1977+
right_indices,
1978+
&left,
1979+
&right,
1980+
null_equality,
1981+
&left_index_to_row,
1982+
);
1983+
}};
1984+
}
1985+
1986+
match left_array.data_type() {
1987+
DataType::Null => {
1988+
match null_equality {
1989+
NullEquality::NullEqualsNothing => {
1990+
// null != null, clear all bits
1991+
for i in 0..left_indices.len() {
1992+
equal_bits.set_bit(i, false);
1993+
}
1994+
}
1995+
NullEquality::NullEqualsNull => {} // null == null, keep bits
1996+
}
1997+
}
1998+
DataType::Boolean => compare_elementwise!(BooleanArray),
1999+
DataType::Int8 => compare_elementwise!(Int8Array),
2000+
DataType::Int16 => compare_elementwise!(Int16Array),
2001+
DataType::Int32 => compare_elementwise!(Int32Array),
2002+
DataType::Int64 => compare_elementwise!(Int64Array),
2003+
DataType::UInt8 => compare_elementwise!(UInt8Array),
2004+
DataType::UInt16 => compare_elementwise!(UInt16Array),
2005+
DataType::UInt32 => compare_elementwise!(UInt32Array),
2006+
DataType::UInt64 => compare_elementwise!(UInt64Array),
2007+
DataType::Float32 => compare_elementwise!(Float32Array),
2008+
DataType::Float64 => compare_elementwise!(Float64Array),
2009+
DataType::Binary => compare_elementwise!(BinaryArray),
2010+
DataType::BinaryView => compare_elementwise!(BinaryViewArray),
2011+
DataType::FixedSizeBinary(_) => compare_elementwise!(FixedSizeBinaryArray),
2012+
DataType::LargeBinary => compare_elementwise!(LargeBinaryArray),
2013+
DataType::Utf8 => compare_elementwise!(StringArray),
2014+
DataType::Utf8View => compare_elementwise!(StringViewArray),
2015+
DataType::LargeUtf8 => compare_elementwise!(LargeStringArray),
2016+
DataType::Decimal128(..) => compare_elementwise!(Decimal128Array),
2017+
DataType::Timestamp(time_unit, None) => match time_unit {
2018+
TimeUnit::Second => compare_elementwise!(TimestampSecondArray),
2019+
TimeUnit::Millisecond => compare_elementwise!(TimestampMillisecondArray),
2020+
TimeUnit::Microsecond => compare_elementwise!(TimestampMicrosecondArray),
2021+
TimeUnit::Nanosecond => compare_elementwise!(TimestampNanosecondArray),
2022+
},
2023+
DataType::Date32 => compare_elementwise!(Date32Array),
2024+
DataType::Date64 => compare_elementwise!(Date64Array),
2025+
_ => return false, // Unsupported type, use fallback
2026+
}
2027+
true
2028+
}
2029+
2030+
/// Inner loop for element-wise comparison. Generic over array type via `ArrayAccessor`.
2031+
/// Compares `left.value(left_index_to_row(left_indices[i]))` against
2032+
/// `right.value(right_indices[i])` for each row, clearing bits that don't match.
2033+
fn do_compare_elementwise<A: ArrayAccessor>(
2034+
equal_bits: &mut BooleanBufferBuilder,
2035+
left_indices: &[u64],
2036+
right_indices: &[u32],
2037+
left: &A,
2038+
right: &A,
2039+
null_equality: NullEquality,
2040+
left_index_to_row: &dyn Fn(u64) -> usize,
2041+
) where
2042+
A::Item: PartialEq,
2043+
{
2044+
let left_nulls = left.nulls();
2045+
let right_nulls = right.nulls();
2046+
let has_nulls = left.null_count() > 0 || right.null_count() > 0;
2047+
2048+
let num_rows = left_indices.len();
2049+
2050+
if !has_nulls {
2051+
// Fast path: no nulls, just compare values directly
2052+
for i in 0..num_rows {
2053+
if !equal_bits.get_bit(i) {
2054+
continue;
2055+
}
2056+
let l_idx = left_index_to_row(left_indices[i]);
2057+
let r_idx = right_indices[i] as usize;
2058+
if left.value(l_idx) != right.value(r_idx) {
2059+
equal_bits.set_bit(i, false);
2060+
}
2061+
}
2062+
} else {
2063+
for i in 0..num_rows {
2064+
if !equal_bits.get_bit(i) {
2065+
continue;
2066+
}
2067+
let l_idx = left_index_to_row(left_indices[i]);
2068+
let r_idx = right_indices[i] as usize;
2069+
let l_null = left_nulls.is_some_and(|nulls| !nulls.is_valid(l_idx));
2070+
let r_null = right_nulls.is_some_and(|nulls| !nulls.is_valid(r_idx));
2071+
2072+
let is_equal = match (l_null, r_null) {
2073+
(true, true) => null_equality == NullEquality::NullEqualsNull,
2074+
(true, false) | (false, true) => false,
2075+
(false, false) => left.value(l_idx) == right.value(r_idx),
2076+
};
2077+
if !is_equal {
2078+
equal_bits.set_bit(i, false);
2079+
}
2080+
}
2081+
}
2082+
}
2083+
19282084
// version of eq_dyn supporting equality on null arrays
19292085
fn eq_dyn_null(
19302086
left: &dyn Array,

0 commit comments

Comments
 (0)