Skip to content

Commit 1068686

Browse files
lyne7-sc and alamb authored
perf: add fast path for uniform fill values in array_resize (#20617)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #. ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> `array_resize` currently does extra per-row work when resizing list arrays. This pr optimizes the common fast path where the fill value is uniform. ## What changes are included in this PR? - Add a fast path in `array_resize` for uniform fill values. - Precompute the maximum required growth and reuse a single fill buffer in the uniform-fill path. ### Benchmarks ``` group main optimized ----- ---- --------- array_resize_i64/grow_default_null_fill_10_to_500 11.24 3.2±0.07ms ? ?/sec 1.00 283.9±20.10µs ? ?/sec array_resize_i64/grow_uniform_fill_10_to_500 4.15 1648.0±38.12µs ? ?/sec 1.00 397.2±20.46µs ? ?/sec array_resize_i64/grow_variable_fill_10_to_500 1.00 1667.7±50.31µs ? ?/sec 1.02 1692.8±61.54µs ? ?/sec array_resize_i64/mixed_grow_shrink_1000x_100 4.83 373.0±8.91µs ? ?/sec 1.00 77.1±5.90µs ? ?/sec array_resize_i64/shrink_uniform_fill_500_to_10 1.00 8.1±0.51µs ? ?/sec 1.06 8.5±0.50µs ? ?/sec ``` <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? Yes <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. 
Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? No <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. --> --------- Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent ef9a80c commit 1068686

4 files changed

Lines changed: 291 additions & 23 deletions

File tree

datafusion/functions-nested/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,3 +126,7 @@ name = "array_sort"
126126
[[bench]]
127127
harness = false
128128
name = "string_to_array"
129+
130+
[[bench]]
131+
harness = false
132+
name = "array_resize"
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow::array::{ArrayRef, Int64Array, ListArray};
19+
use arrow::buffer::OffsetBuffer;
20+
use arrow::datatypes::{DataType, Field};
21+
use criterion::{
22+
BenchmarkGroup, Criterion, criterion_group, criterion_main, measurement::WallTime,
23+
};
24+
use datafusion_common::config::ConfigOptions;
25+
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};
26+
use datafusion_functions_nested::resize::ArrayResize;
27+
use std::hint::black_box;
28+
use std::sync::Arc;
29+
30+
/// Row count shared by every benchmark input: the number of lists in each
/// generated list array, the length of each size/fill column, and the
/// `number_rows` value passed to the UDF invocation.
const NUM_ROWS: usize = 1_000;
31+
32+
fn criterion_benchmark(c: &mut Criterion) {
33+
let mut group = c.benchmark_group("array_resize_i64");
34+
let list_field: Arc<Field> = Field::new_list_field(DataType::Int64, true).into();
35+
let list_data_type = DataType::List(Arc::clone(&list_field));
36+
let arg_fields = vec![
37+
Field::new("array", list_data_type.clone(), true).into(),
38+
Field::new("size", DataType::Int64, false).into(),
39+
Field::new("value", DataType::Int64, true).into(),
40+
];
41+
let return_field: Arc<Field> = Field::new("result", list_data_type, true).into();
42+
let config_options = Arc::new(ConfigOptions::default());
43+
let two_arg_fields = arg_fields[..2].to_vec();
44+
45+
bench_case(
46+
&mut group,
47+
"grow_uniform_fill_10_to_500",
48+
&[
49+
ColumnarValue::Array(create_int64_list_array(NUM_ROWS, 10)),
50+
ColumnarValue::Array(repeated_int64_array(500)),
51+
ColumnarValue::Array(repeated_int64_array(7)),
52+
],
53+
&arg_fields,
54+
&return_field,
55+
&config_options,
56+
);
57+
58+
bench_case(
59+
&mut group,
60+
"shrink_uniform_fill_500_to_10",
61+
&[
62+
ColumnarValue::Array(create_int64_list_array(NUM_ROWS, 500)),
63+
ColumnarValue::Array(repeated_int64_array(10)),
64+
ColumnarValue::Array(repeated_int64_array(7)),
65+
],
66+
&arg_fields,
67+
&return_field,
68+
&config_options,
69+
);
70+
71+
bench_case(
72+
&mut group,
73+
"grow_default_null_fill_10_to_500",
74+
&[
75+
ColumnarValue::Array(create_int64_list_array(NUM_ROWS, 10)),
76+
ColumnarValue::Array(repeated_int64_array(500)),
77+
],
78+
&two_arg_fields,
79+
&return_field,
80+
&config_options,
81+
);
82+
83+
bench_case(
84+
&mut group,
85+
"grow_variable_fill_10_to_500",
86+
&[
87+
ColumnarValue::Array(create_int64_list_array(NUM_ROWS, 10)),
88+
ColumnarValue::Array(repeated_int64_array(500)),
89+
ColumnarValue::Array(distinct_fill_array()),
90+
],
91+
&arg_fields,
92+
&return_field,
93+
&config_options,
94+
);
95+
96+
bench_case(
97+
&mut group,
98+
"mixed_grow_shrink_1000x_100",
99+
&[
100+
ColumnarValue::Array(create_int64_list_array(NUM_ROWS, 100)),
101+
ColumnarValue::Array(mixed_size_array()),
102+
],
103+
&arg_fields[..2],
104+
&return_field,
105+
&config_options,
106+
);
107+
108+
group.finish();
109+
}
110+
111+
fn bench_case(
112+
group: &mut BenchmarkGroup<'_, WallTime>,
113+
name: &str,
114+
args: &[ColumnarValue],
115+
arg_fields: &[Arc<Field>],
116+
return_field: &Arc<Field>,
117+
config_options: &Arc<ConfigOptions>,
118+
) {
119+
let udf = ArrayResize::new();
120+
group.bench_function(name, |b| {
121+
b.iter(|| {
122+
black_box(
123+
udf.invoke_with_args(ScalarFunctionArgs {
124+
args: args.to_vec(),
125+
arg_fields: arg_fields.to_vec(),
126+
number_rows: NUM_ROWS,
127+
return_field: return_field.clone(),
128+
config_options: config_options.clone(),
129+
})
130+
.unwrap(),
131+
)
132+
})
133+
});
134+
}
135+
136+
fn create_int64_list_array(num_rows: usize, list_len: usize) -> ArrayRef {
137+
let values = (0..(num_rows * list_len))
138+
.map(|v| Some(v as i64))
139+
.collect::<Int64Array>();
140+
let offsets = (0..=num_rows)
141+
.map(|i| (i * list_len) as i32)
142+
.collect::<Vec<i32>>();
143+
144+
Arc::new(
145+
ListArray::try_new(
146+
Arc::new(Field::new_list_field(DataType::Int64, true)),
147+
OffsetBuffer::new(offsets.into()),
148+
Arc::new(values),
149+
None,
150+
)
151+
.unwrap(),
152+
)
153+
}
154+
155+
fn repeated_int64_array(value: i64) -> ArrayRef {
156+
Arc::new(Int64Array::from_value(value, NUM_ROWS))
157+
}
158+
159+
fn distinct_fill_array() -> ArrayRef {
160+
Arc::new(Int64Array::from_iter((0..NUM_ROWS).map(|i| Some(i as i64))))
161+
}
162+
163+
fn mixed_size_array() -> ArrayRef {
164+
Arc::new(Int64Array::from_iter(
165+
(0..NUM_ROWS).map(|i| Some(if i % 2 == 0 { 200_i64 } else { 10_i64 })),
166+
))
167+
}
168+
169+
criterion_group!(benches, criterion_benchmark);
170+
criterion_main!(benches);

