Skip to content

Commit 4d5aea4

Browse files
authored
perf: Optimize array_min, array_max for arrays of primitive types (#21101)
## Which issue does this PR close? - Closes #21100. ## Rationale for this change In the current implementation, we construct a `PrimitiveArray` for each row, feed it to the Arrow `min` / `max` kernel, and then collect the resulting `ScalarValue`s in a `Vec`. We then construct a final `PrimitiveArray` for the result via `ScalarValue::iter_to_array` of the `Vec`. We can do better for ListArrays of primitive types. First, we can iterate directly over the flat values buffer of the `ListArray` for the batch and compute the min/max from each row's slice directly. Second, Arrow's `min` / `max` kernels have a reasonable amount of per-call overhead; for small arrays, it is more efficient to compute the min/max ourselves via direct iteration. Benchmarks (8192 rows, arrays of int64 values, M4 Max): - no_nulls / list_size=10: 309 µs → 26.6 µs (11.6x faster) - no_nulls / list_size=100: 392 µs → 150 µs (2.6x faster) - no_nulls / list_size=1000: 1.20 ms → 951 µs (1.26x faster) - nulls / list_size=10: 385 µs → 69.0 µs (5.6x faster) - nulls / list_size=100: 790 µs → 616 µs (1.28x faster) - nulls / list_size=1000: 5.34 ms → 5.21 ms (1.02x faster) ## What changes are included in this PR? * Add benchmark for `array_max` * Expand SLT test coverage * Implement optimization ## Are these changes tested? Yes. ## Are there any user-facing changes? No.
1 parent 10fae81 commit 4d5aea4

4 files changed

Lines changed: 351 additions & 7 deletions

File tree

datafusion/functions-nested/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ rand = { workspace = true }
7070
harness = false
name = "array_concat"

# Benchmark for array_min / array_max (benches/array_min_max.rs).
[[bench]]
harness = false
name = "array_min_max"

[[bench]]
harness = false
name = "array_expression"
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::sync::Arc;
19+
20+
use arrow::array::{ArrayRef, Int64Array, ListArray};
21+
use arrow::buffer::{NullBuffer, OffsetBuffer};
22+
use arrow::datatypes::{DataType, Field};
23+
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
24+
use datafusion_common::config::ConfigOptions;
25+
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};
26+
use datafusion_functions_nested::min_max::ArrayMax;
27+
use rand::rngs::StdRng;
28+
use rand::{Rng, SeedableRng};
29+
30+
const NUM_ROWS: usize = 8192;
31+
const SEED: u64 = 42;
32+
const LIST_NULL_DENSITY: f64 = 0.1;
33+
const ELEMENT_NULL_DENSITY: f64 = 0.1;
34+
35+
fn create_int64_list_array(
36+
num_rows: usize,
37+
list_size: usize,
38+
element_null_density: f64,
39+
) -> ArrayRef {
40+
let mut rng = StdRng::seed_from_u64(SEED);
41+
let total_values = num_rows * list_size;
42+
43+
if element_null_density > 0.0 {
44+
let values: Vec<Option<i64>> = (0..total_values)
45+
.map(|_| {
46+
if rng.random::<f64>() < element_null_density {
47+
None
48+
} else {
49+
Some(rng.random::<i64>() % 10_000)
50+
}
51+
})
52+
.collect();
53+
let values_array = Arc::new(Int64Array::from(values));
54+
55+
let offsets: Vec<i32> = (0..=num_rows).map(|i| (i * list_size) as i32).collect();
56+
let nulls: Vec<bool> = (0..num_rows)
57+
.map(|_| rng.random::<f64>() >= LIST_NULL_DENSITY)
58+
.collect();
59+
60+
Arc::new(ListArray::new(
61+
Arc::new(Field::new("item", DataType::Int64, true)),
62+
OffsetBuffer::new(offsets.into()),
63+
values_array,
64+
Some(NullBuffer::from(nulls)),
65+
))
66+
} else {
67+
// No element nulls — values array has no null buffer
68+
let values: Vec<i64> = (0..total_values)
69+
.map(|_| rng.random::<i64>() % 10_000)
70+
.collect();
71+
let values_array = Arc::new(Int64Array::from(values));
72+
73+
let offsets: Vec<i32> = (0..=num_rows).map(|i| (i * list_size) as i32).collect();
74+
let nulls: Vec<bool> = (0..num_rows)
75+
.map(|_| rng.random::<f64>() >= LIST_NULL_DENSITY)
76+
.collect();
77+
78+
Arc::new(ListArray::new(
79+
Arc::new(Field::new("item", DataType::Int64, false)),
80+
OffsetBuffer::new(offsets.into()),
81+
values_array,
82+
Some(NullBuffer::from(nulls)),
83+
))
84+
}
85+
}
86+
87+
fn criterion_benchmark(c: &mut Criterion) {
88+
let udf = ArrayMax::new();
89+
let config_options = Arc::new(ConfigOptions::default());
90+
91+
for list_size in [10, 100, 1000] {
92+
for (label, null_density) in [("nulls", ELEMENT_NULL_DENSITY), ("no_nulls", 0.0)]
93+
{
94+
let list_array = create_int64_list_array(NUM_ROWS, list_size, null_density);
95+
let args = vec![ColumnarValue::Array(Arc::clone(&list_array))];
96+
let arg_fields =
97+
vec![Field::new("arg_0", list_array.data_type().clone(), true).into()];
98+
let return_field: Arc<Field> = Field::new("f", DataType::Int64, true).into();
99+
100+
c.bench_with_input(
101+
BenchmarkId::new("array_max", format!("{label}/list_size={list_size}")),
102+
&list_array,
103+
|b, _| {
104+
b.iter(|| {
105+
udf.invoke_with_args(ScalarFunctionArgs {
106+
args: args.clone(),
107+
arg_fields: arg_fields.clone(),
108+
number_rows: NUM_ROWS,
109+
return_field: return_field.clone(),
110+
config_options: config_options.clone(),
111+
})
112+
.unwrap()
113+
});
114+
},
115+
);
116+
}
117+
}
118+
}
119+
120+
// Criterion entry points: register `criterion_benchmark` under the `benches`
// group and generate the benchmark binary's `main`.
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);

