Skip to content

Commit 36f232c

Browse files
committed
add count distinct group benchmarks
1 parent 6477e5a commit 36f232c

4 files changed

Lines changed: 298 additions & 92 deletions

File tree

datafusion/functions-aggregate-common/src/aggregate/count_distinct.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,19 @@
1717

1818
mod bytes;
1919
mod dict;
20+
mod groups;
2021
mod native;
2122

2223
pub use bytes::BytesDistinctCountAccumulator;
2324
pub use bytes::BytesViewDistinctCountAccumulator;
2425
pub use dict::DictionaryCountAccumulator;
26+
<<<<<<< HEAD
2527
pub use native::Bitmap65536DistinctCountAccumulator;
2628
pub use native::Bitmap65536DistinctCountAccumulatorI16;
2729
pub use native::BoolArray256DistinctCountAccumulator;
2830
pub use native::BoolArray256DistinctCountAccumulatorI8;
31+
=======
32+
pub use groups::PrimitiveDistinctCountGroupsAccumulator;
33+
>>>>>>> f982d8de2 (add count distinct group benchmarks)
2934
pub use native::FloatDistinctCountAccumulator;
3035
pub use native::PrimitiveDistinctCountAccumulator;
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow::array::{
19+
ArrayRef, AsArray, BooleanArray, Int64Array, ListArray, PrimitiveArray,
20+
};
21+
use arrow::buffer::OffsetBuffer;
22+
use arrow::datatypes::{ArrowPrimitiveType, Field};
23+
use datafusion_common::HashSet;
24+
use datafusion_common::hash_utils::RandomState;
25+
use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator};
26+
use std::hash::Hash;
27+
use std::mem::size_of;
28+
use std::sync::Arc;
29+
30+
use crate::aggregate::groups_accumulator::accumulate::accumulate;
31+
32+
pub struct PrimitiveDistinctCountGroupsAccumulator<T: ArrowPrimitiveType>
33+
where
34+
T::Native: Eq + Hash,
35+
{
36+
seen: HashSet<(usize, T::Native), RandomState>,
37+
num_groups: usize,
38+
}
39+
40+
impl<T: ArrowPrimitiveType> PrimitiveDistinctCountGroupsAccumulator<T>
41+
where
42+
T::Native: Eq + Hash,
43+
{
44+
pub fn new() -> Self {
45+
Self {
46+
seen: HashSet::default(),
47+
num_groups: 0,
48+
}
49+
}
50+
}
51+
52+
impl<T: ArrowPrimitiveType> Default for PrimitiveDistinctCountGroupsAccumulator<T>
53+
where
54+
T::Native: Eq + Hash,
55+
{
56+
fn default() -> Self {
57+
Self::new()
58+
}
59+
}
60+
61+
impl<T: ArrowPrimitiveType + Send + std::fmt::Debug> GroupsAccumulator
62+
for PrimitiveDistinctCountGroupsAccumulator<T>
63+
where
64+
T::Native: Eq + Hash,
65+
{
66+
fn update_batch(
67+
&mut self,
68+
values: &[ArrayRef],
69+
group_indices: &[usize],
70+
opt_filter: Option<&BooleanArray>,
71+
total_num_groups: usize,
72+
) -> datafusion_common::Result<()> {
73+
debug_assert_eq!(values.len(), 1);
74+
self.num_groups = self.num_groups.max(total_num_groups);
75+
let arr = values[0].as_primitive::<T>();
76+
accumulate(group_indices, arr, opt_filter, |group_idx, value| {
77+
self.seen.insert((group_idx, value));
78+
});
79+
Ok(())
80+
}
81+
82+
fn evaluate(&mut self, emit_to: EmitTo) -> datafusion_common::Result<ArrayRef> {
83+
let num_emitted = match emit_to {
84+
EmitTo::All => self.num_groups,
85+
EmitTo::First(n) => n,
86+
};
87+
88+
let mut counts = vec![0i64; num_emitted];
89+
90+
if matches!(emit_to, EmitTo::All) {
91+
for &(group_idx, _) in self.seen.iter() {
92+
counts[group_idx] += 1;
93+
}
94+
self.seen.clear();
95+
self.num_groups = 0;
96+
} else {
97+
let mut remaining = HashSet::default();
98+
for (group_idx, value) in self.seen.drain() {
99+
if group_idx < num_emitted {
100+
counts[group_idx] += 1;
101+
} else {
102+
remaining.insert((group_idx - num_emitted, value));
103+
}
104+
}
105+
self.seen = remaining;
106+
self.num_groups = self.num_groups.saturating_sub(num_emitted);
107+
}
108+
109+
Ok(Arc::new(Int64Array::from(counts)))
110+
}
111+
112+
fn state(&mut self, emit_to: EmitTo) -> datafusion_common::Result<Vec<ArrayRef>> {
113+
let num_emitted = match emit_to {
114+
EmitTo::All => self.num_groups,
115+
EmitTo::First(n) => n,
116+
};
117+
118+
let mut group_values: Vec<Vec<T::Native>> = vec![Vec::new(); num_emitted];
119+
120+
if matches!(emit_to, EmitTo::All) {
121+
for (group_idx, value) in self.seen.drain() {
122+
group_values[group_idx].push(value);
123+
}
124+
self.num_groups = 0;
125+
} else {
126+
let mut remaining = HashSet::default();
127+
for (group_idx, value) in self.seen.drain() {
128+
if group_idx < num_emitted {
129+
group_values[group_idx].push(value);
130+
} else {
131+
remaining.insert((group_idx - num_emitted, value));
132+
}
133+
}
134+
self.seen = remaining;
135+
self.num_groups = self.num_groups.saturating_sub(num_emitted);
136+
}
137+
138+
let mut offsets = vec![0i32];
139+
let mut all_values = Vec::new();
140+
for values in &group_values {
141+
all_values.extend(values.iter().copied());
142+
offsets.push(all_values.len() as i32);
143+
}
144+
145+
let values_array = Arc::new(PrimitiveArray::<T>::from_iter_values(all_values));
146+
let list_array = ListArray::new(
147+
Arc::new(Field::new_list_field(T::DATA_TYPE, true)),
148+
OffsetBuffer::new(offsets.into()),
149+
values_array,
150+
None,
151+
);
152+
153+
Ok(vec![Arc::new(list_array)])
154+
}
155+
156+
fn merge_batch(
157+
&mut self,
158+
values: &[ArrayRef],
159+
group_indices: &[usize],
160+
_opt_filter: Option<&BooleanArray>,
161+
total_num_groups: usize,
162+
) -> datafusion_common::Result<()> {
163+
debug_assert_eq!(values.len(), 1);
164+
self.num_groups = self.num_groups.max(total_num_groups);
165+
let list_array = values[0].as_list::<i32>();
166+
167+
for (row_idx, group_idx) in group_indices.iter().enumerate() {
168+
let inner = list_array.value(row_idx);
169+
let inner_arr = inner.as_primitive::<T>();
170+
for value in inner_arr.values().iter() {
171+
self.seen.insert((*group_idx, *value));
172+
}
173+
}
174+
175+
Ok(())
176+
}
177+
178+
fn size(&self) -> usize {
179+
size_of::<Self>()
180+
+ self.seen.capacity() * (size_of::<(usize, T::Native)>() + size_of::<u64>())
181+
}
182+
}

