Skip to content

Commit aa9d819

Browse files
lyne7-scneilconway
andauthored
perf: optimize map validation for common key types (#20805)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #. ## Rationale for this change This PR optimizes `map` key validation for common key types. Previously, `map` validation always used the generic `ScalarValue::try_from_array` path, which is more expensive than necessary for common key types such as primitive integers, strings, and binary values. <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> ## What changes are included in this PR? - specialize `map` key validation for primitive, string, and binary key types - keep the generic fallback for unsupported key types <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? Yes. current unit tests and slts test passed. benchmark added. <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? No. <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. --> --------- Co-authored-by: Neil Conway <neil.conway@gmail.com>
1 parent 4d5aea4 commit aa9d819

3 files changed

Lines changed: 367 additions & 86 deletions

File tree

datafusion/functions-nested/benches/map.rs

Lines changed: 150 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,12 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use arrow::array::{Int32Array, ListArray, StringArray};
18+
use arrow::array::{
19+
ArrayRef, BinaryArray, BinaryViewArray, Int32Array, ListArray, StringArray,
20+
StringViewArray,
21+
};
1922
use arrow::buffer::{OffsetBuffer, ScalarBuffer};
20-
use arrow::datatypes::{DataType, Field};
23+
use arrow::datatypes::Field;
2124
use criterion::{Criterion, criterion_group, criterion_main};
2225
use datafusion_common::ScalarValue;
2326
use datafusion_common::config::ConfigOptions;
@@ -28,33 +31,100 @@ use datafusion_functions_nested::planner::NestedFunctionPlanner;
2831
use rand::Rng;
2932
use rand::prelude::ThreadRng;
3033
use std::collections::HashSet;
34+
use std::hash::Hash;
3135
use std::hint::black_box;
3236
use std::sync::Arc;
3337

34-
fn keys(rng: &mut ThreadRng) -> Vec<String> {
35-
let mut keys = HashSet::with_capacity(1000);
38+
const MAP_ROWS: usize = 1000;
39+
const MAP_KEYS_PER_ROW: usize = 1000;
3640

37-
while keys.len() < 1000 {
38-
keys.insert(rng.random_range(0..10000).to_string());
41+
fn gen_unique_values<T>(
42+
rng: &mut ThreadRng,
43+
mut make_value: impl FnMut(i32) -> T,
44+
) -> Vec<T>
45+
where
46+
T: Eq + Hash,
47+
{
48+
let mut values = HashSet::with_capacity(MAP_KEYS_PER_ROW);
49+
50+
while values.len() < MAP_KEYS_PER_ROW {
51+
values.insert(make_value(rng.random_range(0..10000)));
3952
}
4053

41-
keys.into_iter().collect()
54+
values.into_iter().collect()
4255
}
4356

44-
fn values(rng: &mut ThreadRng) -> Vec<i32> {
45-
let mut values = HashSet::with_capacity(1000);
57+
fn gen_repeat_values<T: Clone>(values: &[T], repeats: usize) -> Vec<T> {
58+
let mut repeated = Vec::with_capacity(values.len() * repeats);
4659

47-
while values.len() < 1000 {
48-
values.insert(rng.random_range(0..10000));
60+
for _ in 0..repeats {
61+
repeated.extend_from_slice(values);
4962
}
50-
values.into_iter().collect()
63+
64+
repeated
65+
}
66+
67+
fn gen_utf8_values(rng: &mut ThreadRng) -> Vec<String> {
68+
gen_unique_values(rng, |value| value.to_string())
69+
}
70+
71+
fn gen_binary_values(rng: &mut ThreadRng) -> Vec<Vec<u8>> {
72+
gen_unique_values(rng, |value| value.to_le_bytes().to_vec())
73+
}
74+
75+
fn gen_primitive_values(rng: &mut ThreadRng) -> Vec<i32> {
76+
gen_unique_values(rng, |value| value)
77+
}
78+
79+
fn list_array(values: ArrayRef, row_count: usize, values_per_row: usize) -> ArrayRef {
80+
let offsets = (0..=row_count)
81+
.map(|index| (index * values_per_row) as i32)
82+
.collect::<Vec<_>>();
83+
Arc::new(ListArray::new(
84+
Arc::new(Field::new_list_field(values.data_type().clone(), true)),
85+
OffsetBuffer::new(ScalarBuffer::from(offsets)),
86+
values,
87+
None,
88+
))
89+
}
90+
91+
fn bench_map_case(c: &mut Criterion, name: &str, keys: ArrayRef, values: ArrayRef) {
92+
let number_rows = keys.len();
93+
let keys = ColumnarValue::Array(keys);
94+
let values = ColumnarValue::Array(values);
95+
96+
let return_type = map_udf()
97+
.return_type(&[keys.data_type(), values.data_type()])
98+
.expect("should get return type");
99+
let arg_fields = vec![
100+
Field::new("a", keys.data_type(), true).into(),
101+
Field::new("a", values.data_type(), true).into(),
102+
];
103+
let return_field = Field::new("f", return_type, true).into();
104+
let config_options = Arc::new(ConfigOptions::default());
105+
106+
c.bench_function(name, |b| {
107+
b.iter(|| {
108+
black_box(
109+
map_udf()
110+
.invoke_with_args(ScalarFunctionArgs {
111+
args: vec![keys.clone(), values.clone()],
112+
arg_fields: arg_fields.clone(),
113+
number_rows,
114+
return_field: Arc::clone(&return_field),
115+
config_options: Arc::clone(&config_options),
116+
})
117+
.expect("map should work on valid values"),
118+
);
119+
});
120+
});
51121
}
52122

53123
fn criterion_benchmark(c: &mut Criterion) {
54124
c.bench_function("make_map_1000", |b| {
55125
let mut rng = rand::rng();
56-
let keys = keys(&mut rng);
57-
let values = values(&mut rng);
126+
let keys = gen_utf8_values(&mut rng);
127+
let values = gen_primitive_values(&mut rng);
58128
let mut buffer = Vec::new();
59129
for i in 0..1000 {
60130
buffer.push(Expr::Literal(
@@ -63,9 +133,7 @@ fn criterion_benchmark(c: &mut Criterion) {
63133
));
64134
buffer.push(Expr::Literal(ScalarValue::Int32(Some(values[i])), None));
65135
}
66-
67136
let planner = NestedFunctionPlanner {};
68-
69137
b.iter(|| {
70138
black_box(
71139
planner
@@ -75,51 +143,73 @@ fn criterion_benchmark(c: &mut Criterion) {
75143
});
76144
});
77145

78-
c.bench_function("map_1000", |b| {
79-
let mut rng = rand::rng();
80-
let field = Arc::new(Field::new_list_field(DataType::Utf8, true));
81-
let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 1000]));
82-
let key_list = ListArray::new(
83-
field,
84-
offsets,
85-
Arc::new(StringArray::from(keys(&mut rng))),
86-
None,
87-
);
88-
let field = Arc::new(Field::new_list_field(DataType::Int32, true));
89-
let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 1000]));
90-
let value_list = ListArray::new(
91-
field,
92-
offsets,
93-
Arc::new(Int32Array::from(values(&mut rng))),
94-
None,
95-
);
96-
let keys = ColumnarValue::Scalar(ScalarValue::List(Arc::new(key_list)));
97-
let values = ColumnarValue::Scalar(ScalarValue::List(Arc::new(value_list)));
98-
99-
let return_type = map_udf()
100-
.return_type(&[keys.data_type(), values.data_type()])
101-
.expect("should get return type");
102-
let arg_fields = vec![
103-
Field::new("a", keys.data_type(), true).into(),
104-
Field::new("a", values.data_type(), true).into(),
105-
];
106-
let return_field = Field::new("f", return_type, true).into();
107-
let config_options = Arc::new(ConfigOptions::default());
146+
let mut rng = rand::rng();
147+
let values = Arc::new(Int32Array::from(gen_repeat_values(
148+
&gen_primitive_values(&mut rng),
149+
MAP_ROWS,
150+
))) as ArrayRef;
151+
let values = list_array(values, MAP_ROWS, MAP_KEYS_PER_ROW);
152+
let map_cases = [
153+
(
154+
"map_1000_utf8",
155+
list_array(
156+
Arc::new(StringArray::from(gen_repeat_values(
157+
&gen_utf8_values(&mut rng),
158+
MAP_ROWS,
159+
))) as ArrayRef,
160+
MAP_ROWS,
161+
MAP_KEYS_PER_ROW,
162+
),
163+
),
164+
(
165+
"map_1000_binary",
166+
list_array(
167+
Arc::new(BinaryArray::from_iter_values(gen_repeat_values(
168+
&gen_binary_values(&mut rng),
169+
MAP_ROWS,
170+
))) as ArrayRef,
171+
MAP_ROWS,
172+
MAP_KEYS_PER_ROW,
173+
),
174+
),
175+
(
176+
"map_1000_utf8_view",
177+
list_array(
178+
Arc::new(StringViewArray::from(gen_repeat_values(
179+
&gen_utf8_values(&mut rng),
180+
MAP_ROWS,
181+
))) as ArrayRef,
182+
MAP_ROWS,
183+
MAP_KEYS_PER_ROW,
184+
),
185+
),
186+
(
187+
"map_1000_binary_view",
188+
list_array(
189+
Arc::new(BinaryViewArray::from_iter_values(gen_repeat_values(
190+
&gen_binary_values(&mut rng),
191+
MAP_ROWS,
192+
))) as ArrayRef,
193+
MAP_ROWS,
194+
MAP_KEYS_PER_ROW,
195+
),
196+
),
197+
(
198+
"map_1000_int32",
199+
list_array(
200+
Arc::new(Int32Array::from(gen_repeat_values(
201+
&gen_primitive_values(&mut rng),
202+
MAP_ROWS,
203+
))) as ArrayRef,
204+
MAP_ROWS,
205+
MAP_KEYS_PER_ROW,
206+
),
207+
),
208+
];
108209

109-
b.iter(|| {
110-
black_box(
111-
map_udf()
112-
.invoke_with_args(ScalarFunctionArgs {
113-
args: vec![keys.clone(), values.clone()],
114-
arg_fields: arg_fields.clone(),
115-
number_rows: 1,
116-
return_field: Arc::clone(&return_field),
117-
config_options: Arc::clone(&config_options),
118-
})
119-
.expect("map should work on valid values"),
120-
);
121-
});
122-
});
210+
for (name, keys) in map_cases {
211+
bench_map_case(c, name, keys, Arc::clone(&values));
212+
}
123213
}
124214

125215
criterion_group!(benches, criterion_benchmark);

0 commit comments

Comments
 (0)