Skip to content

Commit d6fb360

Browse files
authored
perf: Optimize array_position for scalar needle (#20532)
## Which issue does this PR close?

- Closes #20530

## Rationale for this change

The previous implementation of `array_position` used `compare_element_to_list` for every input row. When the needle is a scalar (which is quite common), we can do much better by searching over the entire flat haystack values array with a single call to `arrow_ord::cmp::not_distinct`. We can then iterate over the resulting set bits to determine per-row results.

This is ~5-10x faster than the previous implementation for typical inputs.

## What changes are included in this PR?

* Implement new fast path for `array_position` with scalar needle
* Improve docs for `array_position`
* Don't use `internal_err` to report a user-visible error

## Are these changes tested?

Yes, and benchmarked. Additional tests added in a separate PR (#20531).

## Are there any user-facing changes?

No.
1 parent a026e7d commit d6fb360

5 files changed

Lines changed: 497 additions & 16 deletions

File tree

datafusion/functions-nested/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,3 +97,7 @@ name = "array_repeat"
9797
[[bench]]
9898
harness = false
9999
name = "array_set_ops"
100+
101+
[[bench]]
102+
harness = false
103+
name = "array_position"
Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow::array::{ArrayRef, Int64Array, ListArray};
19+
use arrow::buffer::OffsetBuffer;
20+
use arrow::datatypes::{DataType, Field};
21+
use criterion::{
22+
criterion_group, criterion_main, {BenchmarkId, Criterion},
23+
};
24+
use datafusion_common::ScalarValue;
25+
use datafusion_common::config::ConfigOptions;
26+
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};
27+
use datafusion_functions_nested::position::ArrayPosition;
28+
use rand::Rng;
29+
use rand::SeedableRng;
30+
use rand::rngs::StdRng;
31+
use std::hint::black_box;
32+
use std::sync::Arc;
33+
34+
/// Number of list rows generated for every benchmark haystack.
const NUM_ROWS: usize = 10000;
35+
/// Fixed RNG seed so every haystack is generated deterministically.
const SEED: u64 = 42;
36+
/// Probability that a randomly generated haystack element is NULL.
const NULL_DENSITY: f64 = 0.1;
37+
/// Needle value searched for by the benchmarks. Random haystack elements are
/// drawn from `0..array_size`, so this negative value only appears where a
/// generator places it deliberately.
const SENTINEL_NEEDLE: i64 = -1;
38+
39+
fn criterion_benchmark(c: &mut Criterion) {
40+
for size in [10, 100, 500] {
41+
bench_array_position(c, size);
42+
}
43+
}
44+
45+
fn bench_array_position(c: &mut Criterion, array_size: usize) {
46+
let mut group = c.benchmark_group("array_position_i64");
47+
let haystack_found_once = create_haystack_with_sentinel(
48+
NUM_ROWS,
49+
array_size,
50+
NULL_DENSITY,
51+
SENTINEL_NEEDLE,
52+
0,
53+
);
54+
let haystack_found_many = create_haystack_with_sentinels(
55+
NUM_ROWS,
56+
array_size,
57+
NULL_DENSITY,
58+
SENTINEL_NEEDLE,
59+
);
60+
let haystack_not_found =
61+
create_haystack_without_sentinel(NUM_ROWS, array_size, NULL_DENSITY);
62+
let num_rows = haystack_not_found.len();
63+
let arg_fields: Vec<Arc<Field>> = vec![
64+
Field::new("haystack", haystack_not_found.data_type().clone(), false).into(),
65+
Field::new("needle", DataType::Int64, false).into(),
66+
];
67+
let return_field: Arc<Field> = Field::new("result", DataType::UInt64, true).into();
68+
let config_options = Arc::new(ConfigOptions::default());
69+
let needle = ScalarValue::Int64(Some(SENTINEL_NEEDLE));
70+
71+
// Benchmark: one match per row.
72+
let args_found_once = vec![
73+
ColumnarValue::Array(haystack_found_once.clone()),
74+
ColumnarValue::Scalar(needle.clone()),
75+
];
76+
group.bench_with_input(
77+
BenchmarkId::new("found_once", array_size),
78+
&array_size,
79+
|b, _| {
80+
let udf = ArrayPosition::new();
81+
b.iter(|| {
82+
black_box(
83+
udf.invoke_with_args(ScalarFunctionArgs {
84+
args: args_found_once.clone(),
85+
arg_fields: arg_fields.clone(),
86+
number_rows: num_rows,
87+
return_field: return_field.clone(),
88+
config_options: config_options.clone(),
89+
})
90+
.unwrap(),
91+
)
92+
})
93+
},
94+
);
95+
96+
// Benchmark: many matches per row.
97+
let args_found_many = vec![
98+
ColumnarValue::Array(haystack_found_many.clone()),
99+
ColumnarValue::Scalar(needle.clone()),
100+
];
101+
group.bench_with_input(
102+
BenchmarkId::new("found_many", array_size),
103+
&array_size,
104+
|b, _| {
105+
let udf = ArrayPosition::new();
106+
b.iter(|| {
107+
black_box(
108+
udf.invoke_with_args(ScalarFunctionArgs {
109+
args: args_found_many.clone(),
110+
arg_fields: arg_fields.clone(),
111+
number_rows: num_rows,
112+
return_field: return_field.clone(),
113+
config_options: config_options.clone(),
114+
})
115+
.unwrap(),
116+
)
117+
})
118+
},
119+
);
120+
121+
// Benchmark: needle is not found in any row.
122+
let args_not_found = vec![
123+
ColumnarValue::Array(haystack_not_found.clone()),
124+
ColumnarValue::Scalar(needle.clone()),
125+
];
126+
group.bench_with_input(
127+
BenchmarkId::new("not_found", array_size),
128+
&array_size,
129+
|b, _| {
130+
let udf = ArrayPosition::new();
131+
b.iter(|| {
132+
black_box(
133+
udf.invoke_with_args(ScalarFunctionArgs {
134+
args: args_not_found.clone(),
135+
arg_fields: arg_fields.clone(),
136+
number_rows: num_rows,
137+
return_field: return_field.clone(),
138+
config_options: config_options.clone(),
139+
})
140+
.unwrap(),
141+
)
142+
})
143+
},
144+
);
145+
146+
group.finish();
147+
}
148+
149+
fn create_haystack_without_sentinel(
150+
num_rows: usize,
151+
array_size: usize,
152+
null_density: f64,
153+
) -> ArrayRef {
154+
create_haystack_from_fn(num_rows, array_size, |_, _, rng| {
155+
random_haystack_value(rng, array_size, null_density)
156+
})
157+
}
158+
159+
fn create_haystack_with_sentinel(
160+
num_rows: usize,
161+
array_size: usize,
162+
null_density: f64,
163+
sentinel: i64,
164+
sentinel_index: usize,
165+
) -> ArrayRef {
166+
assert!(sentinel_index < array_size);
167+
168+
create_haystack_from_fn(num_rows, array_size, |_, col, rng| {
169+
if col == sentinel_index {
170+
Some(sentinel)
171+
} else {
172+
random_haystack_value(rng, array_size, null_density)
173+
}
174+
})
175+
}
176+
177+
fn create_haystack_with_sentinels(
178+
num_rows: usize,
179+
array_size: usize,
180+
null_density: f64,
181+
sentinel: i64,
182+
) -> ArrayRef {
183+
create_haystack_from_fn(num_rows, array_size, |_, col, rng| {
184+
// Place the sentinel in half the positions to create many matches per row.
185+
if col % 2 == 0 {
186+
Some(sentinel)
187+
} else {
188+
random_haystack_value(rng, array_size, null_density)
189+
}
190+
})
191+
}
192+
193+
fn create_haystack_from_fn<F>(
194+
num_rows: usize,
195+
array_size: usize,
196+
mut value_at: F,
197+
) -> ArrayRef
198+
where
199+
F: FnMut(usize, usize, &mut StdRng) -> Option<i64>,
200+
{
201+
let mut rng = StdRng::seed_from_u64(SEED);
202+
let mut values = Vec::with_capacity(num_rows * array_size);
203+
for row in 0..num_rows {
204+
for col in 0..array_size {
205+
values.push(value_at(row, col, &mut rng));
206+
}
207+
}
208+
let values = values.into_iter().collect::<Int64Array>();
209+
let offsets = (0..=num_rows)
210+
.map(|i| (i * array_size) as i32)
211+
.collect::<Vec<i32>>();
212+
213+
Arc::new(
214+
ListArray::try_new(
215+
Arc::new(Field::new("item", DataType::Int64, true)),
216+
OffsetBuffer::new(offsets.into()),
217+
Arc::new(values),
218+
None,
219+
)
220+
.unwrap(),
221+
)
222+
}
223+
224+
fn random_haystack_value(
225+
rng: &mut StdRng,
226+
array_size: usize,
227+
null_density: f64,
228+
) -> Option<i64> {
229+
if rng.random::<f64>() < null_density {
230+
None
231+
} else {
232+
Some(rng.random_range(0..array_size as i64))
233+
}
234+
}
235+
236+
// Register `criterion_benchmark` in the criterion group named `benches`.
criterion_group!(benches, criterion_benchmark);
237+
// Run the `benches` group from criterion's entry point (the bench target sets `harness = false`).
criterion_main!(benches);

0 commit comments

Comments
 (0)