datafusion/functions-aggregate/benches/count_distinct.rs

Lines changed: 53 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
use std::sync::Arc;
1919

2020
use arrow::array::{
21-
Array, ArrayRef, Int8Array, Int16Array, Int64Array, UInt8Array, UInt16Array,
21+
ArrayRef, Int8Array, Int16Array, Int64Array, UInt8Array, UInt16Array,
2222
};
2323
use arrow::datatypes::{DataType, Field, Schema};
2424
use criterion::{Criterion, criterion_group, criterion_main};
@@ -87,6 +87,13 @@ fn create_i16_array(n_distinct: usize) -> Int16Array {
8787
.collect()
8888
}
8989

90+
fn create_group_indices(num_groups: usize) -> Vec<usize> {
91+
let mut rng = StdRng::seed_from_u64(42);
92+
(0..BATCH_SIZE)
93+
.map(|_| rng.random_range(0..num_groups))
94+
.collect()
95+
}
96+
9097
fn prepare_args(data_type: DataType) -> (Arc<Schema>, AccumulatorArgs<'static>) {
9198
let schema = Arc::new(Schema::new(vec![Field::new("f", data_type, true)]));
9299
let schema_leaked: &'static Schema = Box::leak(Box::new((*schema).clone()));
@@ -174,103 +181,62 @@ fn count_distinct_benchmark(c: &mut Criterion) {
174181
});
175182
}
176183