datafusion/functions-nested/src/resize.rs

Lines changed: 104 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -198,24 +198,113 @@ fn general_list_resize<O: OffsetSizeTrait + TryInto<i64>>(
198198
let values = array.values();
199199
let original_data = values.to_data();
200200

201-
// create default element array
202-
let default_element = if let Some(default_element) = default_element {
203-
default_element
201+
// Track the largest per-row growth so the uniform-fill fast path can
202+
// materialize one reusable fill buffer of the required size.
203+
let mut max_extra: usize = 0;
204+
let mut output_values_len: usize = 0;
205+
for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
206+
if array.is_null(row_index) {
207+
continue;
208+
}
209+
let target_count = count_array.value(row_index).to_usize().ok_or_else(|| {
210+
internal_datafusion_err!("array_resize: failed to convert size to usize")
211+
})?;
212+
output_values_len =
213+
output_values_len.checked_add(target_count).ok_or_else(|| {
214+
internal_datafusion_err!("array_resize: output size overflow")
215+
})?;
216+
let current_len = (offset_window[1] - offset_window[0]).to_usize().unwrap();
217+
if target_count > current_len {
218+
max_extra = max_extra.max(target_count - current_len);
219+
}
220+
}
221+
222+
// The fast path is valid when at least one row grows and every row would
223+
// use the same fill value.
224+
let use_bulk_fill = max_extra > 0
225+
&& match &default_element {
226+
None => true,
227+
Some(fill_array) => {
228+
let len = fill_array.len();
229+
let null_count = fill_array.logical_null_count();
230+
231+
len <= 1
232+
|| null_count == len
233+
|| (null_count == 0 && {
234+
let first = fill_array.slice(0, 1);
235+
(1..len)
236+
.all(|i| fill_array.slice(i, 1).as_ref() == first.as_ref())
237+
})
238+
}
239+
};
240+
241+
if use_bulk_fill {
242+
// Fast path: materialize one reusable fill buffer for all grown rows.
243+
let fill_scalar = match &default_element {
244+
None => ScalarValue::try_from(&data_type)?,
245+
Some(fill_array) if fill_array.logical_null_count() == fill_array.len() => {
246+
ScalarValue::try_from(&data_type)?
247+
}
248+
Some(fill_array) => ScalarValue::try_from_array(fill_array.as_ref(), 0)?,
249+
};
250+
let fill_values = fill_scalar.to_array_of_size(max_extra)?;
251+
let default_value_data = fill_values.to_data();
252+
build_resized_list(
253+
array,
254+
count_array,
255+
field,
256+
&original_data,
257+
&default_value_data,
258+
output_values_len,
259+
|mutable, _, extra_count| mutable.extend(1, 0, extra_count),
260+
)
204261
} else {
205-
let null_scalar = ScalarValue::try_from(&data_type)?;
206-
null_scalar.to_array_of_size(original_data.len())?
207-
};
208-
let default_value_data = default_element.to_data();
262+
// Slow path: rows may need different fill values, so append from the
263+
// corresponding slot in the input fill array for each grown element.
264+
let fill_values = match default_element {
265+
Some(fill_values) => fill_values,
266+
None => {
267+
let null_scalar = ScalarValue::try_from(&data_type)?;
268+
null_scalar.to_array_of_size(original_data.len())?
269+
}
270+
};
271+
let default_value_data = fill_values.to_data();
272+
build_resized_list(
273+
array,
274+
count_array,
275+
field,
276+
&original_data,
277+
&default_value_data,
278+
output_values_len,
279+
|mutable, row_index, extra_count| {
280+
for _ in 0..extra_count {
281+
mutable.extend(1, row_index, row_index + 1);
282+
}
283+
},
284+
)
285+
}
286+
}
209287

