Skip to content

Commit f982d8d

Browse files
committed
add count distinct group benchmarks
Commit message body (individual messages were run together; separated here):
- add count distinct group benchmarks (×3)
- count group benchmark check (×2)
- init
- implement_group_accumulators_count_distinct (×4)
- implement_group_accumulators_count_distinct_use_hashtable (×2)
- add group benches
- Use same benchmark names for comparison
- count group benchmark check (×2)
1 parent a9cdec8 commit f982d8d

4 files changed

Lines changed: 295 additions & 92 deletions

File tree

datafusion/functions-aggregate-common/src/aggregate/count_distinct.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@
1717

1818
mod bytes;
1919
mod dict;
20+
mod groups;
2021
mod native;
2122

2223
pub use bytes::BytesDistinctCountAccumulator;
2324
pub use bytes::BytesViewDistinctCountAccumulator;
2425
pub use dict::DictionaryCountAccumulator;
26+
pub use groups::PrimitiveDistinctCountGroupsAccumulator;
2527
pub use native::FloatDistinctCountAccumulator;
2628
pub use native::PrimitiveDistinctCountAccumulator;
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow::array::{
19+
ArrayRef, AsArray, BooleanArray, Int64Array, ListArray, PrimitiveArray,
20+
};
21+
use arrow::buffer::OffsetBuffer;
22+
use arrow::datatypes::{ArrowPrimitiveType, Field};
23+
use datafusion_common::HashSet;
24+
use datafusion_common::hash_utils::RandomState;
25+
use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator};
26+
use std::hash::Hash;
27+
use std::mem::size_of;
28+
use std::sync::Arc;
29+
30+
use crate::aggregate::groups_accumulator::accumulate::accumulate;
31+
32+
pub struct PrimitiveDistinctCountGroupsAccumulator<T: ArrowPrimitiveType>
33+
where
34+
T::Native: Eq + Hash,
35+
{
36+
seen: HashSet<(usize, T::Native), RandomState>,
37+
num_groups: usize,
38+
}
39+
40+
impl<T: ArrowPrimitiveType> PrimitiveDistinctCountGroupsAccumulator<T>
41+
where
42+
T::Native: Eq + Hash,
43+
{
44+
pub fn new() -> Self {
45+
Self {
46+
seen: HashSet::default(),
47+
num_groups: 0,
48+
}
49+
}
50+
}
51+
52+
impl<T: ArrowPrimitiveType> Default for PrimitiveDistinctCountGroupsAccumulator<T>
53+
where
54+
T::Native: Eq + Hash,
55+
{
56+
fn default() -> Self {
57+
Self::new()
58+
}
59+
}
60+
61+
impl<T: ArrowPrimitiveType + Send + std::fmt::Debug> GroupsAccumulator
62+
for PrimitiveDistinctCountGroupsAccumulator<T>
63+
where
64+
T::Native: Eq + Hash,
65+
{
66+
fn update_batch(
67+
&mut self,
68+
values: &[ArrayRef],
69+
group_indices: &[usize],
70+
opt_filter: Option<&BooleanArray>,
71+
total_num_groups: usize,
72+
) -> datafusion_common::Result<()> {
73+
debug_assert_eq!(values.len(), 1);
74+
self.num_groups = self.num_groups.max(total_num_groups);
75+
let arr = values[0].as_primitive::<T>();
76+
accumulate(group_indices, arr, opt_filter, |group_idx, value| {
77+
self.seen.insert((group_idx, value));
78+
});
79+
Ok(())
80+
}
81+
82+
fn evaluate(&mut self, emit_to: EmitTo) -> datafusion_common::Result<ArrayRef> {
83+
let num_emitted = match emit_to {
84+
EmitTo::All => self.num_groups,
85+
EmitTo::First(n) => n,
86+
};
87+
88+
let mut counts = vec![0i64; num_emitted];
89+
90+
if matches!(emit_to, EmitTo::All) {
91+
for &(group_idx, _) in self.seen.iter() {
92+
counts[group_idx] += 1;
93+
}
94+
self.seen.clear();
95+
self.num_groups = 0;
96+
} else {
97+
let mut remaining = HashSet::default();
98+
for (group_idx, value) in self.seen.drain() {
99+
if group_idx < num_emitted {
100+
counts[group_idx] += 1;
101+
} else {
102+
remaining.insert((group_idx - num_emitted, value));
103+
}
104+
}
105+
self.seen = remaining;
106+
self.num_groups = self.num_groups.saturating_sub(num_emitted);
107+
}
108+
109+
Ok(Arc::new(Int64Array::from(counts)))
110+
}
111+
112+
fn state(&mut self, emit_to: EmitTo) -> datafusion_common::Result<Vec<ArrayRef>> {
113+
let num_emitted = match emit_to {
114+
EmitTo::All => self.num_groups,
115+
EmitTo::First(n) => n,
116+
};
117+
118+
let mut group_values: Vec<Vec<T::Native>> = vec![Vec::new(); num_emitted];
119+
120+
if matches!(emit_to, EmitTo::All) {
121+
for (group_idx, value) in self.seen.drain() {
122+
group_values[group_idx].push(value);
123+
}
124+
self.num_groups = 0;
125+
} else {
126+
let mut remaining = HashSet::default();
127+
for (group_idx, value) in self.seen.drain() {
128+
if group_idx < num_emitted {
129+
group_values[group_idx].push(value);
130+
} else {
131+
remaining.insert((group_idx - num_emitted, value));
132+
}
133+
}
134+
self.seen = remaining;
135+
self.num_groups = self.num_groups.saturating_sub(num_emitted);
136+
}
137+
138+
let mut offsets = vec![0i32];
139+
let mut all_values = Vec::new();
140+
for values in &group_values {
141+
all_values.extend(values.iter().copied());
142+
offsets.push(all_values.len() as i32);
143+
}
144+
145+
let values_array = Arc::new(PrimitiveArray::<T>::from_iter_values(all_values));
146+
let list_array = ListArray::new(
147+
Arc::new(Field::new_list_field(T::DATA_TYPE, true)),
148+
OffsetBuffer::new(offsets.into()),
149+
values_array,
150+
None,
151+
);
152+
153+
Ok(vec![Arc::new(list_array)])
154+
}
155+
156+
fn merge_batch(
157+
&mut self,
158+
values: &[ArrayRef],
159+
group_indices: &[usize],
160+
_opt_filter: Option<&BooleanArray>,
161+
total_num_groups: usize,
162+
) -> datafusion_common::Result<()> {
163+
debug_assert_eq!(values.len(), 1);
164+
self.num_groups = self.num_groups.max(total_num_groups);
165+
let list_array = values[0].as_list::<i32>();
166+
167+
for (row_idx, group_idx) in group_indices.iter().enumerate() {
168+
let inner = list_array.value(row_idx);
169+
let inner_arr = inner.as_primitive::<T>();
170+
for value in inner_arr.values().iter() {
171+
self.seen.insert((*group_idx, *value));
172+
}
173+
}
174+
175+
Ok(())
176+
}
177+
178+
fn size(&self) -> usize {
179+
size_of::<Self>()
180+
+ self.seen.capacity() * (size_of::<(usize, T::Native)>() + size_of::<u64>())
181+
}
182+
}

