Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 38 additions & 1 deletion datafusion/functions/benches/lower.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use arrow::array::{ArrayRef, StringArray, StringViewBuilder};
use arrow::array::{Array, ArrayRef, StringArray, StringViewBuilder};
use arrow::datatypes::{DataType, Field};
use arrow::util::bench_util::{
create_string_array_with_len, create_string_view_array_with_len,
Expand Down Expand Up @@ -195,6 +195,43 @@ fn criterion_benchmark(c: &mut Criterion) {
);
}

{
let parent_size = 65536;
let slice_len = 128;
let str_len = 32;
let parent = Arc::new(create_string_array_with_len::<i32>(
parent_size,
0.2,
str_len,
)) as ArrayRef;
let offset = (parent_size - slice_len) / 2;
let sliced = parent.slice(offset, slice_len);
let args = vec![ColumnarValue::Array(sliced)];
let arg_fields = args
.iter()
.enumerate()
.map(|(idx, arg)| {
Field::new(format!("arg_{idx}"), arg.data_type(), true).into()
})
.collect::<Vec<_>>();

c.bench_function(
&format!("lower_sliced_ascii: parent={parent_size}, slice={slice_len}, str_len={str_len}"),
|b| {
b.iter(|| {
let args_cloned = args.clone();
black_box(lower.invoke_with_args(ScalarFunctionArgs {
args: args_cloned,
arg_fields: arg_fields.clone(),
number_rows: slice_len,
return_field: Field::new("f", DataType::Utf8, true).into(),
config_options: Arc::clone(&config_options),
}))
})
},
);
}

let sizes = [4096, 8192];
let str_lens = [10, 64, 128];
let mixes = [true, false];
Expand Down
53 changes: 33 additions & 20 deletions datafusion/functions/src/string/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use arrow::array::{
Array, ArrayRef, GenericStringArray, GenericStringBuilder, NullBufferBuilder,
OffsetSizeTrait, StringViewArray, StringViewBuilder, new_null_array,
};
use arrow::buffer::{Buffer, ScalarBuffer};
use arrow::buffer::{Buffer, OffsetBuffer, ScalarBuffer};
use arrow::datatypes::DataType;
use datafusion_common::Result;
use datafusion_common::cast::{as_generic_string_array, as_string_view_array};
Expand Down Expand Up @@ -390,16 +390,16 @@ where
const PRE_ALLOC_BYTES: usize = 8;

let string_array = as_generic_string_array::<O>(array)?;
let value_data = string_array.value_data();

