Skip to content

Commit 203efc4

Browse files
committed
optimized scalarvalues
1 parent aa69892 commit 203efc4

6 files changed

Lines changed: 549 additions & 34 deletions

File tree

datafusion/physical-plan/Cargo.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,3 +106,11 @@ required-features = ["test_utils"]
106106
harness = false
107107
name = "aggregate_vectorized"
108108
required-features = ["test_utils"]
109+
110+
[[bench]]
111+
name = "single_column_aggr"
112+
harness = false
113+
114+
[profile.profiling]
115+
inherits = "release"
116+
debug = true
Lines changed: 272 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,272 @@
1+
use arrow::array::{ArrayRef, StringArray, StringDictionaryBuilder};
2+
use arrow::datatypes::{DataType, Field, Schema, UInt8Type};
3+
use criterion::{Criterion, criterion_group, criterion_main};
4+
use datafusion_expr::EmitTo;
5+
use datafusion_physical_plan::aggregates::group_values::single_group_by::dictionary::GroupValuesDictionary;
6+
use datafusion_physical_plan::aggregates::group_values::{GroupValues, new_group_values};
7+
use datafusion_physical_plan::aggregates::order::GroupOrdering;
8+
use std::sync::Arc;
9+
#[derive(Debug)]
10+
enum Cardinality {
11+
Xsmall, // 1
12+
Small, // 10
13+
Medium, // 50
14+
Large, // 200
15+
}
16+
#[derive(Debug)]
17+
enum BatchSize {
18+
Small, // 8192
19+
Medium, // 32768
20+
Large, // 65536
21+
}
22+
#[derive(Debug)]
23+
enum NullRate {
24+
Zero, // 0%
25+
Low, // 1%
26+
Medium, // 5%
27+
High, // 20%
28+
}
29+
#[derive(Debug, Clone)]
30+
enum GroupType {
31+
Dictionary,
32+
GroupValueRows,
33+
Utf8,
34+
}
35+
fn create_string_values(cardinality: &Cardinality) -> Vec<String> {
36+
let num_values = match cardinality {
37+
Cardinality::Xsmall => 3,
38+
Cardinality::Small => 10,
39+
Cardinality::Medium => 50,
40+
Cardinality::Large => 200,
41+
};
42+
(0..num_values)
43+
.map(|i| format!("group_value_{:06}", i))
44+
.collect()
45+
}
46+
fn create_batch(batch_size: &BatchSize, cardinality: &Cardinality) -> Vec<String> {
47+
let size = match batch_size {
48+
BatchSize::Small => 8192,
49+
BatchSize::Medium => 32768,
50+
BatchSize::Large => 65536,
51+
};
52+
let unique_strings = create_string_values(cardinality);
53+
if unique_strings.is_empty() {
54+
return Vec::new();
55+
}
56+
57+
unique_strings.iter().cycle().take(size).cloned().collect()
58+
}
59+
fn strings_to_dict_array(values: Vec<Option<String>>) -> ArrayRef {
60+
let mut builder = StringDictionaryBuilder::<UInt8Type>::new();
61+
for v in values {
62+
match v {
63+
Some(v) => builder.append_value(v),
64+
None => builder.append_null(),
65+
}
66+
}
67+
Arc::new(builder.finish())
68+
}
69+
fn introduce_nulls(values: Vec<String>, null_rate: &NullRate) -> Vec<Option<String>> {
70+
let rate = match null_rate {
71+
NullRate::Zero => 0.0,
72+
NullRate::Low => 0.01,
73+
NullRate::Medium => 0.05,
74+
NullRate::High => 0.20,
75+
};
76+
values
77+
.into_iter()
78+
.map(|v| {
79+
if rand::random::<f64>() < rate {
80+
None
81+
} else {
82+
Some(v)
83+
}
84+
})
85+
.collect()
86+
}
87+
88+
fn generate_group_values(kind: GroupType) -> Box<dyn GroupValues> {
89+
match kind {
90+
GroupType::GroupValueRows => {
91+
// we know this is going to hit the fallback path I.E GroupValueRows, but for the sake of avoiding making private items public call the public api
92+
let schema = Arc::new(Schema::new(vec![Field::new(
93+
"group_col",
94+
DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
95+
false,
96+
)]));
97+
new_group_values(schema, &GroupOrdering::None).unwrap()
98+
}
99+
GroupType::Dictionary => {
100+
// call custom path directly
101+
Box::new(GroupValuesDictionary::<UInt8Type>::new(&DataType::Utf8))
102+
}
103+
GroupType::Utf8 => {
104+
//let batch = create_batch(batch_size, cardinality);
105+
//let array = StringArray::from(batch);
106+
// Create GroupValues implementation for Utf8 type
107+
let schema = Arc::new(Schema::new(vec![Field::new(
108+
"group_col",
109+
DataType::Utf8,
110+
false,
111+
)]));
112+
new_group_values(schema, &GroupOrdering::None).unwrap()
113+
}
114+
}
115+
}
116+
117+
fn bench_single_column_group_values(c: &mut Criterion) {
118+
let group_types = [GroupType::GroupValueRows, GroupType::Dictionary];
119+
let cardinalities = [
120+
Cardinality::Xsmall,
121+
/*
122+
Cardinality::Small,
123+
Cardinality::Medium,*/
124+
Cardinality::Large,
125+
];
126+
let batch_sizes = [
127+
/*BatchSize::Small, BatchSize::Medium, */ BatchSize::Large,
128+
];
129+
let null_rates = [
130+
NullRate::Zero,
131+
/*NullRate::Low, NullRate::Medium,*/ NullRate::High,
132+
];
133+
134+
for cardinality in &cardinalities {
135+
for batch_size in &batch_sizes {
136+
for null_rate in &null_rates {
137+
for group_type in &group_types {
138+
let group_name = format!(
139+
"{:?}_cardinality_{:?}_batch_{:?}_null_rate_{:?}",
140+
group_type, cardinality, batch_size, null_rate
141+
);
142+
143+
let string_vec = create_batch(batch_size, cardinality);
144+
let nullable_values = introduce_nulls(string_vec, null_rate);
145+
let col_ref = match group_type {
146+
GroupType::Utf8 => {
147+
Arc::new(StringArray::from(nullable_values.clone()))
148+
as ArrayRef
149+
}
150+
GroupType::Dictionary | GroupType::GroupValueRows => {
151+
strings_to_dict_array(nullable_values.clone())
152+
}
153+
};
154+
c.bench_function(&group_name, |b| {
155+
b.iter_batched(
156+
|| {
157+
//create fresh group values for each iteration
158+
let gv = generate_group_values(group_type.clone());
159+
let col = col_ref.clone();
160+
(gv, col)
161+
},
162+
|(mut group_values, col)| {
163+
let mut groups = Vec::new();
164+
group_values.intern(&[col], &mut groups).unwrap();
165+
//group_values.emit(EmitTo::All).unwrap();
166+
},
167+
criterion::BatchSize::SmallInput,
168+
);
169+
});
170+
171+
/* Second benchmark that alternates between intern and emit to simulate more realistic usage patterns where the same group values is used across multiple batches of the same grouping column
172+
let multi_batch_name = format!(
173+
"multi_batch/{:?}_cardinality_{:?}_batch_{:?}_null_rate_{:?}",
174+
group_type, cardinality, batch_size, null_rate
175+
);
176+
c.bench_function(&multi_batch_name, |b| {
177+
b.iter_batched(
178+
|| {
179+
// setup - create 3 batches to simulate multiple record batches
180+
let gv = generate_group_values(group_type.clone());
181+
let batch1 = col_ref.clone();
182+
let batch2 = col_ref.clone();
183+
let batch3 = col_ref.clone();
184+
(gv, batch1, batch2, batch3)
185+
},
186+
|(mut group_values, batch1, batch2, batch3)| {
187+
// simulate realistic aggregation flow:
188+
// multiple intern calls (one per record batch)
189+
// followed by emit
190+
let mut groups = Vec::new();
191+
192+
group_values.intern(&[batch1], &mut groups).unwrap();
193+
groups.clear();
194+
group_values.intern(&[batch2], &mut groups).unwrap();
195+
groups.clear();
196+
group_values.intern(&[batch3], &mut groups).unwrap();
197+
198+
// emit once at the end like the real aggregation flow
199+
group_values.emit(EmitTo::All).unwrap();
200+
},
201+
criterion::BatchSize::SmallInput,
202+
);
203+
});*/
204+
}
205+
}
206+
}
207+
}
208+
}
209+
210+
fn bench_repeated_intern_prefab_cols(c: &mut Criterion) {
211+
let cardinality = Cardinality::Small;
212+
let batch_size = BatchSize::Small;
213+
let null_rate = NullRate::Low;
214+
let group_types = [GroupType::GroupValueRows, GroupType::Dictionary];
215+
216+
for group_type in &group_types {
217+
let group_type = group_type.clone();
218+
let string_vec = create_batch(&batch_size, &cardinality);
219+
let nullable_values = introduce_nulls(string_vec, &null_rate);
220+
let col_ref = match group_type {
221+
GroupType::Utf8 => {
222+
Arc::new(StringArray::from(nullable_values.clone())) as ArrayRef
223+
}
224+
GroupType::Dictionary | GroupType::GroupValueRows => {
225+
strings_to_dict_array(nullable_values.clone())
226+
}
227+
};
228+
229+
// Build once outside the benchmark iteration and reuse in intern calls.
230+
let arr1 = col_ref.clone();
231+
let arr2 = col_ref.clone();
232+
let arr3 = col_ref.clone();
233+
let arr4 = col_ref.clone();
234+
235+
let group_name = format!(
236+
"repeated_intern/{:?}_cardinality_{:?}_batch_{:?}_null_rate_{:?}",
237+
group_type, cardinality, batch_size, null_rate
238+
);
239+
c.bench_function(&group_name, |b| {
240+
b.iter_batched(
241+
|| generate_group_values(group_type.clone()),
242+
|mut group_values| {
243+
let mut groups = Vec::new();
244+
245+
group_values
246+
.intern(std::slice::from_ref(&arr1), &mut groups)
247+
.unwrap();
248+
groups.clear();
249+
group_values
250+
.intern(std::slice::from_ref(&arr2), &mut groups)
251+
.unwrap();
252+
groups.clear();
253+
group_values
254+
.intern(std::slice::from_ref(&arr3), &mut groups)
255+
.unwrap();
256+
groups.clear();
257+
group_values
258+
.intern(std::slice::from_ref(&arr4), &mut groups)
259+
.unwrap();
260+
},
261+
criterion::BatchSize::SmallInput,
262+
);
263+
});
264+
}
265+
}
266+
267+
criterion_group!(
268+
benches,
269+
bench_single_column_group_values,
270+
bench_repeated_intern_prefab_cols
271+
);
272+
criterion_main!(benches);
159 KB
Binary file not shown.

datafusion/physical-plan/src/aggregates/group_values/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ use datafusion_expr::EmitTo;
3030

3131
pub mod multi_group_by;
3232

33-
mod row;
34-
mod single_group_by;
33+
pub mod row;
34+
pub mod single_group_by;
3535
use datafusion_physical_expr::binary_map::OutputType;
3636
use multi_group_by::GroupValuesColumn;
3737
use row::GroupValuesRows;
@@ -197,6 +197,7 @@ pub fn new_group_values(
197197
DataType::Boolean => {
198198
return Ok(Box::new(GroupValuesBoolean::new()));
199199
}
200+
/*
200201
DataType::Dictionary(key_type, value_type) => {
201202
if supported_single_dictionary_value(value_type) {
202203
return match key_type.as_ref() {
@@ -246,7 +247,7 @@ pub fn new_group_values(
246247
)),
247248
};
248249
}
249-
}
250+
}*/
250251
_ => {}
251252
}
252253
}

0 commit comments

Comments
 (0)