datafusion/functions-aggregate/benches/count_distinct.rs

Lines changed: 53 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
use std::sync::Arc;
1919

2020
use arrow::array::{
21-
Array, ArrayRef, Int8Array, Int16Array, Int64Array, UInt8Array, UInt16Array,
21+
ArrayRef, Int8Array, Int16Array, Int64Array, UInt8Array, UInt16Array,
2222
};
2323
use arrow::datatypes::{DataType, Field, Schema};
2424
use criterion::{Criterion, criterion_group, criterion_main};
@@ -87,6 +87,13 @@ fn create_i16_array(n_distinct: usize) -> Int16Array {
8787
.collect()
8888
}
8989

90+
fn create_group_indices(num_groups: usize) -> Vec<usize> {
91+
let mut rng = StdRng::seed_from_u64(42);
92+
(0..BATCH_SIZE)
93+
.map(|_| rng.random_range(0..num_groups))
94+
.collect()
95+
}
96+
9097
fn prepare_args(data_type: DataType) -> (Arc<Schema>, AccumulatorArgs<'static>) {
9198
let schema = Arc::new(Schema::new(vec![Field::new("f", data_type, true)]));
9299
let schema_leaked: &'static Schema = Box::leak(Box::new((*schema).clone()));
@@ -174,103 +181,62 @@ fn count_distinct_benchmark(c: &mut Criterion) {
174181
});
175182
}
176183

