Skip to content

Commit 0187a7c

Browse files
committed
modify benchmark
1 parent efd0f55 commit 0187a7c

2 files changed

Lines changed: 135 additions & 38 deletions

File tree

datafusion/core/benches/aggregate_query_sql.rs

Lines changed: 62 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717

1818
mod data_utils;
1919

20-
use criterion::{Criterion, criterion_group, criterion_main};
21-
use data_utils::create_table_provider;
20+
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
21+
use data_utils::{Utf8PayloadProfile, create_table_provider_with_payload};
2222
use datafusion::error::Result;
2323
use datafusion::execution::context::SessionContext;
2424
use parking_lot::Mutex;
@@ -36,13 +36,38 @@ fn create_context(
3636
partitions_len: usize,
3737
array_len: usize,
3838
batch_size: usize,
39+
) -> Result<Arc<Mutex<SessionContext>>> {
40+
create_context_with_payload(
41+
partitions_len,
42+
array_len,
43+
batch_size,
44+
Utf8PayloadProfile::Small,
45+
)
46+
}
47+
48+
fn create_context_with_payload(
49+
partitions_len: usize,
50+
array_len: usize,
51+
batch_size: usize,
52+
utf8_payload_profile: Utf8PayloadProfile,
3953
) -> Result<Arc<Mutex<SessionContext>>> {
4054
let ctx = SessionContext::new();
41-
let provider = create_table_provider(partitions_len, array_len, batch_size)?;
55+
let provider = create_table_provider_with_payload(
56+
partitions_len,
57+
array_len,
58+
batch_size,
59+
utf8_payload_profile,
60+
)?;
4261
ctx.register_table("t", provider)?;
4362
Ok(Arc::new(Mutex::new(ctx)))
4463
}
4564

65+
/// Builds the grouped `string_agg` benchmark query over table `t`.
///
/// `group_by_column` is used both as the projected key and as the `GROUP BY`
/// column; the aggregated column is always `utf8`, joined with `,`.
fn string_agg_sql(group_by_column: &str) -> String {
    let column = group_by_column;
    [
        "SELECT ",
        column,
        ", string_agg(utf8, ',') FROM t GROUP BY ",
        column,
    ]
    .concat()
}
70+
4671
fn criterion_benchmark(c: &mut Criterion) {
4772
let partitions_len = 8;
4873
let array_len = 32768 * 2; // 2^16
@@ -296,38 +321,42 @@ fn criterion_benchmark(c: &mut Criterion) {
296321
})
297322
});
298323

299-
c.bench_function("string_agg_query_group_by_few_groups", |b| {
300-
b.iter(|| {
301-
query(
302-
ctx.clone(),
303-
&rt,
304-
"SELECT u64_narrow, string_agg(utf8, ',') \
305-
FROM t GROUP BY u64_narrow",
306-
)
307-
})
308-
});
324+
// These payload sizes keep the original 4-value cardinality while changing
325+
// only the bytes copied into grouped `string_agg` state:
326+
// - small_3b preserves the existing `hi0`..`hi3` baseline
327+
// - medium_64b makes copy costs measurable without overwhelming the query
328+
// - large_1024b stresses both CPU and memory behavior
329+
let string_agg_profiles = [
330+
(Utf8PayloadProfile::Small, "small_3b"),
331+
(Utf8PayloadProfile::Medium, "medium_64b"),
332+
(Utf8PayloadProfile::Large, "large_1024b"),
333+
]
334+
.into_iter()
335+
.map(|(profile, label)| {
336+
(
337+
label,
338+
create_context_with_payload(partitions_len, array_len, batch_size, profile)
339+
.unwrap(),
340+
)
341+
})
342+
.collect::<Vec<_>>();
309343

310-
c.bench_function("string_agg_query_group_by_mid_groups", |b| {
311-
b.iter(|| {
312-
query(
313-
ctx.clone(),
314-
&rt,
315-
"SELECT u64_mid, string_agg(utf8, ',') \
316-
FROM t GROUP BY u64_mid",
317-
)
318-
})
319-
});
344+
let string_agg_queries = [
345+
("few_groups", string_agg_sql("u64_narrow")),
346+
("mid_groups", string_agg_sql("u64_mid")),
347+
("many_groups", string_agg_sql("u64_wide")),
348+
];
320349

321-
c.bench_function("string_agg_query_group_by_many_groups", |b| {
322-
b.iter(|| {
323-
query(
324-
ctx.clone(),
325-
&rt,
326-
"SELECT u64_wide, string_agg(utf8, ',') \
327-
FROM t GROUP BY u64_wide",
328-
)
329-
})
330-
});
350+
let mut string_agg_group = c.benchmark_group("string_agg_payloads");
351+
for (query_name, sql) in &string_agg_queries {
352+
for (payload_name, payload_ctx) in &string_agg_profiles {
353+
string_agg_group
354+
.bench_function(BenchmarkId::new(*query_name, *payload_name), |b| {
355+
b.iter(|| query(payload_ctx.clone(), &rt, sql))
356+
});
357+
}
358+
}
359+
string_agg_group.finish();
331360
}
332361

333362
criterion_group!(benches, criterion_benchmark);

datafusion/core/benches/data_utils/mod.rs

Lines changed: 73 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,23 @@ use rand_distr::{Normal, Pareto};
3535
use std::fmt::Write;
3636
use std::sync::Arc;
3737

