diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs index a91dd3115d879..318a5ce9e7bd3 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs @@ -23,12 +23,16 @@ use arrow::array::{Array, ArrayRef, AsArray, ByteView, GenericByteViewArray, mak use arrow::buffer::{Buffer, ScalarBuffer}; use arrow::datatypes::ByteViewType; use datafusion_common::Result; +use datafusion_common::hash_utils::RandomState; +use hashbrown::HashMap; use itertools::izip; +use std::hash::BuildHasher; use std::marker::PhantomData; use std::mem::{replace, size_of}; use std::sync::Arc; const BYTE_VIEW_MAX_BLOCK_SIZE: usize = 2 * 1024 * 1024; +const DEDUP_HASH_STATE: RandomState = RandomState::with_seed(0); /// An implementation of [`GroupColumn`] for binary view and utf8 view types. /// @@ -69,6 +73,10 @@ pub struct ByteViewGroupValueBuilder { /// Nulls nulls: MaybeNullBufferBuilder, + /// Dedup map: hash of non-inline value → view, reused across batches + /// to avoid storing duplicate string bytes in buffers. + dedup_map: HashMap, + /// phantom data so the type requires `` _phantom: PhantomData, } @@ -87,6 +95,7 @@ impl ByteViewGroupValueBuilder { completed: Vec::new(), max_block_size: BYTE_VIEW_MAX_BLOCK_SIZE, nulls: MaybeNullBufferBuilder::new(), + dedup_map: HashMap::new(), _phantom: PhantomData {}, } } @@ -115,7 +124,7 @@ impl ByteViewGroupValueBuilder { // Not null row case self.nulls.append(false); - self.do_append_val_inner(arr, row); + self.do_append_val_inner_no_dedup(arr, row); } // Don't inline to keep the code small and give LLVM the best chance of @@ -160,7 +169,13 @@ impl ByteViewGroupValueBuilder { match all_null_or_non_null { Nulls::Some => { for &row in rows { - self.append_val_inner(array, row); + if arr.is_null(row) { + self.nulls.append(true); + self.views.push(0); + } else { + self.nulls.append(false); + self.do_append_val_inner(arr, row); + } } } @@ -179,29 +194,58 @@ impl ByteViewGroupValueBuilder { } } - fn do_append_val_inner(&mut self, array: &GenericByteViewArray, row: usize) - where + /// Append a non-null value without deduplication (used in single-row path). + fn do_append_val_inner_no_dedup( + &mut self, + array: &GenericByteViewArray, + row: usize, + ) where B: ByteViewType, { let value: &[u8] = array.value(row).as_ref(); + let view = self.append_value_to_buffer(value); + self.views.push(view); + } + /// Append a non-null value with deduplication (used in vectorized path). + /// + /// For values > 12 bytes, checks the dedup map to reuse an existing view + /// and avoid copying the bytes again. + fn do_append_val_inner( + &mut self, + array: &GenericByteViewArray, + row: usize, + ) where + B: ByteViewType, + { + let value: &[u8] = array.value(row).as_ref(); let value_len = value.len(); - let view = if value_len <= 12 { - make_view(value, 0, 0) - } else { - // Ensure big enough block to hold the value firstly - self.ensure_in_progress_big_enough(value_len); - - // Append value - let buffer_index = self.completed.len(); - let offset = self.in_progress.len(); - self.in_progress.extend_from_slice(value); - make_view(value, buffer_index as u32, offset as u32) - }; + if value_len > 12 { + let hash = DEDUP_HASH_STATE.hash_one(value); + if let Some(&existing_view) = self.dedup_map.get(&hash) { + self.views.push(existing_view); + return; + } + let view = self.append_value_to_buffer(value); + self.views.push(view); + self.dedup_map.insert(hash, view); + } else { + self.views.push(make_view(value, 0, 0)); + } + } - // Append view - self.views.push(view); + /// Write value bytes to the buffer and return the view. + fn append_value_to_buffer(&mut self, value: &[u8]) -> u128 { + let value_len = value.len(); + if value_len <= 12 { + return make_view(value, 0, 0); + } + self.ensure_in_progress_big_enough(value_len); + let buffer_index = self.completed.len(); + let offset = self.in_progress.len(); + self.in_progress.extend_from_slice(value); + make_view(value, buffer_index as u32, offset as u32) } fn ensure_in_progress_big_enough(&mut self, value_len: usize) { @@ -344,6 +388,9 @@ impl ByteViewGroupValueBuilder { } // The `n < len` case + // Clear dedup map since buffer indices will shift + self.dedup_map.clear(); + // Take n for nulls let null_buffer = self.nulls.take_n(n); @@ -567,6 +614,7 @@ impl GroupColumn for ByteViewGroupValueBuilder { + self.views.capacity() * size_of::() + self.in_progress.capacity() * size_of::() + buffers_size + + self.dedup_map.capacity() * (size_of::() + size_of::()) + size_of::() } @@ -834,6 +882,65 @@ mod tests { assert!(equal_to_results[11]); } + #[test] + fn test_byte_view_vectorized_append_dedup() { + // Verify that vectorized_append deduplicates non-inline values + // (values > 12 bytes share buffer storage) within and across batches + let mut builder = + ByteViewGroupValueBuilder::::new().with_max_block_size(1024); + + let long_str = "this string is quite long and repeated"; + let batch1 = Arc::new(StringViewArray::from(vec![ + Some(long_str), + Some("short"), + Some(long_str), + Some(long_str), + Some("another long string value here!"), + Some("another long string value here!"), + ])) as ArrayRef; + + builder + .vectorized_append(&batch1, &[0, 1, 2, 3, 4, 5]) + .unwrap(); + + // Each unique long string stored once + let expected_buffer_len = long_str.len() + "another long string value here!".len(); + assert_eq!(builder.in_progress.len(), expected_buffer_len); + assert_eq!(builder.views.len(), 6); + + // Second batch with the same long strings — should not grow the buffer + let batch2 = Arc::new(StringViewArray::from(vec![ + Some(long_str), + Some("another long string value here!"), + Some("a brand new long string value!"), + ])) as ArrayRef; + + builder + .vectorized_append(&batch2, &[0, 1, 2]) + .unwrap(); + + // Only the new string should have been added to the buffer + let expected_buffer_len = + expected_buffer_len + "a brand new long string value!".len(); + assert_eq!(builder.in_progress.len(), expected_buffer_len); + assert_eq!(builder.views.len(), 9); + + // Output should still be correct + let expected = Arc::new(StringViewArray::from(vec![ + Some(long_str), + Some("short"), + Some(long_str), + Some(long_str), + Some("another long string value here!"), + Some("another long string value here!"), + Some(long_str), + Some("another long string value here!"), + Some("a brand new long string value!"), + ])) as ArrayRef; + let output = Box::new(builder).build(); + assert_eq!(&output, &expected); + } + #[test] fn test_byte_view_take_n() { // ####### Define cases and init #######