datafusion/functions-nested/src/min_max.rs

Lines changed: 111 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,12 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
//! [`ScalarUDFImpl`] definitions for array_max function.
18+
//! [`ScalarUDFImpl`] definitions for array_min and array_max functions.
1919
use crate::utils::make_scalar_function;
20-
use arrow::array::{ArrayRef, GenericListArray, OffsetSizeTrait};
20+
use arrow::array::{
21+
Array, ArrayRef, ArrowNativeTypeOp, ArrowPrimitiveType, AsArray, GenericListArray,
22+
OffsetSizeTrait, PrimitiveBuilder, downcast_primitive,
23+
};
2124
use arrow::datatypes::DataType;
2225
use arrow::datatypes::DataType::{LargeList, List};
2326
use datafusion_common::Result;
@@ -32,6 +35,7 @@ use datafusion_functions_aggregate_common::min_max::{max_batch, min_batch};
3235
use datafusion_macros::user_doc;
3336
use itertools::Itertools;
3437
use std::any::Any;
38+
use std::sync::Arc;
3539

3640
make_udf_expr_and_func!(
3741
ArrayMax,
@@ -116,8 +120,8 @@ impl ScalarUDFImpl for ArrayMax {
116120
fn array_max_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
117121
let [array] = take_function_args("array_max", args)?;
118122
match array.data_type() {
119-
List(_) => array_min_max_helper(as_list_array(array)?, max_batch),
120-
LargeList(_) => array_min_max_helper(as_large_list_array(array)?, max_batch),
123+
List(_) => array_min_max_helper(as_list_array(array)?, false),
124+
LargeList(_) => array_min_max_helper(as_large_list_array(array)?, false),
121125
arg_type => exec_err!("array_max does not support type: {arg_type}"),
122126
}
123127
}
@@ -198,20 +202,120 @@ impl ScalarUDFImpl for ArrayMin {
198202
fn array_min_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
199203
let [array] = take_function_args("array_min", args)?;
200204
match array.data_type() {
201-
List(_) => array_min_max_helper(as_list_array(array)?, min_batch),
202-
LargeList(_) => array_min_max_helper(as_large_list_array(array)?, min_batch),
205+
List(_) => array_min_max_helper(as_list_array(array)?, true),
206+
LargeList(_) => array_min_max_helper(as_large_list_array(array)?, true),
203207
arg_type => exec_err!("array_min does not support type: {arg_type}"),
204208
}
205209
}
206210

207211
fn array_min_max_helper<O: OffsetSizeTrait>(
208212
array: &GenericListArray<O>,
209-
agg_fn: fn(&ArrayRef) -> Result<ScalarValue>,
213+
is_min: bool,
210214
) -> Result<ArrayRef> {
215+
// Try the primitive fast path first
216+
if let Some(result) = try_primitive_array_min_max(array, is_min) {
217+
return result;
218+
}
219+
220+
// Fallback: per-row ScalarValue path for non-primitive types
221+
let agg_fn = if is_min { min_batch } else { max_batch };
211222
let null_value = ScalarValue::try_from(array.value_type())?;
212223
let result_vec: Vec<ScalarValue> = array
213224
.iter()
214225
.map(|arr| arr.as_ref().map_or_else(|| Ok(null_value.clone()), agg_fn))
215226
.try_collect()?;
216227
ScalarValue::iter_to_array(result_vec)
217228
}
229+
230+
/// Dispatches to a typed primitive min/max implementation, or returns `None` if
/// the element type is not a primitive.
///
/// Arrow's `downcast_primitive!` macro maps the runtime element `DataType`
/// onto the matching `ArrowPrimitiveType`, so the monomorphized
/// `primitive_array_min_max` can be invoked for that concrete type.
fn try_primitive_array_min_max<O: OffsetSizeTrait>(
    list_array: &GenericListArray<O>,
    is_min: bool,
) -> Option<Result<ArrayRef>> {
    // `downcast_primitive!` expands `helper!` with the concrete Rust type as
    // `$t` for whichever primitive matches; the `return` exits this function
    // from inside the expansion.
    macro_rules! helper {
        ($t:ty) => {
            return Some(primitive_array_min_max::<O, $t>(list_array, is_min))
        };
    }
    downcast_primitive! {
        list_array.value_type() => (helper),
        _ => {}
    }
    // Non-primitive element type (e.g. strings, nested lists): tell the
    // caller to use the generic per-row fallback.
    None
}
247+
248+
/// Threshold to switch from direct iteration to using `min` / `max` kernel from
249+
/// `arrow::compute`. The latter has enough per-invocation overhead that direct
250+
/// iteration is faster for small lists.
251+
const ARROW_COMPUTE_THRESHOLD: usize = 32;
252+
253+
/// Computes min or max for each row of a primitive ListArray.
254+
fn primitive_array_min_max<O: OffsetSizeTrait, T: ArrowPrimitiveType>(
255+
list_array: &GenericListArray<O>,
256+
is_min: bool,
257+
) -> Result<ArrayRef> {
258+
let values_array = list_array.values().as_primitive::<T>();
259+
let values_slice = values_array.values();
260+
let values_nulls = values_array.nulls();
261+
let mut result_builder = PrimitiveBuilder::<T>::with_capacity(list_array.len())
262+
.with_data_type(values_array.data_type().clone());
263+
264+
for (row, w) in list_array.offsets().windows(2).enumerate() {
265+
let row_result = if list_array.is_null(row) {
266+
None
267+
} else {
268+
let start = w[0].as_usize();
269+
let end = w[1].as_usize();
270+
let len = end - start;
271+
272+
match len {
273+
0 => None,
274+
_ if len < ARROW_COMPUTE_THRESHOLD => {
275+
scalar_min_max::<T>(values_slice, values_nulls, start, end, is_min)
276+
}
277+
_ => {
278+
let slice = values_array.slice(start, len);
279+
if is_min {
280+
arrow::compute::min::<T>(&slice)
281+
} else {
282+
arrow::compute::max::<T>(&slice)
283+
}
284+
}
285+
}
286+
};
287+
288+
result_builder.append_option(row_result);
289+
}
290+
291+
Ok(Arc::new(result_builder.finish()) as ArrayRef)
292+
}
293+
294+
/// Computes min or max for a single list row by directly scanning a slice of
295+
/// the flat values buffer.
296+
#[inline]
297+
fn scalar_min_max<T: ArrowPrimitiveType>(
298+
values_slice: &[T::Native],
299+
values_nulls: Option<&arrow::buffer::NullBuffer>,
300+
start: usize,
301+
end: usize,
302+
is_min: bool,
303+
) -> Option<T::Native> {
304+
let mut best: Option<T::Native> = None;
305+
for (i, &val) in values_slice[start..end].iter().enumerate() {
306+
if let Some(nulls) = values_nulls
307+
&& !nulls.is_valid(start + i)
308+
{
309+
continue;
310+
}
311+
let update_best = match best {
312+
None => true,
313+
Some(current) if is_min => val.is_lt(current),
314+
Some(current) => val.is_gt(current),
315+
};
316+
if update_best {
317+
best = Some(val);
318+
}
319+
}
320+
best
321+
}

0 commit comments

Comments
 (0)