@@ -87,6 +87,13 @@ fn create_i16_array(n_distinct: usize) -> Int16Array {
8787 . collect ( )
8888}
8989
90+ fn create_group_indices ( num_groups : usize ) -> Vec < usize > {
91+ let mut rng = StdRng :: seed_from_u64 ( 42 ) ;
92+ ( 0 ..BATCH_SIZE )
93+ . map ( |_| rng. random_range ( 0 ..num_groups) )
94+ . collect ( )
95+ }
96+
9097fn prepare_args ( data_type : DataType ) -> ( Arc < Schema > , AccumulatorArgs < ' static > ) {
9198 let schema = Arc :: new ( Schema :: new ( vec ! [ Field :: new( "f" , data_type, true ) ] ) ) ;
9299 let schema_leaked: & ' static Schema = Box :: leak ( Box :: new ( ( * schema) . clone ( ) ) ) ;
@@ -174,101 +181,58 @@ fn count_distinct_benchmark(c: &mut Criterion) {
174181 } ) ;
175182}
176183
177- /// Create group indices with uniform distribution
178- fn create_uniform_groups ( num_groups : usize ) -> Vec < usize > {
179- let mut rng = StdRng :: seed_from_u64 ( 42 ) ;
180- ( 0 ..BATCH_SIZE )
181- . map ( |_| rng. random_range ( 0 ..num_groups) )
182- . collect ( )
183- }
184-
185- /// Create group indices with skewed distribution (80% in 20% of groups)
186- fn create_skewed_groups ( num_groups : usize ) -> Vec < usize > {
187- let mut rng = StdRng :: seed_from_u64 ( 42 ) ;
188- let hot_groups = ( num_groups / 5 ) . max ( 1 ) ;
189- ( 0 ..BATCH_SIZE )
190- . map ( |_| {
191- if rng. random_range ( 0 ..100 ) < 80 {
192- rng. random_range ( 0 ..hot_groups)
193- } else {
194- rng. random_range ( 0 ..num_groups)
195- }
196- } )
197- . collect ( )
198- }
199-
200184fn count_distinct_groups_benchmark ( c : & mut Criterion ) {
201185 let count_fn = Count :: new ( ) ;
202186
203- // bench different scenarios
204- let scenarios = [
205- // (name, num_groups, distinct_pct, group_fn)
206- ( "sparse_uniform" , 10 , 80 , "uniform" ) ,
207- ( "moderate_uniform" , 100 , 80 , "uniform" ) ,
208- ( "dense_uniform" , 1000 , 80 , "uniform" ) ,
209- ( "sparse_skewed" , 10 , 80 , "skewed" ) ,
210- ( "dense_skewed" , 1000 , 80 , "skewed" ) ,
211- ( "sparse_high_cardinality" , 10 , 99 , "uniform" ) ,
212- ( "dense_low_cardinality" , 1000 , 20 , "uniform" ) ,
213- ] ;
214-
215- for ( name, num_groups, distinct_pct, group_type) in scenarios {
216- let n_distinct = BATCH_SIZE * distinct_pct / 100 ;
187+ for num_groups in [ 10 , 100 , 1000 ] {
188+ let n_distinct = BATCH_SIZE * 80 / 100 ;
217189 let values = Arc :: new ( create_i64_array ( n_distinct) ) as ArrayRef ;
218- let group_indices = if group_type == "uniform" {
219- create_uniform_groups ( num_groups)
220- } else {
221- create_skewed_groups ( num_groups)
222- } ;
190+ let group_indices = create_group_indices ( num_groups) ;
223191
224192 let ( _schema, args) = prepare_args ( DataType :: Int64 ) ;
225193
226194 if count_fn. groups_accumulator_supported ( args. clone ( ) ) {
227- c. bench_function ( & format ! ( "count_distinct_groups {name}" ) , |b| {
228- b. iter ( || {
229- let ( _schema, args) = prepare_args ( DataType :: Int64 ) ;
230- let mut acc = count_fn. create_groups_accumulator ( args) . unwrap ( ) ;
231- acc. update_batch (
232- std:: slice:: from_ref ( & values) ,
233- & group_indices,
234- None ,
235- num_groups,
236- )
237- . unwrap ( ) ;
238- acc. evaluate ( EmitTo :: All ) . unwrap ( )
239- } )
240- } ) ;
195+ c. bench_function (
196+ & format ! ( "count_distinct_groups i64 {num_groups} groups (GroupsAccumulator)" ) ,
197+ |b| {
198+ b. iter ( || {
199+ let ( _schema, args) = prepare_args ( DataType :: Int64 ) ;
200+ let mut acc = count_fn. create_groups_accumulator ( args) . unwrap ( ) ;
201+ acc. update_batch ( & [ values. clone ( ) ] , & group_indices, None , num_groups)
202+ . unwrap ( ) ;
203+ acc. evaluate ( EmitTo :: All ) . unwrap ( )
204+ } )
205+ } ,
206+ ) ;
241207 } else {
242- c. bench_function ( & format ! ( "count_distinct_groups {name}" ) , |b| {
243- b. iter ( || {
244- let mut accumulators: Vec < _ > = ( 0 ..num_groups)
245- . map ( |_| prepare_accumulator ( DataType :: Int64 ) )
246- . collect ( ) ;
247-
248- let arr = values. as_any ( ) . downcast_ref :: < Int64Array > ( ) . unwrap ( ) ;
249- for ( idx, group_idx) in group_indices. iter ( ) . enumerate ( ) {
250- if let Some ( val) = arr. value ( idx) . into ( ) {
251- let single_val =
252- Arc :: new ( Int64Array :: from ( vec ! [ Some ( val) ] ) ) as ArrayRef ;
253- accumulators[ * group_idx]
254- . update_batch ( std:: slice:: from_ref ( & single_val) )
255- . unwrap ( ) ;
208+ c. bench_function (
209+ & format ! ( "count_distinct_groups i64 {num_groups} groups (N Accumulators)" ) ,
210+ |b| {
211+ b. iter ( || {
212+ let mut accumulators: Vec < _ > = ( 0 ..num_groups)
213+ . map ( |_| prepare_accumulator ( DataType :: Int64 ) )
214+ . collect ( ) ;
215+
216+ let arr = values. as_any ( ) . downcast_ref :: < Int64Array > ( ) . unwrap ( ) ;
217+ for ( idx, group_idx) in group_indices. iter ( ) . enumerate ( ) {
218+ if let Some ( val) = arr. value ( idx) . into ( ) {
219+ let single_val = Arc :: new ( Int64Array :: from ( vec ! [ Some ( val) ] ) ) as ArrayRef ;
220+ accumulators[ * group_idx]
221+ . update_batch ( std:: slice:: from_ref ( & single_val) )
222+ . unwrap ( ) ;
223+ }
256224 }
257- }
258225
259- let _results: Vec < _ > = accumulators
260- . iter_mut ( )
261- . map ( |acc| acc. evaluate ( ) . unwrap ( ) )
262- . collect ( ) ;
263- } )
264- } ) ;
226+ let _results: Vec < _ > = accumulators
227+ . iter_mut ( )
228+ . map ( |acc| acc. evaluate ( ) . unwrap ( ) )
229+ . collect ( ) ;
230+ } )
231+ } ,
232+ ) ;
265233 }
266234 }
267235}
268236
269- criterion_group ! (
270- benches,
271- count_distinct_benchmark,
272- count_distinct_groups_benchmark
273- ) ;
237+ criterion_group ! ( benches, count_distinct_benchmark, count_distinct_groups_benchmark) ;
274238criterion_main ! ( benches) ;
0 commit comments