Skip to content

Commit 333e4ac

Browse files
committed
groups_acc_more_robust_checks
1 parent 83f4e7f commit 333e4ac

1 file changed

Lines changed: 90 additions & 59 deletions

File tree

datafusion/functions-aggregate/benches/count_distinct.rs

Lines changed: 90 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
use std::sync::Arc;
1919

2020
use arrow::array::{
21-
ArrayRef, Int8Array, Int16Array, Int64Array, UInt8Array, UInt16Array,
21+
Array, ArrayRef, Int8Array, Int16Array, Int64Array, UInt8Array, UInt16Array,
2222
};
2323
use arrow::datatypes::{DataType, Field, Schema};
2424
use criterion::{Criterion, criterion_group, criterion_main};
@@ -87,13 +87,6 @@ fn create_i16_array(n_distinct: usize) -> Int16Array {
8787
.collect()
8888
}
8989

90-
fn create_group_indices(num_groups: usize) -> Vec<usize> {
91-
let mut rng = StdRng::seed_from_u64(42);
92-
(0..BATCH_SIZE)
93-
.map(|_| rng.random_range(0..num_groups))
94-
.collect()
95-
}
96-
9790
fn prepare_args(data_type: DataType) -> (Arc<Schema>, AccumulatorArgs<'static>) {
9891
let schema = Arc::new(Schema::new(vec![Field::new("f", data_type, true)]));
9992
let schema_leaked: &'static Schema = Box::leak(Box::new((*schema).clone()));
@@ -181,62 +174,100 @@ fn count_distinct_benchmark(c: &mut Criterion) {
181174
});
182175
}
183176

177+
/// Create `BATCH_SIZE` group indices drawn uniformly from `0..num_groups`, using a fixed RNG seed (42) so runs are reproducible
178+
fn create_uniform_groups(num_groups: usize) -> Vec<usize> {
179+
let mut rng = StdRng::seed_from_u64(42);
180+
(0..BATCH_SIZE)
181+
.map(|_| rng.random_range(0..num_groups))
182+
.collect()
183+
}
184+
185+
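/// Create group indices with a skewed distribution: 80% of rows are drawn from the first 20% of groups (at least one "hot" group), and the remaining 20% uniformly from all groups, so the hot groups receive slightly more than 80% of rows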
186+
fn create_skewed_groups(num_groups: usize) -> Vec<usize> {
187+
let mut rng = StdRng::seed_from_u64(42);
188+
let hot_groups = (num_groups / 5).max(1);
189+
(0..BATCH_SIZE)
190+
.map(|_| {
191+
if rng.random_range(0..100) < 80 {
192+
rng.random_range(0..hot_groups)
193+
} else {
194+
rng.random_range(0..num_groups)
195+
}
196+
})
197+
.collect()
198+
}
199+
184200
fn count_distinct_groups_benchmark(c: &mut Criterion) {
185201
let count_fn = Count::new();
186202

187-
for num_groups in [10, 100, 1000] {
188-
let n_distinct = BATCH_SIZE * 80 / 100;
189-
let values = Arc::new(create_i64_array(n_distinct)) as ArrayRef;
190-
let group_indices = create_group_indices(num_groups);
191-
192-
let (_schema, args) = prepare_args(DataType::Int64);
193-
194-
if count_fn.groups_accumulator_supported(args.clone()) {
195-
c.bench_function(
196-
&format!("count_distinct_groups i64 {num_groups} groups"),
197-
|b| {
198-
b.iter(|| {
199-
let (_schema, args) = prepare_args(DataType::Int64);
200-
let mut acc = count_fn.create_groups_accumulator(args).unwrap();
201-
acc.update_batch(
202-
std::slice::from_ref(&values),
203-
&group_indices,
204-
None,
205-
num_groups,
206-
)
207-
.unwrap();
208-
acc.evaluate(EmitTo::All).unwrap()
209-
})
210-
},
211-
);
212-
} else {
213-
c.bench_function(
214-
&format!("count_distinct_groups i64 {num_groups} groups"),
215-
|b| {
216-
b.iter(|| {
217-
let mut accumulators: Vec<_> = (0..num_groups)
218-
.map(|_| prepare_accumulator(DataType::Int64))
219-
.collect();
220-
221-
let arr = values.as_any().downcast_ref::<Int64Array>().unwrap();
222-
for (idx, group_idx) in group_indices.iter().enumerate() {
223-
if let Some(val) = arr.value(idx).into() {
224-
let single_val =
225-
Arc::new(Int64Array::from(vec![Some(val)]))
226-
as ArrayRef;
227-
accumulators[*group_idx]
228-
.update_batch(std::slice::from_ref(&single_val))
229-
.unwrap();
230-
}
203+
let group_counts = [100, 1000, 10000];
204+
let cardinalities = [("low", 20), ("mid", 80), ("high", 99)];
205+
let distributions = ["uniform", "skewed"];
206+
207+
for num_groups in group_counts {
208+
for (card_name, distinct_pct) in cardinalities {
209+
for dist in distributions {
210+
let name = format!("g{num_groups}_{card_name}_{dist}");
211+
let n_distinct = BATCH_SIZE * distinct_pct / 100;
212+
let values = Arc::new(create_i64_array(n_distinct)) as ArrayRef;
213+
let group_indices = if dist == "uniform" {
214+
create_uniform_groups(num_groups)
215+
} else {
216+
create_skewed_groups(num_groups)
217+
};
218+
219+
let (_schema, args) = prepare_args(DataType::Int64);
220+
221+
if count_fn.groups_accumulator_supported(args.clone()) {
222+
c.bench_function(&format!("count_distinct_groups {name}"), |b| {
223+
b.iter(|| {
224+
let mut acc =
225+
count_fn.create_groups_accumulator(args.clone()).unwrap();
226+
acc.update_batch(
227+
std::slice::from_ref(&values),
228+
&group_indices,
229+
None,
230+
num_groups,
231+
)
232+
.unwrap();
233+
acc.evaluate(EmitTo::All).unwrap()
234+
})
235+
});
236+
} else {
237+
let arr = values.as_any().downcast_ref::<Int64Array>().unwrap();
238+
let mut group_rows: Vec<Vec<i64>> = vec![Vec::new(); num_groups];
239+
for (idx, &group_idx) in group_indices.iter().enumerate() {
240+
if arr.is_valid(idx) {
241+
group_rows[group_idx].push(arr.value(idx));
231242
}
243+
}
244+
let group_arrays: Vec<ArrayRef> = group_rows
245+
.iter()
246+
.map(|rows| Arc::new(Int64Array::from(rows.clone())) as ArrayRef)
247+
.collect();
248+
249+
c.bench_function(&format!("count_distinct_groups {name}"), |b| {
250+
b.iter(|| {
251+
let mut accumulators: Vec<_> = (0..num_groups)
252+
.map(|_| prepare_accumulator(DataType::Int64))
253+
.collect();
254+
255+
for (group_idx, batch) in group_arrays.iter().enumerate() {
256+
if !batch.is_empty() {
257+
accumulators[group_idx]
258+
.update_batch(std::slice::from_ref(batch))
259+
.unwrap();
260+
}
261+
}
232262

233-
let _results: Vec<_> = accumulators
234-
.iter_mut()
235-
.map(|acc| acc.evaluate().unwrap())
236-
.collect();
237-
})
238-
},
239-
);
263+
let _results: Vec<_> = accumulators
264+
.iter_mut()
265+
.map(|acc| acc.evaluate().unwrap())
266+
.collect();
267+
})
268+
});
269+
}
270+
}
240271
}
241272
}
242273
}

0 commit comments

Comments
 (0)