38+
/// Selects how wide each value of the benchmark `utf8` column is.
///
/// Every profile produces the same four distinct values, so cardinality is
/// unchanged; only the byte width of each value differs. This lets grouped
/// `string_agg` benchmarks isolate the cost of copying larger payloads.
#[derive(Clone, Copy, Debug)]
pub enum Utf8PayloadProfile {
    /// The baseline: 3-byte values `hi0`..`hi3`.
    Small,
    /// 64-byte values: wide enough for copy cost to show up, without
    /// allocator churn dominating the measurement.
    Medium,
    /// 1024-byte values: stresses both CPU and memory in grouped
    /// `string_agg` workloads.
    Large,
}
54+
3855
/// create an in-memory table given the partition len, array len, and batch size,
3956
/// and the result table will be of array_len in total, and then partitioned, and batched.
4057
#[expect(clippy::allow_attributes)] // some issue where expect(dead_code) doesn't fire properly
@@ -44,9 +61,32 @@ pub fn create_table_provider(
4461
array_len: usize,
4562
batch_size: usize,
4663
) -> Result<Arc<MemTable>> {
64+
create_table_provider_with_payload(
65+
partitions_len,
66+
array_len,
67+
batch_size,
68+
Utf8PayloadProfile::Small,
69+
)
70+
}
71+
72+
/// Create an in-memory table with a configurable `utf8` payload size.
73+
#[expect(clippy::allow_attributes)] // some issue where expect(dead_code) doesn't fire properly
74+
#[allow(dead_code)]
75+
pub fn create_table_provider_with_payload(
76+
partitions_len: usize,
77+
array_len: usize,
78+
batch_size: usize,
79+
utf8_payload_profile: Utf8PayloadProfile,
80+
) -> Result<Arc<MemTable>> {
81+
let _ = Utf8PayloadProfile::all();
4782
let schema = Arc::new(create_schema());
48-
let partitions =
49-
create_record_batches(&schema, array_len, partitions_len, batch_size);
83+
let partitions = create_record_batches(
84+
&schema,
85+
array_len,
86+
partitions_len,
87+
batch_size,
88+
utf8_payload_profile,
89+
);
5090
// declare a table in memory. In spark API, this corresponds to createDataFrame(...).
5191
MemTable::try_new(schema, partitions).map(Arc::new)
5292
}
@@ -91,12 +131,14 @@ fn create_record_batch(
91131
rng: &mut StdRng,
92132
batch_size: usize,
93133
batch_index: usize,
134+
payloads: &[String; 4],
94135
) -> RecordBatch {
95136
// Randomly choose from 4 distinct key values; a higher number increases sparseness.
96137
let key_suffixes = [0, 1, 2, 3];
97-
let keys = StringArray::from_iter_values(
98-
(0..batch_size).map(|_| format!("hi{}", key_suffixes.choose(rng).unwrap())),
99-
);
138+
let keys = StringArray::from_iter_values((0..batch_size).map(|_| {
139+
let suffix = *key_suffixes.choose(rng).unwrap();
140+
payloads[suffix].as_str()
141+
}));
100142

101143
let values = create_data(rng, batch_size, 0.5);
102144

@@ -146,10 +188,12 @@ pub fn create_record_batches(
146188
array_len: usize,
147189
partitions_len: usize,
148190
batch_size: usize,
191+
utf8_payload_profile: Utf8PayloadProfile,
149192
) -> Vec<Vec<RecordBatch>> {
150193
let mut rng = StdRng::seed_from_u64(42);
151194
let mut partitions = Vec::with_capacity(partitions_len);
152195
let batches_per_partition = array_len / batch_size / partitions_len;
196+
let payloads = utf8_payload_profile.payloads();
153197

154198
for _ in 0..partitions_len {
155199
let mut batches = Vec::with_capacity(batches_per_partition);
@@ -159,13 +203,37 @@ pub fn create_record_batches(
159203
&mut rng,
160204
batch_size,
161205
batch_index,
206+
&payloads,
162207
));
163208
}
164209
partitions.push(batches);
165210
}
166211
partitions
167212
}
168213

214+
impl Utf8PayloadProfile {
215+
fn all() -> [Self; 3] {
216+
[Self::Small, Self::Medium, Self::Large]
217+
}
218+
219+
fn payloads(self) -> [String; 4] {
220+
std::array::from_fn(|idx| match self {
221+
Self::Small => format!("hi{idx}"),
222+
Self::Medium => payload_string("mid", idx, 64),
223+
Self::Large => payload_string("large", idx, 1024),
224+
})
225+
}
226+
}
227+
228+
/// Builds a deterministic payload value for the benchmark `utf8` column.
///
/// The value starts with the header `{prefix}{suffix}_` and is then padded up
/// to `target_len` bytes with a single repeated ASCII lowercase letter derived
/// from `suffix` (`0 -> 'a'`, `1 -> 'b'`, ..., wrapping around after `'z'`).
///
/// If the header alone is already `target_len` bytes or longer, it is returned
/// without padding, so the result may be longer than `target_len`.
fn payload_string(prefix: &str, suffix: usize, target_len: usize) -> String {
    let mut value = format!("{prefix}{suffix}_");
    // `suffix % 26` always fits in a `u8` and keeps the filler within `a..=z`,
    // so large suffixes can neither truncate nor overflow the byte addition.
    let filler = char::from(b'a' + (suffix % 26) as u8);
    // A plain subtraction would panic (overflow checks on) or wrap to a huge
    // repeat count (overflow checks off) when the header exceeds `target_len`.
    let padding = target_len.saturating_sub(value.len());
    value.extend(std::iter::repeat_n(filler, padding));
    value
}
236+
169237
/// An enum that wraps either a regular StringBuilder or a GenericByteViewBuilder
170238
/// so that both can be used interchangeably.
171239
enum TraceIdBuilder {

0 commit comments

Comments
 (0)