Skip to content

Commit 5ccd3ff

Browse files
committed
add count distinct group benchmarks
1 parent 90a8117 commit 5ccd3ff

1 file changed

Lines changed: 53 additions & 247 deletions

File tree

datafusion/functions-aggregate/benches/count_distinct.rs

Lines changed: 53 additions & 247 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@
1818
use std::sync::Arc;
1919

2020
use arrow::array::{
21-
Array, ArrayRef, Int8Array, Int16Array, Int32Array, Int64Array, UInt8Array,
22-
UInt16Array, UInt32Array,
21+
ArrayRef, Int8Array, Int16Array, Int64Array, UInt8Array, UInt16Array,
2322
};
2423
use arrow::datatypes::{DataType, Field, Schema};
2524
use criterion::{Criterion, criterion_group, criterion_main};
@@ -88,20 +87,6 @@ fn create_i16_array(n_distinct: usize) -> Int16Array {
8887
.collect()
8988
}
9089

91-
fn create_u32_array(n_distinct: usize) -> UInt32Array {
92-
let mut rng = StdRng::seed_from_u64(42);
93-
(0..BATCH_SIZE)
94-
.map(|_| Some(rng.random_range(0..n_distinct as u32)))
95-
.collect()
96-
}
97-
98-
fn create_i32_array(n_distinct: usize) -> Int32Array {
99-
let mut rng = StdRng::seed_from_u64(42);
100-
(0..BATCH_SIZE)
101-
.map(|_| Some(rng.random_range(0..n_distinct as i32)))
102-
.collect()
103-
}
104-
10590
fn prepare_args(data_type: DataType) -> (Arc<Schema>, AccumulatorArgs<'static>) {
10691
let schema = Arc::new(Schema::new(vec![Field::new("f", data_type, true)]));
10792
let schema_leaked: &'static Schema = Box::leak(Box::new((*schema).clone()));
@@ -187,33 +172,6 @@ fn count_distinct_benchmark(c: &mut Criterion) {
187172
.unwrap()
188173
})
189174
});
190-
191-
// 32-bit integer types
192-
for pct in [80, 99] {
193-
let n_distinct = BATCH_SIZE * pct / 100;
194-
195-
// UInt32
196-
let values = Arc::new(create_u32_array(n_distinct)) as ArrayRef;
197-
c.bench_function(&format!("count_distinct u32 {pct}% distinct"), |b| {
198-
b.iter(|| {
199-
let mut accumulator = prepare_accumulator(DataType::UInt32);
200-
accumulator
201-
.update_batch(std::slice::from_ref(&values))
202-
.unwrap()
203-
})
204-
});
205-
206-
// Int32
207-
let values = Arc::new(create_i32_array(n_distinct)) as ArrayRef;
208-
c.bench_function(&format!("count_distinct i32 {pct}% distinct"), |b| {
209-
b.iter(|| {
210-
let mut accumulator = prepare_accumulator(DataType::Int32);
211-
accumulator
212-
.update_batch(std::slice::from_ref(&values))
213-
.unwrap()
214-
})
215-
});
216-
}
217175
}
218176

