@@ -23,7 +23,7 @@ use arrow::array::{
2323use arrow:: datatypes:: { DataType , Field , Schema } ;
2424use criterion:: { Criterion , criterion_group, criterion_main} ;
2525use datafusion_expr:: function:: AccumulatorArgs ;
26- use datafusion_expr:: { Accumulator , AggregateUDFImpl } ;
26+ use datafusion_expr:: { Accumulator , AggregateUDFImpl , EmitTo } ;
2727use datafusion_functions_aggregate:: count:: Count ;
2828use datafusion_physical_expr:: expressions:: col;
2929use rand:: rngs:: StdRng ;
@@ -87,6 +87,30 @@ fn create_i16_array(n_distinct: usize) -> Int16Array {
8787 . collect ( )
8888}
8989
90+ fn prepare_args ( data_type : DataType ) -> ( Arc < Schema > , AccumulatorArgs < ' static > ) {
91+ let schema = Arc :: new ( Schema :: new ( vec ! [ Field :: new( "f" , data_type, true ) ] ) ) ;
92+ let schema_leaked: & ' static Schema = Box :: leak ( Box :: new ( ( * schema) . clone ( ) ) ) ;
93+ let expr = col ( "f" , schema_leaked) . unwrap ( ) ;
94+ let expr_leaked: & ' static _ = Box :: leak ( Box :: new ( expr) ) ;
95+ let return_field: Arc < Field > = Field :: new ( "f" , DataType :: Int64 , true ) . into ( ) ;
96+ let return_field_leaked: & ' static _ = Box :: leak ( Box :: new ( return_field. clone ( ) ) ) ;
97+ let expr_field = expr_leaked. return_field ( schema_leaked) . unwrap ( ) ;
98+ let expr_field_leaked: & ' static _ = Box :: leak ( Box :: new ( expr_field) ) ;
99+
100+ let accumulator_args = AccumulatorArgs {
101+ return_field : return_field_leaked. clone ( ) ,
102+ schema : schema_leaked,
103+ expr_fields : std:: slice:: from_ref ( expr_field_leaked) ,
104+ ignore_nulls : false ,
105+ order_bys : & [ ] ,
106+ is_reversed : false ,
107+ name : "count(distinct f)" ,
108+ is_distinct : true ,
109+ exprs : std:: slice:: from_ref ( expr_leaked) ,
110+ } ;
111+ ( schema, accumulator_args)
112+ }
113+
90114fn count_distinct_benchmark ( c : & mut Criterion ) {
91115 for pct in [ 80 , 99 ] {
92116 let n_distinct = BATCH_SIZE * pct / 100 ;
@@ -150,5 +174,92 @@ fn count_distinct_benchmark(c: &mut Criterion) {
150174 } ) ;
151175}
152176
153- criterion_group ! ( benches, count_distinct_benchmark) ;
177+ /// Create group indices with uniform distribution
178+ fn create_uniform_groups ( num_groups : usize ) -> Vec < usize > {
179+ let mut rng = StdRng :: seed_from_u64 ( 42 ) ;
180+ ( 0 ..BATCH_SIZE )
181+ . map ( |_| rng. random_range ( 0 ..num_groups) )
182+ . collect ( )
183+ }
184+
185+ /// Create group indices with skewed distribution (80% in 20% of groups)
186+ fn create_skewed_groups ( num_groups : usize ) -> Vec < usize > {
187+ let mut rng = StdRng :: seed_from_u64 ( 42 ) ;
188+ let hot_groups = ( num_groups / 5 ) . max ( 1 ) ;
189+ ( 0 ..BATCH_SIZE )
190+ . map ( |_| {
191+ if rng. random_range ( 0 ..100 ) < 80 {
192+ rng. random_range ( 0 ..hot_groups)
193+ } else {
194+ rng. random_range ( 0 ..num_groups)
195+ }
196+ } )
197+ . collect ( )
198+ }
199+
200+ fn count_distinct_groups_benchmark ( c : & mut Criterion ) {
201+ let count_fn = Count :: new ( ) ;
202+
203+ // Test different scenarios
204+ let scenarios = [
205+ // (name, num_groups, distinct_pct, group_fn)
206+ ( "sparse_uniform" , 10 , 80 , "uniform" ) ,
207+ ( "moderate_uniform" , 100 , 80 , "uniform" ) ,
208+ ( "dense_uniform" , 1000 , 80 , "uniform" ) ,
209+ ( "sparse_skewed" , 10 , 80 , "skewed" ) ,
210+ ( "dense_skewed" , 1000 , 80 , "skewed" ) ,
211+ ( "sparse_high_cardinality" , 10 , 99 , "uniform" ) ,
212+ ( "dense_low_cardinality" , 1000 , 20 , "uniform" ) ,
213+ ] ;
214+
215+ for ( name, num_groups, distinct_pct, group_type) in scenarios {
216+ let n_distinct = BATCH_SIZE * distinct_pct / 100 ;
217+ let values = Arc :: new ( create_i64_array ( n_distinct) ) as ArrayRef ;
218+ let group_indices = if group_type == "uniform" {
219+ create_uniform_groups ( num_groups)
220+ } else {
221+ create_skewed_groups ( num_groups)
222+ } ;
223+
224+ let ( _schema, args) = prepare_args ( DataType :: Int64 ) ;
225+
226+ if count_fn. groups_accumulator_supported ( args. clone ( ) ) {
227+ c. bench_function ( & format ! ( "count_distinct_groups {name}" ) , |b| {
228+ b. iter ( || {
229+ let ( _schema, args) = prepare_args ( DataType :: Int64 ) ;
230+ let mut acc = count_fn. create_groups_accumulator ( args) . unwrap ( ) ;
231+ acc. update_batch ( & [ values. clone ( ) ] , & group_indices, None , num_groups)
232+ . unwrap ( ) ;
233+ acc. evaluate ( EmitTo :: All ) . unwrap ( )
234+ } )
235+ } ) ;
236+ } else {
237+ c. bench_function ( & format ! ( "count_distinct_groups {name}" ) , |b| {
238+ b. iter ( || {
239+ let mut accumulators: Vec < _ > = ( 0 ..num_groups)
240+ . map ( |_| prepare_accumulator ( DataType :: Int64 ) )
241+ . collect ( ) ;
242+
243+ let arr = values. as_any ( ) . downcast_ref :: < Int64Array > ( ) . unwrap ( ) ;
244+ for ( idx, group_idx) in group_indices. iter ( ) . enumerate ( ) {
245+ if let Some ( val) = arr. value ( idx) . into ( ) {
246+ let single_val =
247+ Arc :: new ( Int64Array :: from ( vec ! [ Some ( val) ] ) ) as ArrayRef ;
248+ accumulators[ * group_idx]
249+ . update_batch ( std:: slice:: from_ref ( & single_val) )
250+ . unwrap ( ) ;
251+ }
252+ }
253+
254+ let _results: Vec < _ > = accumulators
255+ . iter_mut ( )
256+ . map ( |acc| acc. evaluate ( ) . unwrap ( ) )
257+ . collect ( ) ;
258+ } )
259+ } ) ;
260+ }
261+ }
262+ }
263+
// Register both benchmark functions under the `benches` group, then hand that
// group to `criterion_main!` so it is run by the benchmark binary.
264+ criterion_group ! ( benches, count_distinct_benchmark, count_distinct_groups_benchmark) ;
154265criterion_main ! ( benches) ;
0 commit comments