Skip to content

Commit e5c69a4

Browse files
neilconwayDandandanalamb
authored
perf: Optimize array_sort() (#21083)
## Which issue does this PR close? - Closes #21005. - Closes #21041. ## Rationale for this change The previous `array_sort` implementation called the Arrow sort kernel for every row, and then used `concat` to produce the final results. This was quite inefficient. Instead, we employee three different techniques depending on the input: (1) For arrays of primitives types without null elements, we copy all values into a single `Vec`, sort each row's slice of the `Vec` in-place, and then wrap the `Vec` in a `GenericListArray`. (2) For arrays of primitives types with null elements, we use a similar approach but we need to incur some more bookkeeping to place null elements in the right place and construct the null buffer. (3) For arrays of non-primitive types, we use `RowConverter` to convert the entire input into the row format in one call, sort row indices by comparing the encoded row values, and then use a single `take()` to construct the result of the sort. Benchmarks (8192 rows, vs main): int32/5 elements: 886 µs → 57 µs (-94%) int32/20 elements: 1.64 ms → 846 µs (-48%) int32/100 elements: 4.03 ms → 3.22 ms (-20%) int32_null_elements/5: 1.17 ms → 168 µs (-86%) int32_null_elements/1000: 47.2 ms → 44.1 ms (-7%) string/5 elements: 2.12 ms → 727 µs (-66%) string/1000 elements: 405 ms → 293 ms (-28%) ## What changes are included in this PR? * New `array_sort` benchmark * Extended unit test coverage * Improve docs * Implement optimizations as described above ## Are these changes tested? No. ## Are there any user-facing changes? No. --------- Co-authored-by: Daniël Heres <danielheres@gmail.com> Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent 3bdcdf5 commit e5c69a4

5 files changed

Lines changed: 581 additions & 68 deletions

File tree

datafusion/functions-nested/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,3 +109,7 @@ name = "array_to_string"
109109
[[bench]]
110110
harness = false
111111
name = "array_position"
112+
113+
[[bench]]
114+
harness = false
115+
name = "array_sort"
Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::hint::black_box;
19+
use std::sync::Arc;
20+
21+
use arrow::array::{ArrayRef, BooleanBufferBuilder, Int32Array, ListArray, StringArray};
22+
use arrow::buffer::{NullBuffer, OffsetBuffer};
23+
use arrow::datatypes::{DataType, Field};
24+
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
25+
use datafusion_common::config::ConfigOptions;
26+
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};
27+
use datafusion_functions_nested::sort::ArraySort;
28+
use rand::SeedableRng;
29+
use rand::rngs::StdRng;
30+
use rand::seq::SliceRandom;
31+
32+
const SEED: u64 = 42;
33+
const NUM_ROWS: usize = 8192;
34+
35+
fn create_int32_list_array(
36+
num_rows: usize,
37+
elements_per_row: usize,
38+
with_nulls: bool,
39+
) -> ArrayRef {
40+
let mut rng = StdRng::seed_from_u64(SEED);
41+
let total_values = num_rows * elements_per_row;
42+
43+
let mut values: Vec<i32> = (0..total_values as i32).collect();
44+
values.shuffle(&mut rng);
45+
46+
let values = Arc::new(Int32Array::from(values));
47+
let offsets: Vec<i32> = (0..=num_rows)
48+
.map(|i| (i * elements_per_row) as i32)
49+
.collect();
50+
51+
let nulls = if with_nulls {
52+
// Every 10th row is null
53+
Some(NullBuffer::from(
54+
(0..num_rows).map(|i| i % 10 != 0).collect::<Vec<bool>>(),
55+
))
56+
} else {
57+
None
58+
};
59+
60+
Arc::new(ListArray::new(
61+
Arc::new(Field::new("item", DataType::Int32, true)),
62+
OffsetBuffer::new(offsets.into()),
63+
values,
64+
nulls,
65+
))
66+
}
67+
68+
/// Creates a ListArray where ~10% of elements within each row are null.
69+
fn create_int32_list_array_with_null_elements(
70+
num_rows: usize,
71+
elements_per_row: usize,
72+
) -> ArrayRef {
73+
let mut rng = StdRng::seed_from_u64(SEED);
74+
let total_values = num_rows * elements_per_row;
75+
76+
let mut values: Vec<i32> = (0..total_values as i32).collect();
77+
values.shuffle(&mut rng);
78+
79+
// ~10% of elements are null
80+
let mut validity = BooleanBufferBuilder::new(total_values);
81+
for i in 0..total_values {
82+
validity.append(i % 10 != 0);
83+
}
84+
let null_buffer = NullBuffer::from(validity.finish());
85+
86+
let values = Arc::new(Int32Array::new(values.into(), Some(null_buffer)));
87+
let offsets: Vec<i32> = (0..=num_rows)
88+
.map(|i| (i * elements_per_row) as i32)
89+
.collect();
90+
91+
Arc::new(ListArray::new(
92+
Arc::new(Field::new("item", DataType::Int32, true)),
93+
OffsetBuffer::new(offsets.into()),
94+
values,
95+
None,
96+
))
97+
}
98+
99+
fn create_string_list_array(num_rows: usize, elements_per_row: usize) -> ArrayRef {
100+
let mut rng = StdRng::seed_from_u64(SEED);
101+
let total_values = num_rows * elements_per_row;
102+
103+
let mut indices: Vec<usize> = (0..total_values).collect();
104+
indices.shuffle(&mut rng);
105+
let string_values: Vec<String> =
106+
indices.iter().map(|i| format!("value_{i:06}")).collect();
107+
let values = Arc::new(StringArray::from(string_values));
108+
109+
let offsets: Vec<i32> = (0..=num_rows)
110+
.map(|i| (i * elements_per_row) as i32)
111+
.collect();
112+
113+
Arc::new(ListArray::new(
114+
Arc::new(Field::new("item", DataType::Utf8, true)),
115+
OffsetBuffer::new(offsets.into()),
116+
values,
117+
None,
118+
))
119+
}
120+
121+
fn invoke_array_sort(udf: &ArraySort, array: &ArrayRef) -> ColumnarValue {
122+
udf.invoke_with_args(ScalarFunctionArgs {
123+
args: vec![ColumnarValue::Array(Arc::clone(array))],
124+
arg_fields: vec![Field::new("arr", array.data_type().clone(), true).into()],
125+
number_rows: array.len(),
126+
return_field: Field::new("result", array.data_type().clone(), true).into(),
127+
config_options: Arc::new(ConfigOptions::default()),
128+
})
129+
.unwrap()
130+
}
131+
132+
/// Vary elements_per_row over [5, 20, 100, 1000]: for small arrays, per-row
133+
/// overhead dominates, whereas for larger arrays the sort kernel dominates.
134+
fn bench_array_sort(c: &mut Criterion) {
135+
let mut group = c.benchmark_group("array_sort");
136+
let udf = ArraySort::new();
137+
138+
// Int32 arrays
139+
for &elements_per_row in &[5, 20, 100, 1000] {
140+
let array = create_int32_list_array(NUM_ROWS, elements_per_row, false);
141+
group.bench_with_input(
142+
BenchmarkId::new("int32", elements_per_row),
143+
&elements_per_row,
144+
|b, _| {
145+
b.iter(|| {
146+
black_box(invoke_array_sort(&udf, &array));
147+
});
148+
},
149+
);
150+
}
151+
152+
// Int32 with nulls in the outer list (10% null rows), single size
153+
{
154+
let array = create_int32_list_array(NUM_ROWS, 50, true);
155+
group.bench_function("int32_with_nulls", |b| {
156+
b.iter(|| {
157+
black_box(invoke_array_sort(&udf, &array));
158+
});
159+
});
160+
}
161+
162+
// Int32 with null elements (~10% of elements within rows are null)
163+
for &elements_per_row in &[5, 20, 100, 1000] {
164+
let array =
165+
create_int32_list_array_with_null_elements(NUM_ROWS, elements_per_row);
166+
group.bench_with_input(
167+
BenchmarkId::new("int32_null_elements", elements_per_row),
168+
&elements_per_row,
169+
|b, _| {
170+
b.iter(|| {
171+
black_box(invoke_array_sort(&udf, &array));
172+
});
173+
},
174+
);
175+
}
176+
177+
// String arrays
178+
for &elements_per_row in &[5, 20, 100, 1000] {
179+
let array = create_string_list_array(NUM_ROWS, elements_per_row);
180+
group.bench_with_input(
181+
BenchmarkId::new("string", elements_per_row),
182+
&elements_per_row,
183+
|b, _| {
184+
b.iter(|| {
185+
black_box(invoke_array_sort(&udf, &array));
186+
});
187+
},
188+
);
189+
}
190+
191+
group.finish();
192+
}
193+
194+
criterion_group!(benches, bench_array_sort);
195+
criterion_main!(benches);

0 commit comments

Comments
 (0)