Skip to content

Commit bacee66

Browse files
committed
add count distinct group benchmarks
1 parent 16e578d commit bacee66

1 file changed

Lines changed: 113 additions & 2 deletions

File tree

datafusion/functions-aggregate/benches/count_distinct.rs

Lines changed: 113 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use arrow::array::{
2323
use arrow::datatypes::{DataType, Field, Schema};
2424
use criterion::{Criterion, criterion_group, criterion_main};
2525
use datafusion_expr::function::AccumulatorArgs;
26-
use datafusion_expr::{Accumulator, AggregateUDFImpl};
26+
use datafusion_expr::{Accumulator, AggregateUDFImpl, EmitTo};
2727
use datafusion_functions_aggregate::count::Count;
2828
use datafusion_physical_expr::expressions::col;
2929
use rand::rngs::StdRng;
@@ -87,6 +87,30 @@ fn create_i16_array(n_distinct: usize) -> Int16Array {
8787
.collect()
8888
}
8989

90+
fn prepare_args(data_type: DataType) -> (Arc<Schema>, AccumulatorArgs<'static>) {
91+
let schema = Arc::new(Schema::new(vec![Field::new("f", data_type, true)]));
92+
let schema_leaked: &'static Schema = Box::leak(Box::new((*schema).clone()));
93+
let expr = col("f", schema_leaked).unwrap();
94+
let expr_leaked: &'static _ = Box::leak(Box::new(expr));
95+
let return_field: Arc<Field> = Field::new("f", DataType::Int64, true).into();
96+
let return_field_leaked: &'static _ = Box::leak(Box::new(return_field.clone()));
97+
let expr_field = expr_leaked.return_field(schema_leaked).unwrap();
98+
let expr_field_leaked: &'static _ = Box::leak(Box::new(expr_field));
99+
100+
let accumulator_args = AccumulatorArgs {
101+
return_field: return_field_leaked.clone(),
102+
schema: schema_leaked,
103+
expr_fields: std::slice::from_ref(expr_field_leaked),
104+
ignore_nulls: false,
105+
order_bys: &[],
106+
is_reversed: false,
107+
name: "count(distinct f)",
108+
is_distinct: true,
109+
exprs: std::slice::from_ref(expr_leaked),
110+
};
111+
(schema, accumulator_args)
112+
}
113+
90114
fn count_distinct_benchmark(c: &mut Criterion) {
91115
for pct in [80, 99] {
92116
let n_distinct = BATCH_SIZE * pct / 100;
@@ -150,5 +174,92 @@ fn count_distinct_benchmark(c: &mut Criterion) {
150174
});
151175
}
152176

153-
criterion_group!(benches, count_distinct_benchmark);
177+
/// Create group indices with uniform distribution
178+
fn create_uniform_groups(num_groups: usize) -> Vec<usize> {
179+
let mut rng = StdRng::seed_from_u64(42);
180+
(0..BATCH_SIZE)
181+
.map(|_| rng.random_range(0..num_groups))
182+
.collect()
183+
}
184+
185+
/// Create group indices with skewed distribution (80% in 20% of groups)
186+
fn create_skewed_groups(num_groups: usize) -> Vec<usize> {
187+
let mut rng = StdRng::seed_from_u64(42);
188+
let hot_groups = (num_groups / 5).max(1);
189+
(0..BATCH_SIZE)
190+
.map(|_| {
191+
if rng.random_range(0..100) < 80 {
192+
rng.random_range(0..hot_groups)
193+
} else {
194+
rng.random_range(0..num_groups)
195+
}
196+
})
197+
.collect()
198+
}
199+
200+
fn count_distinct_groups_benchmark(c: &mut Criterion) {
201+
let count_fn = Count::new();
202+
203+
// bench different scenarios
204+
let scenarios = [
205+
// (name, num_groups, distinct_pct, group_fn)
206+
("sparse_uniform", 10, 80, "uniform"),
207+
("moderate_uniform", 100, 80, "uniform"),
208+
("dense_uniform", 1000, 80, "uniform"),
209+
("sparse_skewed", 10, 80, "skewed"),
210+
("dense_skewed", 1000, 80, "skewed"),
211+
("sparse_high_cardinality", 10, 99, "uniform"),
212+
("dense_low_cardinality", 1000, 20, "uniform"),
213+
];
214+
215+
for (name, num_groups, distinct_pct, group_type) in scenarios {
216+
let n_distinct = BATCH_SIZE * distinct_pct / 100;
217+
let values = Arc::new(create_i64_array(n_distinct)) as ArrayRef;
218+
let group_indices = if group_type == "uniform" {
219+
create_uniform_groups(num_groups)
220+
} else {
221+
create_skewed_groups(num_groups)
222+
};
223+
224+
let (_schema, args) = prepare_args(DataType::Int64);
225+
226+
if count_fn.groups_accumulator_supported(args.clone()) {
227+
c.bench_function(&format!("count_distinct_groups {name}"), |b| {
228+
b.iter(|| {
229+
let (_schema, args) = prepare_args(DataType::Int64);
230+
let mut acc = count_fn.create_groups_accumulator(args).unwrap();
231+
acc.update_batch(&[values.clone()], &group_indices, None, num_groups)
232+
.unwrap();
233+
acc.evaluate(EmitTo::All).unwrap()
234+
})
235+
});
236+
} else {
237+
c.bench_function(&format!("count_distinct_groups {name}"), |b| {
238+
b.iter(|| {
239+
let mut accumulators: Vec<_> = (0..num_groups)
240+
.map(|_| prepare_accumulator(DataType::Int64))
241+
.collect();
242+
243+
let arr = values.as_any().downcast_ref::<Int64Array>().unwrap();
244+
for (idx, group_idx) in group_indices.iter().enumerate() {
245+
if let Some(val) = arr.value(idx).into() {
246+
let single_val =
247+
Arc::new(Int64Array::from(vec![Some(val)])) as ArrayRef;
248+
accumulators[*group_idx]
249+
.update_batch(std::slice::from_ref(&single_val))
250+
.unwrap();
251+
}
252+
}
253+
254+
let _results: Vec<_> = accumulators
255+
.iter_mut()
256+
.map(|acc| acc.evaluate().unwrap())
257+
.collect();
258+
})
259+
});
260+
}
261+
}
262+
}
263+
264+
criterion_group!(benches, count_distinct_benchmark, count_distinct_groups_benchmark);
154265
criterion_main!(benches);

0 commit comments

Comments
 (0)