219177
/// Create group indices with uniform distribution
@@ -242,218 +200,66 @@ fn create_skewed_groups(num_groups: usize) -> Vec<usize> {
242200
fn count_distinct_groups_benchmark(c: &mut Criterion) {
243201
let count_fn = Count::new();
244202

245-
let group_counts = [100, 1000, 10000];
246-
let cardinalities = [("low", 20), ("mid", 80), ("high", 99)];
247-
let distributions = ["uniform", "skewed"];
248-
249-
// i64 benchmarks
250-
for num_groups in group_counts {
251-
for (card_name, distinct_pct) in cardinalities {
252-
for dist in distributions {
253-
let name = format!("i64_g{num_groups}_{card_name}_{dist}");
254-
let n_distinct = BATCH_SIZE * distinct_pct / 100;
255-
let values = Arc::new(create_i64_array(n_distinct)) as ArrayRef;
256-
let group_indices = if dist == "uniform" {
257-
create_uniform_groups(num_groups)
258-
} else {
259-
create_skewed_groups(num_groups)
260-
};
261-
262-
let (_schema, args) = prepare_args(DataType::Int64);
203+
// Benchmark a fixed set of scenarios that vary group count, distinct percentage, and group distribution
204+
let scenarios = [
205+
// (name, num_groups, distinct_pct, group_type) where group_type is "uniform" or "skewed"
206+
("sparse_uniform", 10, 80, "uniform"),
207+
("moderate_uniform", 100, 80, "uniform"),
208+
("dense_uniform", 1000, 80, "uniform"),
209+
("sparse_skewed", 10, 80, "skewed"),
210+
("dense_skewed", 1000, 80, "skewed"),
211+
("sparse_high_cardinality", 10, 99, "uniform"),
212+
("dense_low_cardinality", 1000, 20, "uniform"),
213+
];
214+
215+
for (name, num_groups, distinct_pct, group_type) in scenarios {
216+
let n_distinct = BATCH_SIZE * distinct_pct / 100;
217+
let values = Arc::new(create_i64_array(n_distinct)) as ArrayRef;
218+
let group_indices = if group_type == "uniform" {
219+
create_uniform_groups(num_groups)
220+
} else {
221+
create_skewed_groups(num_groups)
222+
};
223+
224+
let (_schema, args) = prepare_args(DataType::Int64);
225+
226+
if count_fn.groups_accumulator_supported(args.clone()) {
227+
c.bench_function(&format!("count_distinct_groups {name}"), |b| {
228+
b.iter(|| {
229+
let (_schema, args) = prepare_args(DataType::Int64);
230+
let mut acc = count_fn.create_groups_accumulator(args).unwrap();
231+
acc.update_batch(&[values.clone()], &group_indices, None, num_groups)
232+
.unwrap();
233+
acc.evaluate(EmitTo::All).unwrap()
234+
})
235+
});
236+
} else {
237+
c.bench_function(&format!("count_distinct_groups {name}"), |b| {
238+
b.iter(|| {
239+
let mut accumulators: Vec<_> = (0..num_groups)
240+
.map(|_| prepare_accumulator(DataType::Int64))
241+
.collect();
263242

264-
if count_fn.groups_accumulator_supported(args.clone()) {
265-
c.bench_function(&format!("count_distinct_groups {name}"), |b| {
266-
b.iter(|| {
267-
let mut acc =
268-
count_fn.create_groups_accumulator(args.clone()).unwrap();
269-
acc.update_batch(
270-
std::slice::from_ref(&values),
271-
&group_indices,
272-
None,
273-
num_groups,
274-
)
275-
.unwrap();
276-
acc.evaluate(EmitTo::All).unwrap()
277-
})
278-
});
279-
} else {
280243
let arr = values.as_any().downcast_ref::<Int64Array>().unwrap();
281-
let mut group_rows: Vec<Vec<i64>> = vec![Vec::new(); num_groups];
282-
for (idx, &group_idx) in group_indices.iter().enumerate() {
283-
if arr.is_valid(idx) {
284-
group_rows[group_idx].push(arr.value(idx));
244+
for (idx, group_idx) in group_indices.iter().enumerate() {
245+
if let Some(val) = arr.value(idx).into() {
246+
let single_val =
247+
Arc::new(Int64Array::from(vec![Some(val)])) as ArrayRef;
248+
accumulators[*group_idx]
249+
.update_batch(std::slice::from_ref(&single_val))
250+
.unwrap();
285251
}
286252
}
287-
let group_arrays: Vec<ArrayRef> = group_rows
288-
.iter()
289-
.map(|rows| Arc::new(Int64Array::from(rows.clone())) as ArrayRef)
290-
.collect();
291-
292-
c.bench_function(&format!("count_distinct_groups {name}"), |b| {
293-
b.iter(|| {
294-
let mut accumulators: Vec<_> = (0..num_groups)
295-
.map(|_| prepare_accumulator(DataType::Int64))
296-
.collect();
297253

298-
for (group_idx, batch) in group_arrays.iter().enumerate() {
299-
if !batch.is_empty() {
300-
accumulators[group_idx]
301-
.update_batch(std::slice::from_ref(batch))
302-
.unwrap();
303-
}
304-
}
305-
306-
let _results: Vec<_> = accumulators
307-
.iter_mut()
308-
.map(|acc| acc.evaluate().unwrap())
309-
.collect();
310-
})
311-
});
312-
}
313-
}
314-
}
315-
}
316-
317-
// i32 benchmarks
318-
for num_groups in group_counts {
319-
for (card_name, distinct_pct) in cardinalities {
320-
for dist in distributions {
321-
let name = format!("i32_g{num_groups}_{card_name}_{dist}");
322-
let n_distinct = BATCH_SIZE * distinct_pct / 100;
323-
let values = Arc::new(create_i32_array(n_distinct)) as ArrayRef;
324-
let group_indices = if dist == "uniform" {
325-
create_uniform_groups(num_groups)
326-
} else {
327-
create_skewed_groups(num_groups)
328-
};
329-
330-
let (_schema, args) = prepare_args(DataType::Int32);
331-
332-
if count_fn.groups_accumulator_supported(args.clone()) {
333-
c.bench_function(&format!("count_distinct_groups {name}"), |b| {
334-
b.iter(|| {
335-
let mut acc =
336-
count_fn.create_groups_accumulator(args.clone()).unwrap();
337-
acc.update_batch(
338-
std::slice::from_ref(&values),
339-
&group_indices,
340-
None,
341-
num_groups,
342-
)
343-
.unwrap();
344-
acc.evaluate(EmitTo::All).unwrap()
345-
})
346-
});
347-
} else {
348-
let arr = values.as_any().downcast_ref::<Int32Array>().unwrap();
349-
let mut group_rows: Vec<Vec<i32>> = vec![Vec::new(); num_groups];
350-
for (idx, &group_idx) in group_indices.iter().enumerate() {
351-
if arr.is_valid(idx) {
352-
group_rows[group_idx].push(arr.value(idx));
353-
}
354-
}
355-
let group_arrays: Vec<ArrayRef> = group_rows
356-
.iter()
357-
.map(|rows| Arc::new(Int32Array::from(rows.clone())) as ArrayRef)
254+
let _results: Vec<_> = accumulators
255+
.iter_mut()
256+
.map(|acc| acc.evaluate().unwrap())
358257
.collect();
359-
360-
c.bench_function(&format!("count_distinct_groups {name}"), |b| {
361-
b.iter(|| {
362-
let mut accumulators: Vec<_> = (0..num_groups)
363-
.map(|_| prepare_accumulator(DataType::Int32))
364-
.collect();
365-
366-
for (group_idx, batch) in group_arrays.iter().enumerate() {
367-
if !batch.is_empty() {
368-
accumulators[group_idx]
369-
.update_batch(std::slice::from_ref(batch))
370-
.unwrap();
371-
}
372-
}
373-
374-
let _results: Vec<_> = accumulators
375-
.iter_mut()
376-
.map(|acc| acc.evaluate().unwrap())
377-
.collect();
378-
})
379-
});
380-
}
381-
}
382-
}
383-
}
384-
385-
// u32 benchmarks
386-
for num_groups in group_counts {
387-
for (card_name, distinct_pct) in cardinalities {
388-
for dist in distributions {
389-
let name = format!("u32_g{num_groups}_{card_name}_{dist}");
390-
let n_distinct = BATCH_SIZE * distinct_pct / 100;
391-
let values = Arc::new(create_u32_array(n_distinct)) as ArrayRef;
392-
let group_indices = if dist == "uniform" {
393-
create_uniform_groups(num_groups)
394-
} else {
395-
create_skewed_groups(num_groups)
396-
};
397-
398-
let (_schema, args) = prepare_args(DataType::UInt32);
399-
400-
if count_fn.groups_accumulator_supported(args.clone()) {
401-
c.bench_function(&format!("count_distinct_groups {name}"), |b| {
402-
b.iter(|| {
403-
let mut acc =
404-
count_fn.create_groups_accumulator(args.clone()).unwrap();
405-
acc.update_batch(
406-
std::slice::from_ref(&values),
407-
&group_indices,
408-
None,
409-
num_groups,
410-
)
411-
.unwrap();
412-
acc.evaluate(EmitTo::All).unwrap()
413-
})
414-
});
415-
} else {
416-
let arr = values.as_any().downcast_ref::<UInt32Array>().unwrap();
417-
let mut group_rows: Vec<Vec<u32>> = vec![Vec::new(); num_groups];
418-
for (idx, &group_idx) in group_indices.iter().enumerate() {
419-
if arr.is_valid(idx) {
420-
group_rows[group_idx].push(arr.value(idx));
421-
}
422-
}
423-
let group_arrays: Vec<ArrayRef> = group_rows
424-
.iter()
425-
.map(|rows| Arc::new(UInt32Array::from(rows.clone())) as ArrayRef)
426-
.collect();
427-
428-
c.bench_function(&format!("count_distinct_groups {name}"), |b| {
429-
b.iter(|| {
430-
let mut accumulators: Vec<_> = (0..num_groups)
431-
.map(|_| prepare_accumulator(DataType::UInt32))
432-
.collect();
433-
434-
for (group_idx, batch) in group_arrays.iter().enumerate() {
435-
if !batch.is_empty() {
436-
accumulators[group_idx]
437-
.update_batch(std::slice::from_ref(batch))
438-
.unwrap();
439-
}
440-
}
441-
442-
let _results: Vec<_> = accumulators
443-
.iter_mut()
444-
.map(|acc| acc.evaluate().unwrap())
445-
.collect();
446-
})
447-
});
448-
}
449-
}
258+
})
259+
});
450260
}
451261
}
452262
}
453263

454-
criterion_group!(
455-
benches,
456-
count_distinct_benchmark,
457-
count_distinct_groups_benchmark
458-
);
264+
criterion_group!(benches, count_distinct_benchmark, count_distinct_groups_benchmark);
459265
criterion_main!(benches);

0 commit comments

Comments
 (0)