177-
/// Create group indices with uniform distribution
178-
fn create_uniform_groups(num_groups: usize) -> Vec<usize> {
179-
let mut rng = StdRng::seed_from_u64(42);
180-
(0..BATCH_SIZE)
181-
.map(|_| rng.random_range(0..num_groups))
182-
.collect()
183-
}
184-
185-
/// Create group indices with skewed distribution (80% in 20% of groups)
186-
fn create_skewed_groups(num_groups: usize) -> Vec<usize> {
187-
let mut rng = StdRng::seed_from_u64(42);
188-
let hot_groups = (num_groups / 5).max(1);
189-
(0..BATCH_SIZE)
190-
.map(|_| {
191-
if rng.random_range(0..100) < 80 {
192-
rng.random_range(0..hot_groups)
193-
} else {
194-
rng.random_range(0..num_groups)
195-
}
196-
})
197-
.collect()
198-
}
199-
200184
fn count_distinct_groups_benchmark(c: &mut Criterion) {
201185
let count_fn = Count::new();
202186

203-
// bench different scenarios
204-
let scenarios = [
205-
// (name, num_groups, distinct_pct, group_fn)
206-
("sparse_uniform", 10, 80, "uniform"),
207-
("moderate_uniform", 100, 80, "uniform"),
208-
("dense_uniform", 1000, 80, "uniform"),
209-
("sparse_skewed", 10, 80, "skewed"),
210-
("dense_skewed", 1000, 80, "skewed"),
211-
("sparse_high_cardinality", 10, 99, "uniform"),
212-
("dense_low_cardinality", 1000, 20, "uniform"),
213-
];
214-
215-
for (name, num_groups, distinct_pct, group_type) in scenarios {
216-
let n_distinct = BATCH_SIZE * distinct_pct / 100;
187+
for num_groups in [10, 100, 1000] {
188+
let n_distinct = BATCH_SIZE * 80 / 100;
217189
let values = Arc::new(create_i64_array(n_distinct)) as ArrayRef;
218-
let group_indices = if group_type == "uniform" {
219-
create_uniform_groups(num_groups)
220-
} else {
221-
create_skewed_groups(num_groups)
222-
};
190+
let group_indices = create_group_indices(num_groups);
223191

224192
let (_schema, args) = prepare_args(DataType::Int64);
225193

226194
if count_fn.groups_accumulator_supported(args.clone()) {
227-
c.bench_function(&format!("count_distinct_groups {name}"), |b| {
228-
b.iter(|| {
229-
let mut acc =
230-
count_fn.create_groups_accumulator(args.clone()).unwrap();
231-
acc.update_batch(
232-
std::slice::from_ref(&values),
233-
&group_indices,
234-
None,
235-
num_groups,
236-
)
237-
.unwrap();
238-
acc.evaluate(EmitTo::All).unwrap()
239-
})
240-
});
195+
c.bench_function(
196+
&format!("count_distinct_groups i64 {num_groups} groups"),
197+
|b| {
198+
b.iter(|| {
199+
let (_schema, args) = prepare_args(DataType::Int64);
200+
let mut acc = count_fn.create_groups_accumulator(args).unwrap();
201+
acc.update_batch(
202+
std::slice::from_ref(&values),
203+
&group_indices,
204+
None,
205+
num_groups,
206+
)
207+
.unwrap();
208+
acc.evaluate(EmitTo::All).unwrap()
209+
})
210+
},
211+
);
241212
} else {
242-
let arr = values.as_any().downcast_ref::<Int64Array>().unwrap();
243-
let mut group_rows: Vec<Vec<i64>> = vec![Vec::new(); num_groups];
244-
for (idx, &group_idx) in group_indices.iter().enumerate() {
245-
if arr.is_valid(idx) {
246-
group_rows[group_idx].push(arr.value(idx));
247-
}
248-
}
249-
let group_arrays: Vec<ArrayRef> = group_rows
250-
.iter()
251-
.map(|rows| Arc::new(Int64Array::from(rows.clone())) as ArrayRef)
252-
.collect();
253-
254-
c.bench_function(&format!("count_distinct_groups {name}"), |b| {
255-
b.iter(|| {
256-
let mut accumulators: Vec<_> = (0..num_groups)
257-
.map(|_| prepare_accumulator(DataType::Int64))
258-
.collect();
259-
260-
for (group_idx, batch) in group_arrays.iter().enumerate() {
261-
if !batch.is_empty() {
262-
accumulators[group_idx]
263-
.update_batch(std::slice::from_ref(batch))
264-
.unwrap();
213+
c.bench_function(
214+
&format!("count_distinct_groups i64 {num_groups} groups"),
215+
|b| {
216+
b.iter(|| {
217+
let mut accumulators: Vec<_> = (0..num_groups)
218+
.map(|_| prepare_accumulator(DataType::Int64))
219+
.collect();
220+
221+
let arr = values.as_any().downcast_ref::<Int64Array>().unwrap();
222+
for (idx, group_idx) in group_indices.iter().enumerate() {
223+
if let Some(val) = arr.value(idx).into() {
224+
let single_val =
225+
Arc::new(Int64Array::from(vec![Some(val)]))
226+
as ArrayRef;
227+
accumulators[*group_idx]
228+
.update_batch(std::slice::from_ref(&single_val))
229+
.unwrap();
230+
}
265231
}
266-
}
267232

268-
let _results: Vec<_> = accumulators
269-
.iter_mut()
270-
.map(|acc| acc.evaluate().unwrap())
271-
.collect();
272-
})
273-
});
233+
let _results: Vec<_> = accumulators
234+
.iter_mut()
235+
.map(|acc| acc.evaluate().unwrap())
236+
.collect();
237+
})
238+
},
239+
);
274240
}
275241
}
276242
}

0 commit comments

Comments
 (0)