Skip to content

Commit eb7b3c8

Browse files
committed
Refactor StringAggGroupsAccumulator for efficiency
Optimize StringAggGroupsAccumulator to retain input and state batches, together with per-batch (group index, row index) metadata for non-null rows, instead of building a Vec<Option<String>> on every update. Assemble the concatenated strings lazily in evaluate() and state(). Adjust size() to reflect the retained arrays and metadata. Support EmitTo::First(n) by emitting the first n groups and renumbering the groups that remain. Include a note on future work to compact partially emitted (mixed) batches so they no longer pin their full input arrays.
1 parent cdfade5 commit eb7b3c8

1 file changed

Lines changed: 153 additions & 38 deletions

File tree

datafusion/functions-aggregate/src/string_agg.rs

Lines changed: 153 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use std::sync::Arc;
2323

2424
use crate::array_agg::ArrayAgg;
2525

26-
use arrow::array::{ArrayRef, AsArray, BooleanArray, LargeStringArray};
26+
use arrow::array::{Array, ArrayRef, AsArray, BooleanArray, LargeStringArray};
2727
use arrow::datatypes::{DataType, Field, FieldRef};
2828
use datafusion_common::cast::{as_generic_string_array, as_string_view_array};
2929
use datafusion_common::{
@@ -323,46 +323,132 @@ fn filter_index<T: Clone>(values: &[T], index: usize) -> Vec<T> {
323323
struct StringAggGroupsAccumulator {
324324
/// The delimiter placed between concatenated values.
325325
delimiter: String,
326-
/// Accumulated string per group. `None` means no values have been seen
327-
/// (the group's output will be NULL).
328-
/// A potential improvement is to avoid this String allocation
329-
/// See <https://github.com/apache/datafusion/issues/21156>
330-
values: Vec<Option<String>>,
331-
/// Running total of string data bytes across all groups.
332-
total_data_bytes: usize,
326+
/// Source arrays retained from input batches or merged state batches.
327+
batches: Vec<ArrayRef>,
328+
/// Per-batch `(group_idx, row_idx)` pairs for non-null rows.
329+
batch_entries: Vec<Vec<(u32, u32)>>,
330+
/// Total number of groups tracked.
331+
num_groups: usize,
333332
}
334333

335334
impl StringAggGroupsAccumulator {
336335
fn new(delimiter: String) -> Self {
337336
Self {
338337
delimiter,
339-
values: Vec::new(),
340-
total_data_bytes: 0,
338+
batches: Vec::new(),
339+
batch_entries: Vec::new(),
340+
num_groups: 0,
341341
}
342342
}
343343

344-
fn append_batch<'a>(
344+
fn clear_state(&mut self) {
345+
// `size()` measures Vec capacity rather than len, so allocate new
346+
// buffers instead of using `clear()`.
347+
self.batches = Vec::new();
348+
self.batch_entries = Vec::new();
349+
self.num_groups = 0;
350+
}
351+
352+
fn retain_after_emit(&mut self, emit_groups: usize) {
353+
let emit_groups = emit_groups as u32;
354+
let mut retained_batches = Vec::with_capacity(self.batches.len());
355+
let mut retained_entries = Vec::with_capacity(self.batch_entries.len());
356+
357+
for (batch, mut entries) in
358+
self.batches.drain(..).zip(self.batch_entries.drain(..))
359+
{
360+
entries.retain(|(group_idx, _)| *group_idx >= emit_groups);
361+
if entries.is_empty() {
362+
continue;
363+
}
364+
365+
// Keep the original arrays for this prototype and only renumber
366+
// retained groups. SUB_ISSUE_04 will compact mixed batches so
367+
// partially emitted batches no longer pin their full inputs.
368+
for (group_idx, _) in &mut entries {
369+
*group_idx -= emit_groups;
370+
}
371+
372+
retained_batches.push(batch);
373+
retained_entries.push(entries);
374+
}
375+
376+
self.batches = retained_batches;
377+
self.batch_entries = retained_entries;
378+
self.num_groups -= emit_groups as usize;
379+
}
380+
381+
fn append_rows<'a>(
345382
&mut self,
346383
iter: impl Iterator<Item = Option<&'a str>>,
347384
group_indices: &[usize],
348-
) {
349-
for (opt_value, &group_idx) in iter.zip(group_indices.iter()) {
350-
if let Some(value) = opt_value {
351-
match &mut self.values[group_idx] {
385+
) -> Vec<(u32, u32)> {
386+
let mut entries = Vec::new();
387+
388+
for (row_idx, (opt_value, &group_idx)) in
389+
iter.zip(group_indices.iter()).enumerate()
390+
{
391+
if opt_value.is_some() {
392+
entries.push((group_idx as u32, row_idx as u32));
393+
}
394+
}
395+
396+
entries
397+
}
398+
399+
fn append_batch_values(
400+
values: &mut [Option<String>],
401+
entries: &[(u32, u32)],
402+
array: &ArrayRef,
403+
delimiter: &str,
404+
emit_groups: usize,
405+
) -> Result<()> {
406+
let append_value =
407+
|values: &mut [Option<String>], group_idx: usize, value: &str| {
408+
match &mut values[group_idx] {
352409
Some(existing) => {
353-
let added = self.delimiter.len() + value.len();
354-
existing.reserve(added);
355-
existing.push_str(&self.delimiter);
410+
existing.push_str(delimiter);
356411
existing.push_str(value);
357-
self.total_data_bytes += added;
358412
}
359-
slot @ None => {
360-
*slot = Some(value.to_string());
361-
self.total_data_bytes += value.len();
413+
slot @ None => *slot = Some(value.to_string()),
414+
}
415+
};
416+
417+
match array.data_type() {
418+
DataType::Utf8 => {
419+
let array = array.as_string::<i32>();
420+
for &(group_idx, row_idx) in entries {
421+
let group_idx = group_idx as usize;
422+
if group_idx >= emit_groups || array.is_null(row_idx as usize) {
423+
continue;
424+
}
425+
append_value(values, group_idx, array.value(row_idx as usize));
426+
}
427+
}
428+
DataType::LargeUtf8 => {
429+
let array = array.as_string::<i64>();
430+
for &(group_idx, row_idx) in entries {
431+
let group_idx = group_idx as usize;
432+
if group_idx >= emit_groups || array.is_null(row_idx as usize) {
433+
continue;
434+
}
435+
append_value(values, group_idx, array.value(row_idx as usize));
436+
}
437+
}
438+
DataType::Utf8View => {
439+
let array = array.as_string_view();
440+
for &(group_idx, row_idx) in entries {
441+
let group_idx = group_idx as usize;
442+
if group_idx >= emit_groups || array.is_null(row_idx as usize) {
443+
continue;
362444
}
445+
append_value(values, group_idx, array.value(row_idx as usize));
363446
}
364447
}
448+
other => return internal_err!("string_agg unexpected data type: {other}"),
365449
}
450+
451+
Ok(())
366452
}
367453
}
368454

@@ -374,32 +460,52 @@ impl GroupsAccumulator for StringAggGroupsAccumulator {
374460
opt_filter: Option<&BooleanArray>,
375461
total_num_groups: usize,
376462
) -> Result<()> {
377-
self.values.resize(total_num_groups, None);
463+
self.num_groups = self.num_groups.max(total_num_groups);
378464
let array = apply_filter_as_nulls(&values[0], opt_filter)?;
379-
match array.data_type() {
465+
466+
let entries = match array.data_type() {
380467
DataType::Utf8 => {
381-
self.append_batch(array.as_string::<i32>().iter(), group_indices)
468+
self.append_rows(array.as_string::<i32>().iter(), group_indices)
382469
}
383470
DataType::LargeUtf8 => {
384-
self.append_batch(array.as_string::<i64>().iter(), group_indices)
471+
self.append_rows(array.as_string::<i64>().iter(), group_indices)
385472
}
386473
DataType::Utf8View => {
387-
self.append_batch(array.as_string_view().iter(), group_indices)
388-
}
389-
other => {
390-
return internal_err!("string_agg unexpected data type: {other}");
474+
self.append_rows(array.as_string_view().iter(), group_indices)
391475
}
476+
other => return internal_err!("string_agg unexpected data type: {other}"),
477+
};
478+
479+
if !entries.is_empty() {
480+
self.batches.push(array);
481+
self.batch_entries.push(entries);
392482
}
483+
393484
Ok(())
394485
}
395486

396487
fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
397-
let to_emit = emit_to.take_needed(&mut self.values);
398-
let emitted_bytes: usize = to_emit
399-
.iter()
400-
.filter_map(|opt| opt.as_ref().map(|s| s.len()))
401-
.sum();
402-
self.total_data_bytes -= emitted_bytes;
488+
let emit_groups = match emit_to {
489+
EmitTo::All => self.num_groups,
490+
EmitTo::First(n) => n,
491+
};
492+
493+
let mut to_emit = vec![None; emit_groups];
494+
495+
for (batch, entries) in self.batches.iter().zip(&self.batch_entries) {
496+
Self::append_batch_values(
497+
&mut to_emit,
498+
entries,
499+
batch,
500+
&self.delimiter,
501+
emit_groups,
502+
)?;
503+
}
504+
505+
match emit_to {
506+
EmitTo::All => self.clear_state(),
507+
EmitTo::First(_) => self.retain_after_emit(emit_groups),
508+
}
403509

404510
let result: ArrayRef = Arc::new(LargeStringArray::from(to_emit));
405511
Ok(result)
@@ -439,8 +545,17 @@ impl GroupsAccumulator for StringAggGroupsAccumulator {
439545
}
440546

441547
fn size(&self) -> usize {
442-
self.total_data_bytes
443-
+ self.values.capacity() * size_of::<Option<String>>()
548+
self.batches
549+
.iter()
550+
.map(|arr| arr.to_data().get_slice_memory_size().unwrap_or_default())
551+
.sum::<usize>()
552+
+ self.batches.capacity() * size_of::<ArrayRef>()
553+
+ self
554+
.batch_entries
555+
.iter()
556+
.map(|entries| entries.capacity() * size_of::<(u32, u32)>())
557+
.sum::<usize>()
558+
+ self.batch_entries.capacity() * size_of::<Vec<(u32, u32)>>()
444559
+ self.delimiter.capacity()
445560
+ size_of_val(self)
446561
}

0 commit comments

Comments
 (0)