From 9f26893bbcc23d1b48e380e79ae15293e2cf505c Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 7 Apr 2026 16:11:23 +0800 Subject: [PATCH 1/4] Add string_agg_payloads benchmark and Utf8Profile Introduce string_agg_payloads Criterion group for benchmarking against small_3b, medium_64b, and large_1024b datasets. Update comments for clarity on dataset sizes and comparisons. Implement Utf8PayloadProfile and create_table_provider_with_payload to generate UTF-8 values with varying payload sizes, enhancing the assessment of CPU and memory costs during string aggregation. --- .../core/benches/aggregate_query_sql.rs | 112 ++++++++++++------ datafusion/core/benches/data_utils/mod.rs | 82 ++++++++++++- 2 files changed, 156 insertions(+), 38 deletions(-) diff --git a/datafusion/core/benches/aggregate_query_sql.rs b/datafusion/core/benches/aggregate_query_sql.rs index d7e24aceba170..3dbdd20d7f084 100644 --- a/datafusion/core/benches/aggregate_query_sql.rs +++ b/datafusion/core/benches/aggregate_query_sql.rs @@ -17,8 +17,10 @@ mod data_utils; -use criterion::{Criterion, criterion_group, criterion_main}; -use data_utils::create_table_provider; +use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; +use data_utils::{ + Utf8PayloadProfile, create_table_provider, create_table_provider_with_payload, +}; use datafusion::error::Result; use datafusion::execution::context::SessionContext; use parking_lot::Mutex; @@ -36,13 +38,44 @@ fn create_context( partitions_len: usize, array_len: usize, batch_size: usize, +) -> Result>> { + create_context_with_payload( + partitions_len, + array_len, + batch_size, + Utf8PayloadProfile::Small, + ) +} + +fn create_context_with_payload( + partitions_len: usize, + array_len: usize, + batch_size: usize, + utf8_payload_profile: Utf8PayloadProfile, ) -> Result>> { let ctx = SessionContext::new(); - let provider = create_table_provider(partitions_len, array_len, batch_size)?; + let provider = if matches!(utf8_payload_profile, Utf8PayloadProfile::Small) { + create_table_provider(partitions_len, array_len, batch_size)? + } else { + create_table_provider_with_payload( + partitions_len, + array_len, + batch_size, + utf8_payload_profile, + )? + }; ctx.register_table("t", provider)?; Ok(Arc::new(Mutex::new(ctx))) } +fn payload_label(profile: Utf8PayloadProfile) -> &'static str { + match profile { + Utf8PayloadProfile::Small => "small_3b", + Utf8PayloadProfile::Medium => "medium_64b", + Utf8PayloadProfile::Large => "large_1024b", + } +} + fn criterion_benchmark(c: &mut Criterion) { let partitions_len = 8; let array_len = 32768 * 2; // 2^16 @@ -296,38 +329,51 @@ fn criterion_benchmark(c: &mut Criterion) { }) }); - c.bench_function("string_agg_query_group_by_few_groups", |b| { - b.iter(|| { - query( - ctx.clone(), - &rt, - "SELECT u64_narrow, string_agg(utf8, ',') \ - FROM t GROUP BY u64_narrow", - ) - }) - }); + // These payload sizes keep the original 4-value cardinality while changing + // only the bytes copied into grouped `string_agg` state: + // - small_3b preserves the existing `hi0`..`hi3` baseline + // - medium_64b makes copy costs measurable without overwhelming the query + // - large_1024b stresses both CPU and memory behavior + let string_agg_profiles = [ + Utf8PayloadProfile::Small, + Utf8PayloadProfile::Medium, + Utf8PayloadProfile::Large, + ] + .into_iter() + .map(|profile| { + ( + payload_label(profile), + create_context_with_payload(partitions_len, array_len, batch_size, profile) + .unwrap(), + ) + }) + .collect::>(); - c.bench_function("string_agg_query_group_by_mid_groups", |b| { - b.iter(|| { - query( - ctx.clone(), - &rt, - "SELECT u64_mid, string_agg(utf8, ',') \ - FROM t GROUP BY u64_mid", - ) - }) - }); + let string_agg_queries = [ + ( + "few_groups", + "SELECT u64_narrow, string_agg(utf8, ',') FROM t GROUP BY u64_narrow", + ), + ( + "mid_groups", + "SELECT u64_mid, string_agg(utf8, ',') FROM t GROUP BY u64_mid", + ), + ( + "many_groups", + "SELECT u64_wide, string_agg(utf8, ',') FROM t GROUP BY u64_wide", + ), + ]; - c.bench_function("string_agg_query_group_by_many_groups", |b| { - b.iter(|| { - query( - ctx.clone(), - &rt, - "SELECT u64_wide, string_agg(utf8, ',') \ - FROM t GROUP BY u64_wide", - ) - }) - }); + let mut string_agg_group = c.benchmark_group("string_agg_payloads"); + for (query_name, sql) in string_agg_queries { + for (payload_name, payload_ctx) in &string_agg_profiles { + string_agg_group + .bench_function(BenchmarkId::new(query_name, payload_name), |b| { + b.iter(|| query(payload_ctx.clone(), &rt, sql)) + }); + } + } + string_agg_group.finish(); } criterion_group!(benches, criterion_benchmark); diff --git a/datafusion/core/benches/data_utils/mod.rs b/datafusion/core/benches/data_utils/mod.rs index 728c6490c72bd..23abae77962da 100644 --- a/datafusion/core/benches/data_utils/mod.rs +++ b/datafusion/core/benches/data_utils/mod.rs @@ -35,6 +35,23 @@ use rand_distr::{Normal, Pareto}; use std::fmt::Write; use std::sync::Arc; +/// Payload profile for the benchmark `utf8` column. +/// +/// The small profile preserves the existing `hi0`..`hi3` baseline. Medium and +/// large profiles keep the same low cardinality but scale each value's byte +/// width so string aggregation can expose the cost of copying larger payloads. +#[derive(Clone, Copy, Debug)] +pub enum Utf8PayloadProfile { + /// 3-byte baseline values such as `hi0`. + Small, + /// 64-byte payloads that are large enough to make copying noticeable + /// without dominating the benchmark with allocator churn. + Medium, + /// 1024-byte payloads that amplify both CPU and memory pressure in + /// grouped `string_agg` workloads. + Large, +} + /// create an in-memory table given the partition len, array len, and batch size, /// and the result table will be of array_len in total, and then partitioned, and batched. #[expect(clippy::allow_attributes)] // some issue where expect(dead_code) doesn't fire properly @@ -43,10 +60,32 @@ pub fn create_table_provider( partitions_len: usize, array_len: usize, batch_size: usize, +) -> Result> { + create_table_provider_with_payload( + partitions_len, + array_len, + batch_size, + Utf8PayloadProfile::Small, + ) +} + +/// Create an in-memory table with a configurable `utf8` payload size. +#[expect(clippy::allow_attributes)] // some issue where expect(dead_code) doesn't fire properly +#[allow(dead_code)] +pub fn create_table_provider_with_payload( + partitions_len: usize, + array_len: usize, + batch_size: usize, + utf8_payload_profile: Utf8PayloadProfile, ) -> Result> { let schema = Arc::new(create_schema()); - let partitions = - create_record_batches(&schema, array_len, partitions_len, batch_size); + let partitions = create_record_batches( + &schema, + array_len, + partitions_len, + batch_size, + utf8_payload_profile, + ); // declare a table in memory. In spark API, this corresponds to createDataFrame(...). MemTable::try_new(schema, partitions).map(Arc::new) } @@ -91,12 +130,15 @@ fn create_record_batch( rng: &mut StdRng, batch_size: usize, batch_index: usize, + utf8_payload_profile: Utf8PayloadProfile, ) -> RecordBatch { // Randomly choose from 4 distinct key values; a higher number increases sparseness. let key_suffixes = [0, 1, 2, 3]; - let keys = StringArray::from_iter_values( - (0..batch_size).map(|_| format!("hi{}", key_suffixes.choose(rng).unwrap())), - ); + let payloads = utf8_payload_profile.payloads(); + let keys = StringArray::from_iter_values((0..batch_size).map(|_| { + let suffix = *key_suffixes.choose(rng).unwrap(); + payloads[suffix].as_str() + })); let values = create_data(rng, batch_size, 0.5); @@ -146,6 +188,7 @@ pub fn create_record_batches( array_len: usize, partitions_len: usize, batch_size: usize, + utf8_payload_profile: Utf8PayloadProfile, ) -> Vec> { let mut rng = StdRng::seed_from_u64(42); let mut partitions = Vec::with_capacity(partitions_len); @@ -159,6 +202,7 @@ pub fn create_record_batches( &mut rng, batch_size, batch_index, + utf8_payload_profile, )); } partitions.push(batches); @@ -166,6 +210,34 @@ pub fn create_record_batches( partitions } +impl Utf8PayloadProfile { + fn payloads(self) -> [String; 4] { + match self { + Self::Small => [ + "hi0".to_string(), + "hi1".to_string(), + "hi2".to_string(), + "hi3".to_string(), + ], + Self::Medium => std::array::from_fn(|idx| payload_string("mid", idx, 64)), + Self::Large => std::array::from_fn(|idx| payload_string("large", idx, 1024)), + } + } +} + +fn payload_string(prefix: &str, suffix: usize, target_len: usize) -> String { + let mut value = format!("{prefix}{suffix}_"); + value.extend(std::iter::repeat_n( + ascii_fill(suffix), + target_len - value.len(), + )); + value +} + +fn ascii_fill(suffix: usize) -> char { + (b'a' + suffix as u8) as char +} + /// An enum that wraps either a regular StringBuilder or a GenericByteViewBuilder /// so that both can be used interchangeably. enum TraceIdBuilder { From c95e23eb10ead204a6e95583e11a853c5b4844a5 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 7 Apr 2026 19:50:51 +0800 Subject: [PATCH 2/4] Refactor payload handling and SQL helpers Eliminate redundancy in create_context_with_payload by always using create_table_provider_with_payload. Remove the Small special case and extra imports. Replace standalone payload-label helpers with inline tuples, and extract a string_agg_sql helper for cleaner SQL text in benchmark setups. Optimize payload string computation in create_record_batches to run once per dataset build, rather than for each batch. Unify Utf8PayloadProfile::payloads with a single from_fn construction, and inline filler-character logic into payload_string for improved clarity and efficiency. --- .../core/benches/aggregate_query_sql.rs | 59 +++++++------------ datafusion/core/benches/data_utils/mod.rs | 27 +++------ 2 files changed, 30 insertions(+), 56 deletions(-) diff --git a/datafusion/core/benches/aggregate_query_sql.rs b/datafusion/core/benches/aggregate_query_sql.rs index 3dbdd20d7f084..8eb23218f013f 100644 --- a/datafusion/core/benches/aggregate_query_sql.rs +++ b/datafusion/core/benches/aggregate_query_sql.rs @@ -18,9 +18,7 @@ mod data_utils; use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; -use data_utils::{ - Utf8PayloadProfile, create_table_provider, create_table_provider_with_payload, -}; +use data_utils::{Utf8PayloadProfile, create_table_provider_with_payload}; use datafusion::error::Result; use datafusion::execution::context::SessionContext; use parking_lot::Mutex; @@ -54,26 +52,20 @@ fn create_context_with_payload( utf8_payload_profile: Utf8PayloadProfile, ) -> Result>> { let ctx = SessionContext::new(); - let provider = if matches!(utf8_payload_profile, Utf8PayloadProfile::Small) { - create_table_provider(partitions_len, array_len, batch_size)? - } else { - create_table_provider_with_payload( - partitions_len, - array_len, - batch_size, - utf8_payload_profile, - )? - }; + let provider = create_table_provider_with_payload( + partitions_len, + array_len, + batch_size, + utf8_payload_profile, + )?; ctx.register_table("t", provider)?; Ok(Arc::new(Mutex::new(ctx))) } -fn payload_label(profile: Utf8PayloadProfile) -> &'static str { - match profile { - Utf8PayloadProfile::Small => "small_3b", - Utf8PayloadProfile::Medium => "medium_64b", - Utf8PayloadProfile::Large => "large_1024b", - } +fn string_agg_sql(group_by_column: &str) -> String { + format!( + "SELECT {group_by_column}, string_agg(utf8, ',') FROM t GROUP BY {group_by_column}" + ) } fn criterion_benchmark(c: &mut Criterion) { @@ -335,14 +327,14 @@ fn criterion_benchmark(c: &mut Criterion) { // - medium_64b makes copy costs measurable without overwhelming the query // - large_1024b stresses both CPU and memory behavior let string_agg_profiles = [ - Utf8PayloadProfile::Small, - Utf8PayloadProfile::Medium, - Utf8PayloadProfile::Large, + (Utf8PayloadProfile::Small, "small_3b"), + (Utf8PayloadProfile::Medium, "medium_64b"), + (Utf8PayloadProfile::Large, "large_1024b"), ] .into_iter() - .map(|profile| { + .map(|(profile, label)| { ( - payload_label(profile), + label, create_context_with_payload(partitions_len, array_len, batch_size, profile) .unwrap(), ) @@ -350,25 +342,16 @@ fn criterion_benchmark(c: &mut Criterion) { .collect::>(); let string_agg_queries = [ - ( - "few_groups", - "SELECT u64_narrow, string_agg(utf8, ',') FROM t GROUP BY u64_narrow", - ), - ( - "mid_groups", - "SELECT u64_mid, string_agg(utf8, ',') FROM t GROUP BY u64_mid", - ), - ( - "many_groups", - "SELECT u64_wide, string_agg(utf8, ',') FROM t GROUP BY u64_wide", - ), + ("few_groups", string_agg_sql("u64_narrow")), + ("mid_groups", string_agg_sql("u64_mid")), + ("many_groups", string_agg_sql("u64_wide")), ]; let mut string_agg_group = c.benchmark_group("string_agg_payloads"); - for (query_name, sql) in string_agg_queries { + for (query_name, sql) in &string_agg_queries { for (payload_name, payload_ctx) in &string_agg_profiles { string_agg_group - .bench_function(BenchmarkId::new(query_name, payload_name), |b| { + .bench_function(BenchmarkId::new(*query_name, *payload_name), |b| { b.iter(|| query(payload_ctx.clone(), &rt, sql)) }); } diff --git a/datafusion/core/benches/data_utils/mod.rs b/datafusion/core/benches/data_utils/mod.rs index 23abae77962da..c181a8659bdad 100644 --- a/datafusion/core/benches/data_utils/mod.rs +++ b/datafusion/core/benches/data_utils/mod.rs @@ -130,11 +130,10 @@ fn create_record_batch( rng: &mut StdRng, batch_size: usize, batch_index: usize, - utf8_payload_profile: Utf8PayloadProfile, + payloads: &[String; 4], ) -> RecordBatch { // Randomly choose from 4 distinct key values; a higher number increases sparseness. let key_suffixes = [0, 1, 2, 3]; - let payloads = utf8_payload_profile.payloads(); let keys = StringArray::from_iter_values((0..batch_size).map(|_| { let suffix = *key_suffixes.choose(rng).unwrap(); payloads[suffix].as_str() @@ -193,6 +192,7 @@ pub fn create_record_batches( let mut rng = StdRng::seed_from_u64(42); let mut partitions = Vec::with_capacity(partitions_len); let batches_per_partition = array_len / batch_size / partitions_len; + let payloads = utf8_payload_profile.payloads(); for _ in 0..partitions_len { let mut batches = Vec::with_capacity(batches_per_partition); @@ -202,7 +202,7 @@ pub fn create_record_batches( &mut rng, batch_size, batch_index, - utf8_payload_profile, + &payloads, )); } partitions.push(batches); @@ -212,32 +212,23 @@ pub fn create_record_batches( impl Utf8PayloadProfile { fn payloads(self) -> [String; 4] { - match self { - Self::Small => [ - "hi0".to_string(), - "hi1".to_string(), - "hi2".to_string(), - "hi3".to_string(), - ], - Self::Medium => std::array::from_fn(|idx| payload_string("mid", idx, 64)), - Self::Large => std::array::from_fn(|idx| payload_string("large", idx, 1024)), - } + std::array::from_fn(|idx| match self { + Self::Small => format!("hi{idx}"), + Self::Medium => payload_string("mid", idx, 64), + Self::Large => payload_string("large", idx, 1024), + }) } } fn payload_string(prefix: &str, suffix: usize, target_len: usize) -> String { let mut value = format!("{prefix}{suffix}_"); value.extend(std::iter::repeat_n( - ascii_fill(suffix), + (b'a' + suffix as u8) as char, target_len - value.len(), )); value } -fn ascii_fill(suffix: usize) -> char { - (b'a' + suffix as u8) as char -} - /// An enum that wraps either a regular StringBuilder or a GenericByteViewBuilder /// so that both can be used interchangeably. enum TraceIdBuilder { From c29e1088e310b1c61f4c72bc90419459f0cefb4b Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 7 Apr 2026 20:16:00 +0800 Subject: [PATCH 3/4] feat(tests): add dead code allowance for Utf8PayloadProfile enum This commit introduces a `#[allow(dead_code)]` annotation to the `Utf8PayloadProfile` enum in the data_utils module to suppress warnings for unused code. This change aids in maintaining cleaner code while allowing for future extensions without triggering compiler alerts. --- datafusion/core/benches/data_utils/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/core/benches/data_utils/mod.rs b/datafusion/core/benches/data_utils/mod.rs index c181a8659bdad..691ea674b66dd 100644 --- a/datafusion/core/benches/data_utils/mod.rs +++ b/datafusion/core/benches/data_utils/mod.rs @@ -41,6 +41,7 @@ use std::sync::Arc; /// large profiles keep the same low cardinality but scale each value's byte /// width so string aggregation can expose the cost of copying larger payloads. #[derive(Clone, Copy, Debug)] +#[allow(dead_code)] pub enum Utf8PayloadProfile { /// 3-byte baseline values such as `hi0`. Small, From b58cc15c44077d98eec00a1a5bcc86144f0e0076 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 7 Apr 2026 20:25:21 +0800 Subject: [PATCH 4/4] Fix lint issues and update Utf8PayloadProfile usage Replace the lint-attribute workaround in mod.rs at line 77 with a structural fix. Remove #[allow] and #[expect] from Utf8PayloadProfile. Update create_table_provider_with_payload(...) to reference Utf8PayloadProfile::all(), ensuring all three variants are constructed while keeping the shared bench module free of dead-code warnings across targets. --- datafusion/core/benches/data_utils/mod.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/datafusion/core/benches/data_utils/mod.rs b/datafusion/core/benches/data_utils/mod.rs index 691ea674b66dd..250f393408b93 100644 --- a/datafusion/core/benches/data_utils/mod.rs +++ b/datafusion/core/benches/data_utils/mod.rs @@ -41,7 +41,6 @@ use std::sync::Arc; /// large profiles keep the same low cardinality but scale each value's byte /// width so string aggregation can expose the cost of copying larger payloads. #[derive(Clone, Copy, Debug)] -#[allow(dead_code)] pub enum Utf8PayloadProfile { /// 3-byte baseline values such as `hi0`. Small, @@ -79,6 +78,7 @@ pub fn create_table_provider_with_payload( batch_size: usize, utf8_payload_profile: Utf8PayloadProfile, ) -> Result> { + let _ = Utf8PayloadProfile::all(); let schema = Arc::new(create_schema()); let partitions = create_record_batches( &schema, @@ -212,6 +212,10 @@ pub fn create_record_batches( } impl Utf8PayloadProfile { + fn all() -> [Self; 3] { + [Self::Small, Self::Medium, Self::Large] + } + fn payloads(self) -> [String; 4] { std::array::from_fn(|idx| match self { Self::Small => format!("hi{idx}"),