Skip to content

Commit 837b382

Browse files
committed
Updated the normalization fn to use a hashmap cache instead of doing an n^2 loop
1 parent f7e84cb commit 837b382

1 file changed

Lines changed: 57 additions & 1 deletion

File tree

  • datafusion/physical-plan/src/aggregates/group_values/single_group_by

datafusion/physical-plan/src/aggregates/group_values/single_group_by/dictionary.rs

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,8 @@ impl<K: ArrowDictionaryKeyType + Send> GroupValuesDictionary<K> {
203203
))),
204204
}
205205
}
206-
fn normalize_dict_array(
206+
#[inline]
207+
fn normalize_dict_linear(
207208
values: &ArrayRef,
208209
key_array: &PrimitiveArray<K>,
209210
) -> (ArrayRef, Vec<Option<usize>>) {
@@ -249,6 +250,61 @@ impl<K: ArrowDictionaryKeyType + Send> GroupValuesDictionary<K> {
249250

250251
(new_values, new_keys)
251252
}
253+
#[inline]
254+
pub fn normalize_dict_hash(
255+
values: &ArrayRef,
256+
key_array: &PrimitiveArray<K>,
257+
) -> (ArrayRef, Vec<Option<usize>>) {
258+
let mut key_to_canonical: Vec<Option<usize>> = vec![None; values.len()];
259+
let mut bytes_to_canonical: HashMap<Vec<u8>, usize> = HashMap::new();
260+
let mut canonical_indices: Vec<usize> = Vec::new();
261+
262+
let new_keys: Vec<Option<usize>> = (0..key_array.len())
263+
.map(|i| {
264+
if key_array.is_null(i) {
265+
return None;
266+
}
267+
let old_key = key_array.value(i).to_usize().unwrap();
268+
if let Some(canonical) = key_to_canonical[old_key] {
269+
return Some(canonical);
270+
}
271+
let raw = Self::get_raw_bytes(values, old_key).to_vec();
272+
let canonical = match bytes_to_canonical.get(&raw) {
273+
Some(&idx) => idx,
274+
None => {
275+
let idx = canonical_indices.len();
276+
canonical_indices.push(old_key);
277+
bytes_to_canonical.insert(raw, idx);
278+
idx
279+
}
280+
};
281+
key_to_canonical[old_key] = Some(canonical);
282+
Some(canonical)
283+
})
284+
.collect();
285+
286+
let indices = UInt64Array::from(
287+
canonical_indices
288+
.iter()
289+
.map(|&i| i as u64)
290+
.collect::<Vec<_>>(),
291+
);
292+
let new_values = arrow::compute::take(values.as_ref(), &indices, None).unwrap();
293+
(new_values, new_keys)
294+
}
295+
#[inline]
296+
fn normalize_dict_array(
297+
values: &ArrayRef,
298+
key_array: &PrimitiveArray<K>,
299+
) -> (ArrayRef, Vec<Option<usize>>) {
300+
if values.len() < 20 {
301+
//benchmarks show that for small arrays a nested loops works better due to cache locality and lower overhead,
302+
Self::normalize_dict_linear(values, key_array)
303+
} else {
304+
//for larger arrays the hash-based approach is much faster due to avoiding redundant comparisons
305+
Self::normalize_dict_hash(values, key_array)
306+
}
307+
}
252308
}
253309

254310
impl<K: ArrowDictionaryKeyType + Send> GroupValues for GroupValuesDictionary<K> {

0 commit comments

Comments
 (0)