177-
/// Create group indices with uniform distribution
178-
fn create_uniform_groups(num_groups: usize) -> Vec<usize> {
179-
let mut rng = StdRng::seed_from_u64(42);
180-
(0..BATCH_SIZE)
181-
.map(|_| rng.random_range(0..num_groups))
182-
.collect()
183-
}
184-
185-
/// Create group indices with skewed distribution (80% in 20% of groups)
186-
fn create_skewed_groups(num_groups: usize) -> Vec<usize> {
187-
let mut rng = StdRng::seed_from_u64(42);
188-
let hot_groups = (num_groups / 5).max(1);
189-
(0..BATCH_SIZE)
190-
.map(|_| {
191-
if rng.random_range(0..100) < 80 {
192-
rng.random_range(0..hot_groups)
193-
} else {
194-
rng.random_range(0..num_groups)
195-
}
196-
})
197-
.collect()
198-
}
199-
200184
fn count_distinct_groups_benchmark(c: &mut Criterion) {
201185
let count_fn = Count::new();
202186

203-
// bench different scenarios
204-
let scenarios = [
205-
// (name, num_groups, distinct_pct, group_fn)
206-
("sparse_uniform", 10, 80, "uniform"),
207-
("moderate_uniform", 100, 80, "uniform"),
208-
("dense_uniform", 1000, 80, "uniform"),
209-
("sparse_skewed", 10, 80, "skewed"),
210-
("dense_skewed", 1000, 80, "skewed"),
211-
("sparse_high_cardinality", 10, 99, "uniform"),
212-
("dense_low_cardinality", 1000, 20, "uniform"),
213-
];
214-
215-
for (name, num_groups, distinct_pct, group_type) in scenarios {
216-
let n_distinct = BATCH_SIZE * distinct_pct / 100;
187+
for num_groups in [10, 100, 1000] {
188+
let n_distinct = BATCH_SIZE * 80 / 100;
217189
let values = Arc::new(create_i64_array(n_distinct)) as ArrayRef;
218-
let group_indices = if group_type == "uniform" {
219-
create_uniform_groups(num_groups)
220-
} else {
221-
create_skewed_groups(num_groups)
222-
};
190+
let group_indices = create_group_indices(num_groups);
223191

224192
let (_schema, args) = prepare_args(DataType::Int64);
225193

226194
if count_fn.groups_accumulator_supported(args.clone()) {
227-
c.bench_function(&format!("count_distinct_groups {name}"), |b| {
228-
b.iter(|| {
229-
let mut acc =
230-
count_fn.create_groups_accumulator(args.clone()).unwrap();
231-
acc.update_batch(
232-
std::slice::from_ref(&values),
233-
&group_indices,
234-
None,
235-
num_groups,
236-
)
237-
.unwrap();
238-
acc.evaluate(EmitTo::All).unwrap()
239-
})
240-
});
195+
c.bench_function(
196+
&format!("count_distinct_groups i64 {num_groups} groups"),
197+
|b| {
198+
b.iter(|| {
199+
let (_schema, args) = prepare_args(DataType::Int64);
200+
let mut acc = count_fn.create_groups_accumulator(args).unwrap();
201+
acc.update_batch(
202+
std::slice::from_ref(&values),
203+
&group_indices,
204+
None,
205+
num_groups,
206+
)
207+
.unwrap();
208+
acc.evaluate(EmitTo::All).unwrap()
209+
})
210+
},
211+
);
241212
} else {
242-
let arr = values.as_any().downcast_ref::<Int64Array>().unwrap();
243-
let mut group_rows: Vec<Vec<i64>> = vec![Vec::new(); num_groups];
244-
for (idx, &group_idx) in group_indices.iter().enumerate() {
245-
if arr.is_valid(idx) {
246-
group_rows[group_idx].push(arr.value(idx));
247-
}
248-
}
249-
let group_arrays: Vec<ArrayRef> = group_rows
250-
.iter()
251-
.map(|rows| Arc::new(Int64Array::from(rows.clone())) as ArrayRef)
252-
.collect();
253-
254-
c.bench_function(&format!("count_distinct_groups {name}"), |b| {
255-
b.iter(|| {
256-
let mut accumulators: Vec<_> = (0..num_groups)
257-
.map(|_| prepare_accumulator(DataType::Int64))
258-
.collect();
259-
260-
for (group_idx, batch) in group_arrays.iter().enumerate() {
261-
if !batch.is_empty() {
262-
accumulators[group_idx]
263-
.update_batch(std::slice::from_ref(batch))
264-
.unwrap();
213+
c.bench_function(
214+
&format!("count_distinct_groups i64 {num_groups} groups"),
215+
|b| {
216+
b.iter(|| {
217+
let mut accumulators: Vec<_> = (0..num_groups)
218+
.map(|_| prepare_accumulator(DataType::Int64))
219+
.collect();
220+
221+
let arr = values.as_any().downcast_ref::<Int64Array>().unwrap();
222+
for (idx, group_idx) in group_indices.iter().enumerate() {
223+
if let Some(val) = arr.value(idx).into() {
224+
let single_val =
225+
Arc::new(Int64Array::from(vec![Some(val)]))
226+
as ArrayRef;
227+
accumulators[*group_idx]
228+
.update_batch(std::slice::from_ref(&single_val))
229+
.unwrap();
230+
}
265231
}
266-
}
267232

268-
let _results: Vec<_> = accumulators
269-
.iter_mut()
270-
.map(|acc| acc.evaluate().unwrap())
271-
.collect();
272-
})
273-
});
233+
let _results: Vec<_> = accumulators
234+
.iter_mut()
235+
.map(|acc| acc.evaluate().unwrap())
236+
.collect();
237+
})
238+
},
239+
);
274240
}
275241
}
276242
}

0 commit comments

Comments
 (0)