210-
// create a mutable array to store the original data
211-
let capacity = Capacities::Array(original_data.len() + default_value_data.len());
288+
fn build_resized_list<O, F>(
289+
array: &GenericListArray<O>,
290+
count_array: &Int64Array,
291+
field: &FieldRef,
292+
original_data: &arrow::array::ArrayData,
293+
default_value_data: &arrow::array::ArrayData,
294+
output_values_len: usize,
295+
mut append_fill_values: F,
296+
) -> Result<ArrayRef>
297+
where
298+
O: OffsetSizeTrait + TryInto<i64>,
299+
F: FnMut(&mut MutableArrayData, usize, usize),
300+
{
301+
let capacity = Capacities::Array(output_values_len);
212302
let mut offsets = vec![O::usize_as(0)];
213303
let mut mutable = MutableArrayData::with_capacities(
214-
vec![&original_data, &default_value_data],
304+
vec![original_data, default_value_data],
215305
false,
216306
capacity,
217307
);
218-
219308
let mut null_builder = NullBufferBuilder::new(array.len());
220309

221310
for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
@@ -232,21 +321,13 @@ fn general_list_resize<O: OffsetSizeTrait + TryInto<i64>>(
232321
let count = O::usize_as(count);
233322
let start = offset_window[0];
234323
if start + count > offset_window[1] {
235-
let extra_count =
236-
(start + count - offset_window[1]).try_into().map_err(|_| {
237-
internal_datafusion_err!(
238-
"array_resize: failed to convert size to i64"
239-
)
240-
})?;
324+
let extra_count = (start + count - offset_window[1]).to_usize().unwrap();
241325
let end = offset_window[1];
242-
mutable.extend(0, (start).to_usize().unwrap(), (end).to_usize().unwrap());
243-
// append default element
244-
for _ in 0..extra_count {
245-
mutable.extend(1, row_index, row_index + 1);
246-
}
326+
mutable.extend(0, start.to_usize().unwrap(), end.to_usize().unwrap());
327+
append_fill_values(&mut mutable, row_index, extra_count);
247328
} else {
248329
let end = start + count;
249-
mutable.extend(0, (start).to_usize().unwrap(), (end).to_usize().unwrap());
330+
mutable.extend(0, start.to_usize().unwrap(), end.to_usize().unwrap());
250331
};
251332
offsets.push(offsets[row_index] + count);
252333
}

datafusion/sqllogictest/test_files/array/array_resize.slt

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,5 +156,18 @@ NULL
156156
[51, 52, 53, 54, 55, NULL, 57, 58, 59, 60, NULL, NULL, NULL]
157157
[61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 7, 7, 7, 7, 7]
158158

159+
# array_resize columnar test #3
160+
query ?
161+
select array_resize(column1, column2, 9) from array_resize_values;
162+
----
163+
[1, NULL]
164+
[11, 12, NULL, 14, 15]
165+
[21, 22, 23, 24, NULL, 26, 27, 28]
166+
[31, 32, 33, 34, 35, 36, NULL, 38, 39, 40, 9, 9]
167+
NULL
168+
[]
169+
[51, 52, 53, 54, 55, NULL, 57, 58, 59, 60, 9, 9, 9]
170+
[61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 9, 9, 9, 9, 9]
171+
159172

160173
include ./cleanup.slt.part

0 commit comments

Comments
 (0)