Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@ use arrow::array::{Array, ArrayRef, AsArray, ByteView, GenericByteViewArray, mak
use arrow::buffer::{Buffer, ScalarBuffer};
use arrow::datatypes::ByteViewType;
use datafusion_common::Result;
use datafusion_common::hash_utils::RandomState;
use hashbrown::HashMap;
use itertools::izip;
use std::hash::BuildHasher;
use std::marker::PhantomData;
use std::mem::{replace, size_of};
use std::sync::Arc;

const BYTE_VIEW_MAX_BLOCK_SIZE: usize = 2 * 1024 * 1024;
const DEDUP_HASH_STATE: RandomState = RandomState::with_seed(0);

/// An implementation of [`GroupColumn`] for binary view and utf8 view types.
///
Expand Down Expand Up @@ -69,6 +73,10 @@ pub struct ByteViewGroupValueBuilder<B: ByteViewType> {
/// Nulls
nulls: MaybeNullBufferBuilder,

/// Dedup map: hash of non-inline value → view, reused across batches
/// to avoid storing duplicate string bytes in buffers.
dedup_map: HashMap<u64, u128>,

/// phantom data so the type requires `<B>`
_phantom: PhantomData<B>,
}
Expand All @@ -87,6 +95,7 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
completed: Vec::new(),
max_block_size: BYTE_VIEW_MAX_BLOCK_SIZE,
nulls: MaybeNullBufferBuilder::new(),
dedup_map: HashMap::new(),
_phantom: PhantomData {},
}
}
Expand Down Expand Up @@ -115,7 +124,7 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {

// Not null row case
self.nulls.append(false);
self.do_append_val_inner(arr, row);
self.do_append_val_inner_no_dedup(arr, row);
}

// Don't inline to keep the code small and give LLVM the best chance of
Expand Down Expand Up @@ -160,7 +169,13 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
match all_null_or_non_null {
Nulls::Some => {
for &row in rows {
self.append_val_inner(array, row);
if arr.is_null(row) {
self.nulls.append(true);
self.views.push(0);
} else {
self.nulls.append(false);
self.do_append_val_inner(arr, row);
}
}
}

Expand All @@ -179,29 +194,58 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
}
}

fn do_append_val_inner(&mut self, array: &GenericByteViewArray<B>, row: usize)
where
/// Append a non-null value without deduplication (used in single-row path).
fn do_append_val_inner_no_dedup(
&mut self,
array: &GenericByteViewArray<B>,
row: usize,
) where
B: ByteViewType,
{
let value: &[u8] = array.value(row).as_ref();
let view = self.append_value_to_buffer(value);
self.views.push(view);
}

/// Append a non-null value with deduplication (used in vectorized path).
///
/// For values > 12 bytes, checks the dedup map to reuse an existing view
/// and avoid copying the bytes again.
fn do_append_val_inner(
&mut self,
array: &GenericByteViewArray<B>,
row: usize,
) where
B: ByteViewType,
{
let value: &[u8] = array.value(row).as_ref();
let value_len = value.len();
let view = if value_len <= 12 {
make_view(value, 0, 0)
} else {
// Ensure big enough block to hold the value firstly
self.ensure_in_progress_big_enough(value_len);

// Append value
let buffer_index = self.completed.len();
let offset = self.in_progress.len();
self.in_progress.extend_from_slice(value);

make_view(value, buffer_index as u32, offset as u32)
};
if value_len > 12 {
let hash = DEDUP_HASH_STATE.hash_one(value);
if let Some(&existing_view) = self.dedup_map.get(&hash) {
self.views.push(existing_view);
return;
}
let view = self.append_value_to_buffer(value);
self.views.push(view);
self.dedup_map.insert(hash, view);
} else {
self.views.push(make_view(value, 0, 0));
}
}

// Append view
self.views.push(view);
/// Write value bytes to the buffer and return the view.
fn append_value_to_buffer(&mut self, value: &[u8]) -> u128 {
let value_len = value.len();
if value_len <= 12 {
return make_view(value, 0, 0);
}
self.ensure_in_progress_big_enough(value_len);
let buffer_index = self.completed.len();
let offset = self.in_progress.len();
self.in_progress.extend_from_slice(value);
make_view(value, buffer_index as u32, offset as u32)
}

fn ensure_in_progress_big_enough(&mut self, value_len: usize) {
Expand Down Expand Up @@ -344,6 +388,9 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
}

// The `n < len` case
// Clear dedup map since buffer indices will shift
self.dedup_map.clear();

// Take n for nulls
let null_buffer = self.nulls.take_n(n);

Expand Down Expand Up @@ -567,6 +614,7 @@ impl<B: ByteViewType> GroupColumn for ByteViewGroupValueBuilder<B> {
+ self.views.capacity() * size_of::<u128>()
+ self.in_progress.capacity() * size_of::<u8>()
+ buffers_size
+ self.dedup_map.capacity() * (size_of::<u64>() + size_of::<u128>())
+ size_of::<Self>()
}

Expand Down Expand Up @@ -834,6 +882,65 @@ mod tests {
assert!(equal_to_results[11]);
}

#[test]
fn test_byte_view_vectorized_append_dedup() {
// Verify that vectorized_append deduplicates non-inline values
// (values > 12 bytes share buffer storage) within and across batches
let mut builder =
ByteViewGroupValueBuilder::<StringViewType>::new().with_max_block_size(1024);

let long_str = "this string is quite long and repeated";
let batch1 = Arc::new(StringViewArray::from(vec![
Some(long_str),
Some("short"),
Some(long_str),
Some(long_str),
Some("another long string value here!"),
Some("another long string value here!"),
])) as ArrayRef;

builder
.vectorized_append(&batch1, &[0, 1, 2, 3, 4, 5])
.unwrap();

// Each unique long string stored once
let expected_buffer_len = long_str.len() + "another long string value here!".len();
assert_eq!(builder.in_progress.len(), expected_buffer_len);
assert_eq!(builder.views.len(), 6);

// Second batch with the same long strings — should not grow the buffer
let batch2 = Arc::new(StringViewArray::from(vec![
Some(long_str),
Some("another long string value here!"),
Some("a brand new long string value!"),
])) as ArrayRef;

builder
.vectorized_append(&batch2, &[0, 1, 2])
.unwrap();

// Only the new string should have been added to the buffer
let expected_buffer_len =
expected_buffer_len + "a brand new long string value!".len();
assert_eq!(builder.in_progress.len(), expected_buffer_len);
assert_eq!(builder.views.len(), 9);

// Output should still be correct
let expected = Arc::new(StringViewArray::from(vec![
Some(long_str),
Some("short"),
Some(long_str),
Some(long_str),
Some("another long string value here!"),
Some("another long string value here!"),
Some(long_str),
Some("another long string value here!"),
Some("a brand new long string value!"),
])) as ArrayRef;
let output = Box::new(builder).build();
assert_eq!(&output, &expected);
}

#[test]
fn test_byte_view_take_n() {
// ####### Define cases and init #######
Expand Down
Loading