// All values are ASCII.
if value_data.is_ascii() {
if string_array.is_ascii() {
return case_conversion_ascii_array::<O, _>(string_array, op);
}

// Values contain non-ASCII.
let item_len = string_array.len();
let capacity = string_array.value_data().len() + PRE_ALLOC_BYTES;
let offsets = string_array.value_offsets();
let start = offsets.first().unwrap().as_usize();
let end = offsets.last().unwrap().as_usize();
let capacity = (end - start) + PRE_ALLOC_BYTES;
let mut builder = GenericStringBuilder::<O>::with_capacity(item_len, capacity);

if string_array.null_count() == 0 {
Expand All @@ -413,9 +413,10 @@ where
Ok(Arc::new(builder.finish()))
}

/// All values of string_array are ASCII, and when converting case, there is no changes in the byte
/// array length. Therefore, the StringArray can be treated as a complete ASCII string for
/// case conversion, and we can reuse the offsets buffer and the nulls buffer.
/// Fast path for case conversion on an all-ASCII string array. ASCII case
/// conversion is byte-length-preserving, so we can convert the entire addressed
/// range in one call and reuse the offsets and nulls buffers — rebasing the
/// offsets when the input is a sliced array.
fn case_conversion_ascii_array<'a, O, F>(
string_array: &'a GenericStringArray<O>,
op: F,
Expand All @@ -424,21 +425,33 @@ where
O: OffsetSizeTrait,
F: Fn(&'a str) -> String,
{
let value_data = string_array.value_data();
// SAFETY: all items stored in value_data satisfy UTF8.
// ref: impl ByteArrayNativeType for str {...}
let str_values = unsafe { std::str::from_utf8_unchecked(value_data) };
let value_offsets = string_array.value_offsets();
let start = value_offsets.first().unwrap().as_usize();
let end = value_offsets.last().unwrap().as_usize();
let relevant = &string_array.value_data()[start..end];

// SAFETY: `relevant` is a subslice of the string array's value buffer,
// which is valid UTF-8.
let str_values = unsafe { std::str::from_utf8_unchecked(relevant) };

// conversion
let converted_values = op(str_values);
assert_eq!(converted_values.len(), str_values.len());
let bytes = converted_values.into_bytes();
debug_assert_eq!(converted_values.len(), str_values.len());
let values = Buffer::from_vec(converted_values.into_bytes());

// Shift offsets from `start`-based to 0-based so they index into `values`.
let offsets = if start == 0 {
string_array.offsets().clone()
} else {
let s = O::usize_as(start);
let rebased: Vec<O> = value_offsets.iter().map(|&o| o - s).collect();
// SAFETY: subtracting a constant from monotonic offsets preserves
// monotonicity, and `start` is the minimum offset so no underflow.
unsafe { OffsetBuffer::new_unchecked(ScalarBuffer::from(rebased)) }
};

// build result
let values = Buffer::from_vec(bytes);
let offsets = string_array.offsets().clone();
let nulls = string_array.nulls().cloned();
// SAFETY: offsets and nulls are consistent with the input array.
// SAFETY: offsets are monotonic and in-bounds for `values`; nulls
// (if any) match the slice length.
Ok(Arc::new(unsafe {
GenericStringArray::<O>::new_unchecked(offsets, values, nulls)
}))
Expand Down
41 changes: 32 additions & 9 deletions datafusion/functions/src/string/lower.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,22 +96,24 @@ mod tests {
use datafusion_common::config::ConfigOptions;
use std::sync::Arc;

fn to_lower(input: ArrayRef, expected: ArrayRef) -> Result<()> {
fn invoke_lower(input: ArrayRef) -> Result<ArrayRef> {
let func = LowerFunc::new();
let arg_fields = vec![Field::new("a", input.data_type().clone(), true).into()];

let data_type = input.data_type().clone();
let args = ScalarFunctionArgs {
number_rows: input.len(),
args: vec![ColumnarValue::Array(input)],
arg_fields,
return_field: Field::new("f", expected.data_type().clone(), true).into(),
arg_fields: vec![Field::new("a", data_type.clone(), true).into()],
return_field: Field::new("f", data_type, true).into(),
config_options: Arc::new(ConfigOptions::default()),
};

let result = match func.invoke_with_args(args)? {
ColumnarValue::Array(result) => result,
match func.invoke_with_args(args)? {
ColumnarValue::Array(r) => Ok(r),
_ => unreachable!("lower"),
};
}
}

fn to_lower(input: ArrayRef, expected: ArrayRef) -> Result<()> {
let result = invoke_lower(input)?;
assert_eq!(&expected, &result);
Ok(())
}
Expand Down Expand Up @@ -207,4 +209,25 @@ mod tests {

to_lower(input, expected)
}

/// Lowercasing a sliced Utf8 array must honor the slice's offset window:
/// only the addressed values are converted, and the ASCII fast path must
/// emit a compact value buffer instead of carrying the parent's bytes.
#[test]
fn lower_sliced_utf8() -> Result<()> {
    let source: ArrayRef = Arc::new(StringArray::from(vec![
        Some("AAAAAAAA"),
        Some("HELLO"),
        Some("WORLD"),
        Some(""),
        Some("ZZZZZZZZ"),
    ]));

    // Take the middle three values and lowercase them.
    let lowered = invoke_lower(source.slice(1, 3))?;
    let lowered = lowered.as_any().downcast_ref::<StringArray>().unwrap();

    assert_eq!(
        lowered,
        &StringArray::from(vec![Some("hello"), Some("world"), Some("")])
    );
    // "HELLO" + "WORLD" + "" occupy 10 bytes; a tight (rebased) output
    // buffer must not retain the parent's unrelated value data.
    assert_eq!(lowered.value_data().len(), 10);
    Ok(())
}
}
Loading