Skip to content

Commit 352209b

Browse files
committed
Improve string_agg performance with eager mode adjustments
Rework the string_agg accumulator to initiate in eager mode, reducing unnecessary allocations. Restore an eager append helper for the hot path and enhance promotion logic to use lightweight size estimates from Arrow buffers. This allows short payloads to remain on the eager path while enabling deferred copying for larger batches. Add regression tests to ensure short payloads do not promote and mixed eager/deferred batches operate correctly.
1 parent efe7c64 commit 352209b

1 file changed

Lines changed: 150 additions & 78 deletions

File tree

datafusion/functions-aggregate/src/string_agg.rs

Lines changed: 150 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -330,12 +330,16 @@ struct StringAggGroupsAccumulator {
330330
values: Vec<Option<String>>,
331331
/// Running total of bytes stored in `values`.
332332
total_data_bytes: usize,
333+
/// Deferred bookkeeping is allocated lazily after promotion.
334+
deferred: Option<DeferredRows>,
335+
}
336+
337+
#[derive(Debug, Default)]
338+
struct DeferredRows {
333339
/// Source arrays retained from input batches or merged state batches.
334340
batches: Vec<ArrayRef>,
335341
/// Per-batch `(group_idx, row_idx)` pairs for non-null rows.
336342
batch_entries: Vec<Vec<(u32, u32)>>,
337-
/// Total number of groups tracked.
338-
num_groups: usize,
339343
}
340344

341345
enum StringInputArray<'a> {
@@ -355,12 +359,6 @@ macro_rules! dispatch_string_input_array {
355359
}
356360

