Skip to content

Commit b5380c9

Browse files
committed
WIP
1 parent 64c0e47 commit b5380c9

3 files changed

Lines changed: 134 additions & 120 deletions

File tree

datafusion/physical-plan/src/joins/array_map.rs

Lines changed: 52 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -157,12 +157,17 @@ impl ArrayMap {
157157
max_val.wrapping_sub(min_val)
158158
}
159159

160-
/// Creates a new [`ArrayMap`] from the given array of join keys.
160+
/// Creates a new [`ArrayMap`] from per-batch arrays of join keys.
161161
///
162-
/// Note: This function processes only the non-null values in the input `array`,
162+
/// Note: This function processes only the non-null values in the input arrays,
163163
/// ignoring any rows where the key is `NULL`.
164164
///
165-
pub(crate) fn try_new(array: &ArrayRef, min_val: u64, max_val: u64) -> Result<Self> {
165+
pub(crate) fn try_new(
166+
arrays: &[&ArrayRef],
167+
total_num_rows: usize,
168+
min_val: u64,
169+
max_val: u64,
170+
) -> Result<Self> {
166171
let range = max_val.wrapping_sub(min_val);
167172
if range >= usize::MAX as u64 {
168173
return internal_err!("ArrayMap key range is too large to be allocated.");
@@ -173,10 +178,16 @@ impl ArrayMap {
173178
let mut next: Vec<u32> = vec![];
174179
let mut num_of_distinct_key = 0;
175180

181+
let data_type = arrays
182+
.first()
183+
.map(|a| a.data_type().clone())
184+
.unwrap_or(DataType::Int32);
185+
176186
downcast_supported_integer!(
177-
array.data_type() => (
178-
fill_data,
179-
array,
187+
&data_type => (
188+
fill_data_batched,
189+
arrays,
190+
total_num_rows,
180191
min_val,
181192
&mut data,
182193
&mut next,
@@ -192,8 +203,9 @@ impl ArrayMap {
192203
})
193204
}
194205

195-
fn fill_data<T: ArrowNumericType>(
196-
array: &ArrayRef,
206+
fn fill_data_batched<T: ArrowNumericType>(
207+
arrays: &[&ArrayRef],
208+
total_num_rows: usize,
197209
offset_val: u64,
198210
data: &mut [u32],
199211
next: &mut Vec<u32>,
@@ -202,25 +214,32 @@ impl ArrayMap {
202214
where
203215
T::Native: AsPrimitive<u64>,
204216
{
205-
let arr = array.as_primitive::<T>();
206217
// Iterate in reverse to maintain FIFO order when there are duplicate keys.
207-
for (i, val) in arr.iter().enumerate().rev() {
208-
if let Some(val) = val {
209-
let key: u64 = val.as_();
210-
let idx = key.wrapping_sub(offset_val) as usize;
211-
if idx >= data.len() {
212-
return internal_err!("failed build Array idx >= data.len()");
213-
}
214-
215-
if data[idx] != 0 {
216-
if next.is_empty() {
217-
*next = vec![0; array.len()]
218+
// We iterate batches in reverse, and within each batch iterate rows in reverse,
219+
// using a flat index that spans all batches.
220+
let mut flat_offset = total_num_rows;
221+
for array in arrays.iter().rev() {
222+
let arr = array.as_primitive::<T>();
223+
flat_offset -= arr.len();
224+
for (row_idx, val) in arr.iter().enumerate().rev() {
225+
if let Some(val) = val {
226+
let key: u64 = val.as_();
227+
let idx = key.wrapping_sub(offset_val) as usize;
228+
if idx >= data.len() {
229+
return internal_err!("failed build Array idx >= data.len()");
218230
}
219-
next[i] = data[idx]
220-
} else {
221-
*num_of_distinct_key += 1;
231+
let flat_idx = flat_offset + row_idx;
232+
233+
if data[idx] != 0 {
234+
if next.is_empty() {
235+
*next = vec![0; total_num_rows]
236+
}
237+
next[flat_idx] = data[idx]
238+
} else {
239+
*num_of_distinct_key += 1;
240+
}
241+
data[idx] = flat_idx as u32 + 1;
222242
}
223-
data[idx] = (i) as u32 + 1;
224243
}
225244
}
226245
Ok(())
@@ -419,7 +438,7 @@ mod tests {
419438
#[test]
420439
fn test_array_map_limit_offset_duplicate_elements() -> Result<()> {
421440
let build: ArrayRef = Arc::new(Int32Array::from(vec![1, 1, 2]));
422-
let map = ArrayMap::try_new(&build, 1, 2)?;
441+
let map = ArrayMap::try_new(&[&build], build.len(), 1, 2)?;
423442
let probe = [Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef];
424443

425444
let mut prob_idx = Vec::new();
@@ -450,7 +469,7 @@ mod tests {
450469
#[test]
451470
fn test_array_map_with_limit_and_misses() -> Result<()> {
452471
let build: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
453-
let map = ArrayMap::try_new(&build, 1, 2)?;
472+
let map = ArrayMap::try_new(&[&build], build.len(), 1, 2)?;
454473
let probe = [Arc::new(Int32Array::from(vec![10, 1, 2])) as ArrayRef];
455474

456475
let (mut p_idx, mut b_idx) = (vec![], vec![]);
@@ -483,7 +502,7 @@ mod tests {
483502
#[test]
484503
fn test_array_map_with_build_duplicates_and_misses() -> Result<()> {
485504
let build_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 1]));
486-
let array_map = ArrayMap::try_new(&build_array, 1, 1)?;
505+
let array_map = ArrayMap::try_new(&[&build_array], build_array.len(), 1, 1)?;
487506
// prob: 10(m), 1(h1, h2), 20(m), 1(h1, h2)
488507
let probe_array: ArrayRef = Arc::new(Int32Array::from(vec![10, 1, 20, 1]));
489508
let prob_side_keys = [probe_array];
@@ -513,7 +532,12 @@ mod tests {
513532
let min_val = -5_i128;
514533
let max_val = 10_i128;
515534

516-
let array_map = ArrayMap::try_new(&build_array, min_val as u64, max_val as u64)?;
535+
let array_map = ArrayMap::try_new(
536+
&[&build_array],
537+
build_array.len(),
538+
min_val as u64,
539+
max_val as u64,
540+
)?;
517541

518542
// Probe array
519543
let probe_array: ArrayRef = Arc::new(Int64Array::from(vec![0, -5, 10, -1]));

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 21 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ use crate::{
6565
metrics::{ExecutionPlanMetricsSet, MetricsSet},
6666
};
6767

68-
use arrow::array::{Array, ArrayRef, BooleanBufferBuilder, UInt64Array, new_null_array};
68+
use arrow::array::{ArrayRef, BooleanBufferBuilder, UInt64Array};
6969
use arrow::datatypes::SchemaRef;
7070
use arrow::record_batch::RecordBatch;
7171
use arrow::util::bit_util;
@@ -112,7 +112,7 @@ fn try_create_array_map(
112112
perfect_hash_join_small_build_threshold: usize,
113113
perfect_hash_join_min_key_density: f64,
114114
null_equality: NullEquality,
115-
) -> Result<Option<(ArrayMap, Vec<ArrayRef>)>> {
115+
) -> Result<Option<ArrayMap>> {
116116
if on_left.len() != 1 {
117117
return Ok(None);
118118
}
@@ -177,32 +177,19 @@ fn try_create_array_map(
177177
let mem_size = ArrayMap::estimate_memory_size(min_val, max_val, num_row);
178178
reservation.try_grow(mem_size)?;
179179

180-
// Evaluate key expressions per-batch and concatenate for ArrayMap construction
181-
let per_batch_keys: Vec<Vec<ArrayRef>> = batches
180+
// Evaluate key expressions per-batch (no concatenation needed)
181+
let per_batch_keys: Vec<ArrayRef> = batches
182182
.iter()
183-
.map(|batch| evaluate_expressions_to_arrays(on_left, batch))
183+
.map(|batch| {
184+
let arrays = evaluate_expressions_to_arrays(on_left, batch)?;
185+
Ok(arrays.into_iter().next().unwrap())
186+
})
184187
.collect::<Result<Vec<_>>>()?;
185-
let left_values: Vec<ArrayRef> = if per_batch_keys.is_empty() || on_left.is_empty() {
186-
on_left
187-
.iter()
188-
.map(|_| new_null_array(&DataType::Null, 0))
189-
.collect()
190-
} else {
191-
let num_keys = on_left.len();
192-
(0..num_keys)
193-
.map(|key_idx| {
194-
let arrays: Vec<&dyn Array> = per_batch_keys
195-
.iter()
196-
.map(|keys| keys[key_idx].as_ref())
197-
.collect();
198-
Ok(arrow::compute::concat(&arrays)?)
199-
})
200-
.collect::<Result<Vec<_>>>()?
201-
};
188+
let key_refs: Vec<&ArrayRef> = per_batch_keys.iter().collect();
202189

203-
let array_map = ArrayMap::try_new(&left_values[0], min_val, max_val)?;
190+
let array_map = ArrayMap::try_new(&key_refs, num_row, min_val, max_val)?;
204191

205-
Ok(Some((array_map, left_values)))
192+
Ok(Some(array_map))
206193
}
207194

208195
/// Convert flat indices (used in the visited bitmap) to packed composite indices
@@ -2026,7 +2013,7 @@ async fn collect_left_input(
20262013
_ => None,
20272014
};
20282015

2029-
let (join_hash_map, should_reverse_batches) = if let Some((array_map, _left_value)) =
2016+
let (join_hash_map, should_reverse_batches) = if let Some(array_map) =
20302017
try_create_array_map(
20312018
&bounds,
20322019
&schema,
@@ -2152,38 +2139,24 @@ async fn collect_left_input(
21522139
let membership = if num_rows == 0 {
21532140
PushdownStrategy::Empty
21542141
} else {
2155-
// For membership testing, we need concatenated key columns.
2156-
// Concat from the per-batch values.
2157-
let concat_values: Vec<ArrayRef> =
2158-
if values_per_batch.is_empty() || values_per_batch[0].is_empty() {
2159-
vec![]
2160-
} else {
2161-
let num_keys = values_per_batch[0].len();
2162-
(0..num_keys)
2163-
.map(|key_idx| {
2164-
let arrays: Vec<&dyn Array> = values_per_batch
2165-
.iter()
2166-
.map(|keys| keys[key_idx].as_ref())
2167-
.collect();
2168-
Ok(arrow::compute::concat(&arrays)?)
2169-
})
2170-
.collect::<Result<Vec<_>>>()?
2171-
};
2172-
2173-
let estimated_size = concat_values
2142+
// Estimate total size from per-batch values (avoid concatenation for size check)
2143+
let estimated_size: usize = values_per_batch
21742144
.iter()
2145+
.flat_map(|keys| keys.iter())
21752146
.map(|arr| arr.get_array_memory_size())
2176-
.sum::<usize>();
2177-
if concat_values.is_empty()
2178-
|| concat_values[0].is_empty()
2147+
.sum();
2148+
if values_per_batch.is_empty()
2149+
|| values_per_batch[0].is_empty()
21792150
|| estimated_size > config.optimizer.hash_join_inlist_pushdown_max_size
21802151
|| map.num_of_distinct_key()
21812152
> config
21822153
.optimizer
21832154
.hash_join_inlist_pushdown_max_distinct_values
21842155
{
21852156
PushdownStrategy::Map(Arc::clone(&map))
2186-
} else if let Some(in_list_values) = build_struct_inlist_values(&concat_values)? {
2157+
} else if let Some(in_list_values) =
2158+
build_struct_inlist_values(&values_per_batch)?
2159+
{
21872160
PushdownStrategy::InList(in_list_values)
21882161
} else {
21892162
PushdownStrategy::Map(Arc::clone(&map))

0 commit comments

Comments (0)