Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion datafusion/functions/benches/substr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ fn criterion_benchmark(c: &mut Criterion) {

group.finish();

// Scalar start, no count, long strings
// Scalar start, no count, long strings, start near middle
let len = 128;
let mut group = c.benchmark_group("substr, scalar start, no count, long strings");
group.sampling_mode(SamplingMode::Flat);
Expand All @@ -220,6 +220,26 @@ fn criterion_benchmark(c: &mut Criterion) {

group.finish();

// Scalar start=1, no count, long strings
let len = 128;
let mut group =
c.benchmark_group("substr, scalar start=1, no count, long strings");
group.sampling_mode(SamplingMode::Flat);
group.sample_size(10);

let args = create_args_without_count::<i32>(size, len, false, true, true);
group.bench_function(
format!("substr_string_view [size={size}, strlen={len}]"),
|b| b.iter(|| black_box(invoke_substr_with_args(args.clone(), size))),
);

let args = create_args_without_count::<i32>(size, len, false, false, true);
group.bench_function(format!("substr_string [size={size}, strlen={len}]"), |b| {
b.iter(|| black_box(invoke_substr_with_args(args.clone(), size)))
});

group.finish();

// Scalar start and count, short strings
let len = 12;
let count = 6;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/functions/src/strings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ impl LargeStringArrayBuilder {
/// - start_offset: The start offset of the substring in the view
///
/// LLVM is apparently overly eager to inline this function into some hot loops,
/// which bloats them and regresses performance, so we disable inling for now.
/// which bloats them and regresses performance, so we disable inlining for now.
#[inline(never)]
pub fn append_view(
views_buffer: &mut Vec<u128>,
Expand Down
150 changes: 131 additions & 19 deletions datafusion/functions/src/unicode/substr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use std::sync::Arc;
use crate::strings::append_view;
use crate::utils::make_scalar_function;
use arrow::array::{
Array, ArrayRef, AsArray, Int64Array, StringArrayType, StringViewArray,
StringViewBuilder,
Array, ArrayRef, AsArray, GenericStringArray, Int64Array, OffsetSizeTrait,
StringArrayType, StringViewArray, StringViewBuilder, make_view,
};
use arrow::buffer::{NullBuffer, ScalarBuffer};
use arrow::datatypes::DataType;
Expand Down Expand Up @@ -134,11 +134,11 @@ fn substr(args: &[ArrayRef]) -> Result<ArrayRef> {
match args[0].data_type() {
DataType::Utf8 => {
let string_array = args[0].as_string::<i32>();
string_substr::<_>(string_array, &args[1..])
generic_string_substr(string_array, &args[1..])
}
DataType::LargeUtf8 => {
let string_array = args[0].as_string::<i64>();
string_substr::<_>(string_array, &args[1..])
generic_string_substr(string_array, &args[1..])
}
DataType::Utf8View => {
let string_array = args[0].as_string_view();
Expand Down Expand Up @@ -275,7 +275,7 @@ fn string_view_substr(
let start_array = as_int64_array(&args[0])?;
let count_array_opt = args.get(1).map(|a| as_int64_array(a)).transpose()?;

let enable_ascii_fast_path =
let is_ascii =
enable_ascii_fast_path(&string_view_array, start_array, count_array_opt);
Comment thread
neilconway marked this conversation as resolved.

// Combine null bitmaps from all inputs in bulk.
Expand All @@ -296,11 +296,10 @@ fn string_view_substr(
let start = start_array.value(i);
let count = count_array_opt.map(|a| a.value(i));

let (start, end) =
get_true_start_end(string, start, count, enable_ascii_fast_path)?;
let substr = &string[start..end];
let (byte_start, byte_end) = get_true_start_end(string, start, count, is_ascii)?;
let substr = &string[byte_start..byte_end];

append_view(&mut views_buf, raw_view, substr, start as u32);
append_view(&mut views_buf, raw_view, substr, byte_start as u32);
}

let views_buf = ScalarBuffer::from(views_buf);
Expand All @@ -319,15 +318,104 @@ fn string_view_substr(
}
}

fn string_substr<'a, V>(string_array: V, args: &[ArrayRef]) -> Result<ArrayRef>
where
V: StringArrayType<'a> + Copy,
{
/// Reports whether every offset of `string_array` can be represented as an
/// `i32`.
///
/// Only the final offset is inspected; an offsets buffer with no entries is
/// treated as fitting.
fn values_fit_in_i32<T: OffsetSizeTrait>(string_array: &GenericStringArray<T>) -> bool {
    // The Arrow spec stores StringView offsets as signed 32-bit integers,
    // which caps the largest offset a view can carry at i32::MAX.
    match string_array.offsets().last() {
        Some(final_offset) => final_offset.as_usize() <= i32::MAX as usize,
        None => true,
    }
}

/// Pushes a view for `substr` onto `views_buf`, recording `byte_offset` as the
/// view's offset into data buffer 0.
///
/// Returns `true` when `substr` is longer than 12 bytes, i.e. when the view is
/// stored out-of-line and therefore needs the data buffer to be retained.
///
/// # Panics
/// Panics if `byte_offset` does not fit in a `u32`; callers are expected to
/// have validated the source buffer size beforehand.
#[inline]
fn append_view_from_buffer(
    views_buf: &mut Vec<u128>,
    substr: &str,
    byte_offset: usize,
) -> bool {
    let bytes = substr.as_bytes();
    let offset_in_buffer =
        u32::try_from(byte_offset).expect("validated string buffer offset fits in i32");
    views_buf.push(make_view(bytes, 0, offset_in_buffer));
    bytes.len() > 12
}

/// Computes `substr` over a `GenericStringArray`, producing a
/// `StringViewArray`.
///
/// `args[0]` holds the start positions and the optional `args[1]` holds the
/// counts; both are read as `Int64` arrays. An output row is null whenever the
/// corresponding row of any input is null.
///
/// When the input's offsets fit in an `i32`, the result views point into the
/// input's values buffer rather than copying the bytes; otherwise the work is
/// delegated to `generic_string_substr_copy`.
///
/// # Errors
/// Propagates errors returned by `as_int64_array` and `get_true_start_end`.
// The loop index is used to address several arrays at once (not only
// `offsets`), hence the lint expectation below.
#[expect(clippy::needless_range_loop)]
fn generic_string_substr<T: OffsetSizeTrait>(
Comment thread
neilconway marked this conversation as resolved.
string_array: &GenericStringArray<T>,
args: &[ArrayRef],
) -> Result<ArrayRef> {
// We'd like to return a StringViewArray that points into the input string
// array's values buffer. Since the Arrow spec defines StringView offsets
// as i32, we can't use this approach when the values buffer is >2GB, so
// fall back to copying.
if !values_fit_in_i32(string_array) {
return generic_string_substr_copy(string_array, args);
}

let start_array = as_int64_array(&args[0])?;
let count_array_opt = args.get(1).map(|a| as_int64_array(a)).transpose()?;

let enable_ascii_fast_path =
enable_ascii_fast_path(&string_array, start_array, count_array_opt);
let is_ascii = enable_ascii_fast_path(&string_array, start_array, count_array_opt);
let offsets = string_array.value_offsets();
// One view is pushed per input row, so the final length is known up front.
let mut views_buf = Vec::with_capacity(string_array.len());
// Tracks whether any pushed view is too long to be stored inline.
let mut has_out_of_line = false;

// Combine null bitmaps from all inputs in bulk.
let nulls = NullBuffer::union(
NullBuffer::union(string_array.nulls(), start_array.nulls()).as_ref(),
count_array_opt.and_then(|a| a.nulls()),
);

for i in 0..string_array.len() {
if nulls.as_ref().is_some_and(|n| n.is_null(i)) {
// Null rows still need an entry so the views stay aligned with `nulls`.
views_buf.push(0);
continue;
}

let string = string_array.value(i);
// Offset at which row `i` starts; added to `byte_start` below so the
// view's offset is relative to the values buffer, not to this row.
let source_offset = offsets[i].as_usize();
let start = start_array.value(i);
let count = count_array_opt.map(|a| a.value(i));

let (byte_start, byte_end) = get_true_start_end(string, start, count, is_ascii)?;
has_out_of_line |= append_view_from_buffer(
&mut views_buf,
&string[byte_start..byte_end],
source_offset + byte_start,
);
}

let views_buf = ScalarBuffer::from(views_buf);

// If all result strings are stored inline, we don't need to retain the
// input string array.
let data_buffers = if has_out_of_line {
vec![string_array.values().clone()]
} else {
vec![]
};

// Safety:
// (1) The blocks of the given views are all provided
// (2) Each referenced range in the source values buffer is within bounds
unsafe {
let array = StringViewArray::new_unchecked(views_buf, data_buffers, nulls);
Ok(Arc::new(array) as ArrayRef)
}
}

// Fallback for `generic_string_substr` when the zero-copy approach can't be
// used because the input string array's values buffer is too large.
fn generic_string_substr_copy<T: OffsetSizeTrait>(
string_array: &GenericStringArray<T>,
args: &[ArrayRef],
) -> Result<ArrayRef> {
let start_array = as_int64_array(&args[0])?;
let count_array_opt = args.get(1).map(|a| as_int64_array(a)).transpose()?;

let is_ascii = enable_ascii_fast_path(&string_array, start_array, count_array_opt);

// Combine null bitmaps from all inputs in bulk.
let nulls = NullBuffer::union(
Expand All @@ -347,17 +435,20 @@ where
let start = start_array.value(i);
let count = count_array_opt.map(|a| a.value(i));

let (start, end) =
get_true_start_end(string, start, count, enable_ascii_fast_path)?;
result_builder.append_value(&string[start..end]);
let (byte_start, byte_end) = get_true_start_end(string, start, count, is_ascii)?;
result_builder.append_value(&string[byte_start..byte_end]);
}

Ok(Arc::new(result_builder.finish()) as ArrayRef)
}

#[cfg(test)]
mod tests {
use arrow::array::{Array, StringViewArray};
use std::sync::Arc;

use arrow::array::{
Array, ArrayRef, AsArray, Int64Array, StringArray, StringViewArray,
};
use arrow::datatypes::DataType::Utf8View;

use datafusion_common::{Result, ScalarValue, exec_err};
Expand Down Expand Up @@ -734,4 +825,25 @@ mod tests {

Ok(())
}

/// `substr` on a sliced `StringArray` with array-valued start and count
/// arguments must return the substrings of the sliced rows.
#[test]
fn test_sliced_string_array_array_args() -> Result<()> {
    // Every input is longer than 12 bytes, so the resulting views cannot be
    // stored inline and are out-of-line.
    let unsliced: ArrayRef = Arc::new(StringArray::from(vec![
        "skipped_prefix_value",
        "alphabet_long_string",
        "joséésojanother_long",
    ]));
    // Drop the first row so the remaining rows start at a non-zero offset.
    let sliced = unsliced.slice(1, 2);
    let starts: ArrayRef = Arc::new(Int64Array::from(vec![3, 5]));
    let counts: ArrayRef = Arc::new(Int64Array::from(vec![15, 14]));

    let output = super::substr(&[sliced, starts, counts])?;
    let views = output.as_string_view();

    let expected = ["phabet_long_str", "ésojanother_lo"];
    for (row, want) in expected.iter().enumerate() {
        assert_eq!(views.value(row), *want);
    }

    Ok(())
}
}
Loading