357361
impl<'a> StringInputArray<'a> {
358-
fn sample_non_null_len(&self) -> Option<usize> {
359-
dispatch_string_input_array!(self, array => {
360-
array.iter().flatten().next().map(str::len)
361-
})
362-
}
363-
364362
fn try_new(array: &'a ArrayRef) -> Result<Self> {
365363
match array.data_type() {
366364
DataType::Utf8 => Ok(Self::Utf8(array.as_string::<i32>())),
@@ -376,22 +374,6 @@ impl<'a> StringInputArray<'a> {
376374
})
377375
}
378376

379-
fn append_materialized(
380-
&self,
381-
values: &mut [Option<String>],
382-
group_indices: &[usize],
383-
delimiter: &str,
384-
) -> usize {
385-
dispatch_string_input_array!(self, array => {
386-
StringAggGroupsAccumulator::append_batch_typed(
387-
values,
388-
array.iter(),
389-
group_indices,
390-
delimiter,
391-
)
392-
})
393-
}
394-
395377
fn append_batch_values(
396378
&self,
397379
values: &mut [Option<String>],
@@ -420,9 +402,7 @@ impl StringAggGroupsAccumulator {
420402
delimiter,
421403
values: Vec::new(),
422404
total_data_bytes: 0,
423-
batches: Vec::new(),
424-
batch_entries: Vec::new(),
425-
num_groups: 0,
405+
deferred: None,
426406
}
427407
}
428408

@@ -431,17 +411,19 @@ impl StringAggGroupsAccumulator {
431411
// buffers instead of using `clear()`.
432412
self.values = Vec::new();
433413
self.total_data_bytes = 0;
434-
self.batches = Vec::new();
435-
self.batch_entries = Vec::new();
436-
self.num_groups = 0;
414+
self.deferred = None;
437415
}
438416

439-
fn retain_after_emit(&mut self, emit_groups: usize) {
417+
fn retain_after_emit(deferred: &mut DeferredRows, emit_groups: usize) {
440418
let emit_groups = emit_groups as u32;
441-
let mut retained_batches = Vec::with_capacity(self.batches.len());
442-
let mut retained_entries = Vec::with_capacity(self.batch_entries.len());
443-
444-
for (batch, entries) in self.batches.drain(..).zip(self.batch_entries.drain(..)) {
419+
let mut retained_batches = Vec::with_capacity(deferred.batches.len());
420+
let mut retained_entries = Vec::with_capacity(deferred.batch_entries.len());
421+
422+
for (batch, entries) in deferred
423+
.batches
424+
.drain(..)
425+
.zip(deferred.batch_entries.drain(..))
426+
{
445427
let entries: Vec<_> = entries
446428
.into_iter()
447429
.filter_map(|(group_idx, row_idx)| {
@@ -463,8 +445,8 @@ impl StringAggGroupsAccumulator {
463445
retained_entries.push(entries);
464446
}
465447

466-
self.batches = retained_batches;
467-
self.batch_entries = retained_entries;
448+
deferred.batches = retained_batches;
449+
deferred.batch_entries = retained_entries;
468450
}
469451

470452
fn append_rows_typed<'a, A>(array: &A, group_indices: &[usize]) -> Vec<(u32, u32)>
@@ -520,6 +502,36 @@ impl StringAggGroupsAccumulator {
520502
.sum()
521503
}
522504

505+
fn append_eager_batch(
506+
&mut self,
507+
array: &ArrayRef,
508+
group_indices: &[usize],
509+
) -> Result<()> {
510+
let added = match array.data_type() {
511+
DataType::Utf8 => Self::append_batch_typed(
512+
&mut self.values,
513+
array.as_string::<i32>().iter(),
514+
group_indices,
515+
&self.delimiter,
516+
),
517+
DataType::LargeUtf8 => Self::append_batch_typed(
518+
&mut self.values,
519+
array.as_string::<i64>().iter(),
520+
group_indices,
521+
&self.delimiter,
522+
),
523+
DataType::Utf8View => Self::append_batch_typed(
524+
&mut self.values,
525+
array.as_string_view().iter(),
526+
group_indices,
527+
&self.delimiter,
528+
),
529+
other => return internal_err!("string_agg unexpected data type: {other}"),
530+
};
531+
self.total_data_bytes += added;
532+
Ok(())
533+
}
534+
523535
fn append_batch_values_typed<'a, A>(
524536
values: &mut [Option<String>],
525537
entries: &[(u32, u32)],
@@ -562,16 +574,48 @@ impl StringAggGroupsAccumulator {
562574
Ok(())
563575
}
564576

565-
fn should_defer(
566-
&self,
567-
input: &StringInputArray<'_>,
568-
total_num_groups: usize,
569-
) -> bool {
577+
fn estimated_payload_len(array: &ArrayRef) -> Option<usize> {
578+
let non_null_rows = array.len().saturating_sub(array.null_count());
579+
if non_null_rows == 0 {
580+
return None;
581+
}
582+
583+
match array.data_type() {
584+
DataType::Utf8 => {
585+
Some(array.as_string::<i32>().value_data().len() / non_null_rows)
586+
}
587+
DataType::LargeUtf8 => {
588+
Some(array.as_string::<i64>().value_data().len() / non_null_rows)
589+
}
590+
DataType::Utf8View => Some(
591+
array
592+
.as_string_view()
593+
.data_buffers()
594+
.iter()
595+
.map(|buffer| buffer.len())
596+
.sum::<usize>()
597+
/ non_null_rows,
598+
),
599+
_ => None,
600+
}
601+
}
602+
603+
fn should_promote(&self, array: &ArrayRef, total_num_groups: usize) -> bool {
570604
total_num_groups >= Self::DEFER_GROUP_THRESHOLD
571-
&& input
572-
.sample_non_null_len()
605+
&& Self::estimated_payload_len(array)
573606
.is_some_and(|len| len >= Self::DEFER_PAYLOAD_LEN_THRESHOLD)
574607
}
608+
609+
fn defer_batch(&mut self, array: ArrayRef, group_indices: &[usize]) -> Result<()> {
610+
let input = StringInputArray::try_new(&array)?;
611+
let entries = input.append_rows(group_indices);
612+
if !entries.is_empty() {
613+
let deferred = self.deferred.get_or_insert_with(DeferredRows::default);
614+
deferred.batches.push(array);
615+
deferred.batch_entries.push(entries);
616+
}
617+
Ok(())
618+
}
575619
}
576620

577621
impl GroupsAccumulator for StringAggGroupsAccumulator {
@@ -582,23 +626,15 @@ impl GroupsAccumulator for StringAggGroupsAccumulator {
582626
opt_filter: Option<&BooleanArray>,
583627
total_num_groups: usize,
584628
) -> Result<()> {
585-
self.num_groups = self.num_groups.max(total_num_groups);
586629
self.values.resize(total_num_groups, None);
587630
let array = apply_filter_as_nulls(&values[0], opt_filter)?;
588-
let input = StringInputArray::try_new(&array)?;
589631

590-
if self.should_defer(&input, total_num_groups) {
591-
let entries = input.append_rows(group_indices);
592-
if !entries.is_empty() {
593-
self.batches.push(array);
594-
self.batch_entries.push(entries);
595-
}
632+
if self.deferred.is_some() {
633+
self.defer_batch(array, group_indices)?;
634+
} else if self.should_promote(&array, total_num_groups) {
635+
self.defer_batch(array, group_indices)?;
596636
} else {
597-
self.total_data_bytes += input.append_materialized(
598-
&mut self.values,
599-
group_indices,
600-
&self.delimiter,
601-
);
637+
self.append_eager_batch(&array, group_indices)?;
602638
}
603639

604640
Ok(())
@@ -613,21 +649,27 @@ impl GroupsAccumulator for StringAggGroupsAccumulator {
613649
.sum();
614650
self.total_data_bytes -= emitted_bytes;
615651

616-
for (batch, entries) in self.batches.iter().zip(&self.batch_entries) {
617-
Self::append_batch_values(
618-
&mut to_emit,
619-
entries,
620-
batch,
621-
&self.delimiter,
622-
emit_groups,
623-
)?;
652+
if let Some(deferred) = &self.deferred {
653+
for (batch, entries) in deferred.batches.iter().zip(&deferred.batch_entries) {
654+
Self::append_batch_values(
655+
&mut to_emit,
656+
entries,
657+
batch,
658+
&self.delimiter,
659+
emit_groups,
660+
)?;
661+
}
624662
}
625663

626664
match emit_to {
627665
EmitTo::All => self.clear_state(),
628666
EmitTo::First(_) => {
629-
self.retain_after_emit(emit_groups);
630-
self.num_groups = self.values.len();
667+
if let Some(deferred) = &mut self.deferred {
668+
Self::retain_after_emit(deferred, emit_groups);
669+
if deferred.batches.is_empty() {
670+
self.deferred = None;
671+
}
672+
}
631673
}
632674
}
633675

@@ -671,17 +713,25 @@ impl GroupsAccumulator for StringAggGroupsAccumulator {
671713
self.total_data_bytes
672714
+ self.values.capacity() * size_of::<Option<String>>()
673715
+ self
674-
.batches
675-
.iter()
676-
.map(|arr| arr.to_data().get_slice_memory_size().unwrap_or_default())
677-
.sum::<usize>()
678-
+ self.batches.capacity() * size_of::<ArrayRef>()
679-
+ self
680-
.batch_entries
681-
.iter()
682-
.map(|entries| entries.capacity() * size_of::<(u32, u32)>())
683-
.sum::<usize>()
684-
+ self.batch_entries.capacity() * size_of::<Vec<(u32, u32)>>()
716+
.deferred
717+
.as_ref()
718+
.map(|deferred| {
719+
deferred
720+
.batches
721+
.iter()
722+
.map(|arr| {
723+
arr.to_data().get_slice_memory_size().unwrap_or_default()
724+
})
725+
.sum::<usize>()
726+
+ deferred.batches.capacity() * size_of::<ArrayRef>()
727+
+ deferred
728+
.batch_entries
729+
.iter()
730+
.map(|entries| entries.capacity() * size_of::<(u32, u32)>())
731+
.sum::<usize>()
732+
+ deferred.batch_entries.capacity() * size_of::<Vec<(u32, u32)>>()
733+
})
734+
.unwrap_or_default()
685735
+ self.delimiter.capacity()
686736
+ size_of_val(self)
687737
}
@@ -1201,13 +1251,15 @@ mod tests {
12011251
let eager_values: ArrayRef =
12021252
Arc::new(LargeStringArray::from(vec!["a", "b", "c", "d"]));
12031253
acc.update_batch(&[eager_values], &[0, 1, 0, 1], None, 40)?;
1254+
assert!(acc.deferred.is_none());
12041255

12051256
let deferred_values: ArrayRef = Arc::new(LargeStringArray::from(vec![
12061257
"large0_abcdefghijklmnopqrstuvwxyzabcdef",
12071258
"large1_bcdefghijklmnopqrstuvwxyzabcdefg",
12081259
"large2_cdefghijklmnopqrstuvwxyzabcdefgh",
12091260
]));
12101261
acc.update_batch(&[deferred_values], &[0, 1, 39], None, 40)?;
1262+
assert!(acc.deferred.is_some());
12111263

12121264
let result = evaluate_groups(&mut acc, EmitTo::First(2));
12131265
assert_eq!(
@@ -1224,4 +1276,24 @@ mod tests {
12241276
assert_eq!(remaining, expected);
12251277
Ok(())
12261278
}
1279+
1280+
#[test]
1281+
fn groups_short_payloads_do_not_promote_to_deferred() -> Result<()> {
1282+
let mut acc = make_groups_acc(",");
1283+
let values: ArrayRef = Arc::new(LargeStringArray::from(vec![
1284+
"aaa", "bbb", "ccc", "ddd", "eee", "fff",
1285+
]));
1286+
1287+
acc.update_batch(&[values], &[0, 1, 39, 38, 0, 1], None, 40)?;
1288+
1289+
assert!(acc.deferred.is_none());
1290+
let result = evaluate_groups(&mut acc, EmitTo::All);
1291+
let mut expected = vec![None; 40];
1292+
expected[0] = Some("aaa,eee".to_string());
1293+
expected[1] = Some("bbb,fff".to_string());
1294+
expected[38] = Some("ddd".to_string());
1295+
expected[39] = Some("ccc".to_string());
1296+
assert_eq!(result, expected);
1297+
Ok(())
1298+
}
12271299
}

0 commit comments

Comments
 (0)