From 773f88df80996f41c78c42064f2b70f0ac157303 Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Sun, 29 Mar 2026 22:27:16 +0800 Subject: [PATCH 1/3] optimize map_extract --- datafusion/functions-nested/Cargo.toml | 4 + .../functions-nested/benches/map_extract.rs | 281 ++++++++++++++++++ .../functions-nested/src/map_extract.rs | 132 +++++++- datafusion/sqllogictest/test_files/map.slt | 16 + 4 files changed, 423 insertions(+), 10 deletions(-) create mode 100644 datafusion/functions-nested/benches/map_extract.rs diff --git a/datafusion/functions-nested/Cargo.toml b/datafusion/functions-nested/Cargo.toml index 6e96a44fc98c4..3d44101c620ff 100644 --- a/datafusion/functions-nested/Cargo.toml +++ b/datafusion/functions-nested/Cargo.toml @@ -94,6 +94,10 @@ name = "array_slice" harness = false name = "map" +[[bench]] +harness = false +name = "map_extract" + [[bench]] harness = false name = "array_remove" diff --git a/datafusion/functions-nested/benches/map_extract.rs b/datafusion/functions-nested/benches/map_extract.rs new file mode 100644 index 0000000000000..0ee6623cd5dae --- /dev/null +++ b/datafusion/functions-nested/benches/map_extract.rs @@ -0,0 +1,281 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{ + ArrayRef, BinaryArray, BinaryViewArray, Int32Array, ListArray, StringArray, + StringViewArray, +}; +use arrow::buffer::{OffsetBuffer, ScalarBuffer}; +use arrow::datatypes::Field; +use criterion::{Criterion, criterion_group, criterion_main}; +use datafusion_common::config::ConfigOptions; +use datafusion_expr::{ColumnarValue, ScalarFunctionArgs}; +use datafusion_functions_nested::map::map_udf; +use datafusion_functions_nested::map_extract::map_extract_udf; +use rand::Rng; +use rand::prelude::ThreadRng; +use std::collections::HashSet; +use std::hash::Hash; +use std::hint::black_box; +use std::sync::Arc; + +const MAP_ROWS: usize = 1000; +const MAP_KEYS_PER_ROW: usize = 1000; + +fn gen_unique_values( + rng: &mut ThreadRng, + mut make_value: impl FnMut(i32) -> T, +) -> Vec +where + T: Eq + Hash, +{ + let mut values = HashSet::with_capacity(MAP_KEYS_PER_ROW); + + while values.len() < MAP_KEYS_PER_ROW { + values.insert(make_value(rng.random_range(0..10000))); + } + + values.into_iter().collect() +} + +fn gen_repeat_values(values: &[T], repeats: usize) -> Vec { + let mut repeated = Vec::with_capacity(values.len() * repeats); + + for _ in 0..repeats { + repeated.extend_from_slice(values); + } + + repeated +} + +fn gen_utf8_values(rng: &mut ThreadRng) -> Vec { + gen_unique_values(rng, |value| value.to_string()) +} + +fn gen_binary_values(rng: &mut ThreadRng) -> Vec> { + gen_unique_values(rng, |value| value.to_le_bytes().to_vec()) +} + +fn gen_primitive_values(rng: &mut ThreadRng) -> Vec { + gen_unique_values(rng, |value| value) +} + +fn list_array(values: ArrayRef, row_count: usize, values_per_row: usize) -> ArrayRef { + let offsets = (0..=row_count) + .map(|index| (index * values_per_row) as i32) + .collect::>(); + Arc::new(ListArray::new( + Arc::new(Field::new_list_field(values.data_type().clone(), true)), + OffsetBuffer::new(ScalarBuffer::from(offsets)), + values, + None, + )) +} + +fn build_map_array(keys: ArrayRef, values: ArrayRef) -> ArrayRef { + let number_rows = keys.len(); + let keys_arg = ColumnarValue::Array(keys); + let values_arg = ColumnarValue::Array(values); + let return_type = map_udf() + .return_type(&[keys_arg.data_type(), values_arg.data_type()]) + .expect("should get return type"); + let arg_fields = vec![ + Field::new("keys", keys_arg.data_type(), true).into(), + Field::new("values", values_arg.data_type(), true).into(), + ]; + let return_field = Field::new("map", return_type, true).into(); + let config_options = Arc::new(ConfigOptions::default()); + + match map_udf() + .invoke_with_args(ScalarFunctionArgs { + args: vec![keys_arg, values_arg], + arg_fields, + number_rows, + return_field, + config_options, + }) + .expect("map should work on valid values") + { + ColumnarValue::Array(array) => array, + other => panic!("expected array result, got {other:?}"), + } +} + +fn bench_map_extract_case( + c: &mut Criterion, + name: &str, + map_array: ArrayRef, + query_keys: ArrayRef, +) { + let number_rows = map_array.len(); + let map_arg = ColumnarValue::Array(map_array); + let key_arg = ColumnarValue::Array(query_keys); + let return_type = map_extract_udf() + .return_type(&[map_arg.data_type(), key_arg.data_type()]) + .expect("should get return type"); + let arg_fields = vec![ + Field::new("map", map_arg.data_type(), true).into(), + Field::new("key", key_arg.data_type(), true).into(), + ]; + let return_field = Field::new("result", return_type, true).into(); + let config_options = Arc::new(ConfigOptions::default()); + + c.bench_function(name, |b| { + b.iter(|| { + black_box( + map_extract_udf() + .invoke_with_args(ScalarFunctionArgs { + args: vec![map_arg.clone(), key_arg.clone()], + arg_fields: arg_fields.clone(), + number_rows, + return_field: Arc::clone(&return_field), + config_options: Arc::clone(&config_options), + }) + .expect("map_extract should work on valid values"), + ); + }); + }); +} + +fn criterion_benchmark(c: &mut Criterion) { + let mut rng = rand::rng(); + let primitive_values = gen_primitive_values(&mut rng); + let utf8_values = gen_utf8_values(&mut rng); + let binary_values = gen_binary_values(&mut rng); + let values = Arc::new(Int32Array::from(gen_repeat_values( + &primitive_values, + MAP_ROWS, + ))) as ArrayRef; + let values = list_array(values, MAP_ROWS, MAP_KEYS_PER_ROW); + + let map_extract_cases = [ + ( + "map_extract_1000_utf8_found_middle", + build_map_array( + list_array( + Arc::new(StringArray::from(gen_repeat_values(&utf8_values, MAP_ROWS))) + as ArrayRef, + MAP_ROWS, + MAP_KEYS_PER_ROW, + ), + Arc::clone(&values), + ), + Arc::new(StringArray::from(vec![ + utf8_values[MAP_KEYS_PER_ROW / 2] + .clone(); + MAP_ROWS + ])) as ArrayRef, + ), + ( + "map_extract_1000_utf8_found_last", + build_map_array( + list_array( + Arc::new(StringArray::from(gen_repeat_values(&utf8_values, MAP_ROWS))) + as ArrayRef, + MAP_ROWS, + MAP_KEYS_PER_ROW, + ), + Arc::clone(&values), + ), + Arc::new(StringArray::from(vec![ + utf8_values[MAP_KEYS_PER_ROW - 1] + .clone(); + MAP_ROWS + ])) as ArrayRef, + ), + ( + "map_extract_1000_binary_found_last", + build_map_array( + list_array( + Arc::new(BinaryArray::from_iter_values(gen_repeat_values( + &binary_values, + MAP_ROWS, + ))) as ArrayRef, + MAP_ROWS, + MAP_KEYS_PER_ROW, + ), + Arc::clone(&values), + ), + Arc::new(BinaryArray::from_iter_values(vec![ + binary_values[MAP_KEYS_PER_ROW - 1].clone(); + MAP_ROWS + ])) as ArrayRef, + ), + ( + "map_extract_1000_utf8_view_found_last", + build_map_array( + list_array( + Arc::new(StringViewArray::from(gen_repeat_values( + &utf8_values, + MAP_ROWS, + ))) as ArrayRef, + MAP_ROWS, + MAP_KEYS_PER_ROW, + ), + Arc::clone(&values), + ), + Arc::new(StringViewArray::from(vec![ + utf8_values[MAP_KEYS_PER_ROW - 1] + .clone(); + MAP_ROWS + ])) as ArrayRef, + ), + ( + "map_extract_1000_binary_view_found_last", + build_map_array( + list_array( + Arc::new(BinaryViewArray::from_iter_values(gen_repeat_values( + &binary_values, + MAP_ROWS, + ))) as ArrayRef, + MAP_ROWS, + MAP_KEYS_PER_ROW, + ), + Arc::clone(&values), + ), + Arc::new(BinaryViewArray::from_iter_values(vec![ + binary_values[MAP_KEYS_PER_ROW - 1].clone(); + MAP_ROWS + ])) as ArrayRef, + ), + ( + "map_extract_1000_int32_found_last", + build_map_array( + list_array( + Arc::new(Int32Array::from(gen_repeat_values( + &primitive_values, + MAP_ROWS, + ))) as ArrayRef, + MAP_ROWS, + MAP_KEYS_PER_ROW, + ), + Arc::clone(&values), + ), + Arc::new(Int32Array::from(vec![ + primitive_values[MAP_KEYS_PER_ROW - 1]; + MAP_ROWS + ])) as ArrayRef, + ), + ]; + + for (name, map_array, query_keys) in map_extract_cases { + bench_map_extract_case(c, name, map_array, query_keys); + } +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/datafusion/functions-nested/src/map_extract.rs b/datafusion/functions-nested/src/map_extract.rs index 1c46bf5e81337..d618bfbcb0b75 100644 --- a/datafusion/functions-nested/src/map_extract.rs +++ b/datafusion/functions-nested/src/map_extract.rs @@ -19,10 +19,14 @@ use crate::utils::{get_map_entry_field, make_scalar_function}; use arrow::array::{ - Array, ArrayRef, Capacities, ListArray, MapArray, MutableArrayData, make_array, + Array, ArrayAccessor, ArrayRef, Capacities, ListArray, MapArray, MutableArrayData, + cast::AsArray, make_array, }; use arrow::buffer::OffsetBuffer; -use arrow::datatypes::{DataType, Field}; +use arrow::datatypes::{ + DataType, Date32Type, Date64Type, Field, Int8Type, Int16Type, Int32Type, Int64Type, + UInt8Type, UInt16Type, UInt32Type, UInt64Type, +}; use datafusion_common::utils::take_function_args; use datafusion_common::{Result, cast::as_map_array, exec_err}; use datafusion_expr::{ @@ -130,11 +134,41 @@ impl ScalarUDFImpl for MapExtract { } } +/// Fast path for key types that support direct typed value comparison. +/// +/// This avoids the generic single-element slice comparison used by +/// `general_map_extract_inner`. +fn specialized_map_extract_inner( + map_array: &MapArray, + keys: A, + query_keys: A, +) -> Result +where + A: ArrayAccessor, + A::Item: PartialEq, +{ + map_extract_with_match(map_array, move |row_index, key_index| { + query_keys.is_valid(row_index) + && keys.is_valid(key_index) + && keys.value(key_index) == query_keys.value(row_index) + }) +} + fn general_map_extract_inner( map_array: &MapArray, query_keys_array: &dyn Array, ) -> Result { - let keys = map_array.keys(); + map_extract_with_match(map_array, |row_index, key_index| { + let query_key = query_keys_array.slice(row_index, 1); + query_keys_array.is_valid(row_index) + && map_array.keys().slice(key_index, 1).as_ref() == query_key.as_ref() + }) +} + +fn map_extract_with_match( + map_array: &MapArray, + mut key_matches: impl FnMut(usize, usize) -> bool, +) -> Result { let mut offsets = vec![0_i32]; let values = map_array.values(); @@ -147,16 +181,12 @@ fn general_map_extract_inner( for (row_index, offset_window) in map_array.value_offsets().windows(2).enumerate() { let start = offset_window[0] as usize; let end = offset_window[1] as usize; - let len = end - start; - - let query_key = query_keys_array.slice(row_index, 1); - let value_index = - (0..len).find(|&i| keys.slice(start + i, 1).as_ref() == query_key.as_ref()); + (start..end).find(|&key_index| key_matches(row_index, key_index)); match value_index { Some(index) => { - mutable.extend(0, start + index, start + index + 1); + mutable.extend(0, index, index + 1); } None => { mutable.extend_nulls(1); @@ -193,5 +223,87 @@ fn map_extract_inner(args: &[ArrayRef]) -> Result { ); } - general_map_extract_inner(map_array, key_arg) + match key_type { + DataType::Int8 => specialized_map_extract_inner( + map_array, + map_array.keys().as_primitive::(), + key_arg.as_primitive::(), + ), + DataType::Int16 => specialized_map_extract_inner( + map_array, + map_array.keys().as_primitive::(), + key_arg.as_primitive::(), + ), + DataType::Int32 => specialized_map_extract_inner( + map_array, + map_array.keys().as_primitive::(), + key_arg.as_primitive::(), + ), + DataType::Int64 => specialized_map_extract_inner( + map_array, + map_array.keys().as_primitive::(), + key_arg.as_primitive::(), + ), + DataType::UInt8 => specialized_map_extract_inner( + map_array, + map_array.keys().as_primitive::(), + key_arg.as_primitive::(), + ), + DataType::UInt16 => specialized_map_extract_inner( + map_array, + map_array.keys().as_primitive::(), + key_arg.as_primitive::(), + ), + DataType::UInt32 => specialized_map_extract_inner( + map_array, + map_array.keys().as_primitive::(), + key_arg.as_primitive::(), + ), + DataType::UInt64 => specialized_map_extract_inner( + map_array, + map_array.keys().as_primitive::(), + key_arg.as_primitive::(), + ), + DataType::Date32 => specialized_map_extract_inner( + map_array, + map_array.keys().as_primitive::(), + key_arg.as_primitive::(), + ), + DataType::Date64 => specialized_map_extract_inner( + map_array, + map_array.keys().as_primitive::(), + key_arg.as_primitive::(), + ), + DataType::Utf8 => specialized_map_extract_inner( + map_array, + map_array.keys().as_string::(), + key_arg.as_string::(), + ), + DataType::LargeUtf8 => specialized_map_extract_inner( + map_array, + map_array.keys().as_string::(), + key_arg.as_string::(), + ), + DataType::Utf8View => specialized_map_extract_inner( + map_array, + map_array.keys().as_string_view(), + key_arg.as_string_view(), + ), + DataType::Binary => specialized_map_extract_inner( + map_array, + map_array.keys().as_binary::(), + key_arg.as_binary::(), + ), + DataType::LargeBinary => specialized_map_extract_inner( + map_array, + map_array.keys().as_binary::(), + key_arg.as_binary::(), + ), + DataType::BinaryView => specialized_map_extract_inner( + map_array, + map_array.keys().as_binary_view(), + key_arg.as_binary_view(), + ), + _ => general_map_extract_inner(map_array, key_arg), + } } diff --git a/datafusion/sqllogictest/test_files/map.slt b/datafusion/sqllogictest/test_files/map.slt index 62e70e6080bab..34cd80c86ecfa 100644 --- a/datafusion/sqllogictest/test_files/map.slt +++ b/datafusion/sqllogictest/test_files/map.slt @@ -642,6 +642,22 @@ select map_extract(MAP {1: 1, 2: 2, 3:3}, '1'), map_extract(MAP {1: 1, 2: 2, 3:3 ---- [1] [1] [1] [NULL] [1] +# binary and binary view keys +query ???? +select map_extract(MAP {arrow_cast('a', 'Binary'): 1, arrow_cast('b', 'Binary'): 2}, arrow_cast('b', 'Binary')), + map_extract(MAP {arrow_cast('a', 'LargeBinary'): 1, arrow_cast('b', 'LargeBinary'): 2}, arrow_cast('c', 'LargeBinary')), + map_extract(MAP {arrow_cast('a', 'BinaryView'): 1, arrow_cast('b', 'BinaryView'): 2}, arrow_cast('b', 'BinaryView')), + map_extract(MAP {arrow_cast('a', 'BinaryView'): 1, arrow_cast('b', 'BinaryView'): 2}, arrow_cast(NULL, 'BinaryView')); +---- +[2] [NULL] [2] [NULL] + +# nested datatype keys +query ?? +select map_extract(MAP {[1, 2]: 10, [3, 4]: 20}, [3, 4]), + map_extract(MAP {[1, 2]: 10, [3, 4]: 20}, [5, 6]); +---- +[20] [NULL] + # map_extract with columns query ??? select map_extract(column1, 1), map_extract(column1, 5), map_extract(column1, 7) from map_array_table_1; From 7e3177dc45e2b8b07718b612b910640edd9ac0d0 Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Tue, 7 Apr 2026 22:49:39 +0800 Subject: [PATCH 2/3] add bench case --- .../functions-nested/benches/map_extract.rs | 383 ++++++++++-------- 1 file changed, 212 insertions(+), 171 deletions(-) diff --git a/datafusion/functions-nested/benches/map_extract.rs b/datafusion/functions-nested/benches/map_extract.rs index 0ee6623cd5dae..7c890b0196915 100644 --- a/datafusion/functions-nested/benches/map_extract.rs +++ b/datafusion/functions-nested/benches/map_extract.rs @@ -19,7 +19,7 @@ use arrow::array::{ ArrayRef, BinaryArray, BinaryViewArray, Int32Array, ListArray, StringArray, StringViewArray, }; -use arrow::buffer::{OffsetBuffer, ScalarBuffer}; +use arrow::buffer::OffsetBuffer; use arrow::datatypes::Field; use criterion::{Criterion, criterion_group, criterion_main}; use datafusion_common::config::ConfigOptions; @@ -27,60 +27,39 @@ use datafusion_expr::{ColumnarValue, ScalarFunctionArgs}; use datafusion_functions_nested::map::map_udf; use datafusion_functions_nested::map_extract::map_extract_udf; use rand::Rng; -use rand::prelude::ThreadRng; -use std::collections::HashSet; -use std::hash::Hash; +use rand::SeedableRng; +use rand::rngs::StdRng; +use std::cmp::max; use std::hint::black_box; use std::sync::Arc; +const SEED: u64 = 42; const MAP_ROWS: usize = 1000; -const MAP_KEYS_PER_ROW: usize = 1000; +const MAX_MAP_LENGTHS: [usize; 2] = [8, 16]; +const HIT_MODES: [HitMode; 3] = + [HitMode::FoundEarly, HitMode::FoundRandom, HitMode::NotFound]; -fn gen_unique_values( - rng: &mut ThreadRng, - mut make_value: impl FnMut(i32) -> T, -) -> Vec -where - T: Eq + Hash, -{ - let mut values = HashSet::with_capacity(MAP_KEYS_PER_ROW); - - while values.len() < MAP_KEYS_PER_ROW { - values.insert(make_value(rng.random_range(0..10000))); - } - - values.into_iter().collect() +#[derive(Clone, Copy)] +enum HitMode { + FoundEarly, + FoundRandom, + NotFound, } -fn gen_repeat_values(values: &[T], repeats: usize) -> Vec { - let mut repeated = Vec::with_capacity(values.len() * repeats); - - for _ in 0..repeats { - repeated.extend_from_slice(values); +impl HitMode { + fn name(self) -> &'static str { + match self { + Self::FoundEarly => "found_early", + Self::FoundRandom => "found_random", + Self::NotFound => "not_found", + } } - - repeated -} - -fn gen_utf8_values(rng: &mut ThreadRng) -> Vec { - gen_unique_values(rng, |value| value.to_string()) } -fn gen_binary_values(rng: &mut ThreadRng) -> Vec> { - gen_unique_values(rng, |value| value.to_le_bytes().to_vec()) -} - -fn gen_primitive_values(rng: &mut ThreadRng) -> Vec { - gen_unique_values(rng, |value| value) -} - -fn list_array(values: ArrayRef, row_count: usize, values_per_row: usize) -> ArrayRef { - let offsets = (0..=row_count) - .map(|index| (index * values_per_row) as i32) - .collect::>(); +fn list_array(values: ArrayRef, lengths: &[usize]) -> ArrayRef { Arc::new(ListArray::new( Arc::new(Field::new_list_field(values.data_type().clone(), true)), - OffsetBuffer::new(ScalarBuffer::from(offsets)), + OffsetBuffer::from_lengths(lengths.iter().copied()), values, None, )) @@ -99,15 +78,16 @@ fn build_map_array(keys: ArrayRef, values: ArrayRef) -> ArrayRef { ]; let return_field = Field::new("map", return_type, true).into(); let config_options = Arc::new(ConfigOptions::default()); + let map_args = ScalarFunctionArgs { + args: vec![keys_arg, values_arg], + arg_fields, + number_rows, + return_field, + config_options, + }; match map_udf() - .invoke_with_args(ScalarFunctionArgs { - args: vec![keys_arg, values_arg], - arg_fields, - number_rows, - return_field, - config_options, - }) + .invoke_with_args(map_args) .expect("map should work on valid values") { ColumnarValue::Array(array) => array, @@ -119,11 +99,10 @@ fn bench_map_extract_case( c: &mut Criterion, name: &str, map_array: ArrayRef, - query_keys: ArrayRef, + key_arg: ColumnarValue, ) { let number_rows = map_array.len(); let map_arg = ColumnarValue::Array(map_array); - let key_arg = ColumnarValue::Array(query_keys); let return_type = map_extract_udf() .return_type(&[map_arg.data_type(), key_arg.data_type()]) .expect("should get return type"); @@ -151,129 +130,191 @@ fn bench_map_extract_case( }); } -fn criterion_benchmark(c: &mut Criterion) { - let mut rng = rand::rng(); - let primitive_values = gen_primitive_values(&mut rng); - let utf8_values = gen_utf8_values(&mut rng); - let binary_values = gen_binary_values(&mut rng); - let values = Arc::new(Int32Array::from(gen_repeat_values( - &primitive_values, - MAP_ROWS, - ))) as ArrayRef; - let values = list_array(values, MAP_ROWS, MAP_KEYS_PER_ROW); +fn query_index(rng: &mut StdRng, len: usize, mode: HitMode) -> Option { + match mode { + HitMode::FoundEarly => Some(rng.random_range(0..max(1, len / 5))), + HitMode::FoundRandom => Some(rng.random_range(0..len)), + HitMode::NotFound => None, + } +} - let map_extract_cases = [ - ( - "map_extract_1000_utf8_found_middle", - build_map_array( - list_array( - Arc::new(StringArray::from(gen_repeat_values(&utf8_values, MAP_ROWS))) - as ArrayRef, - MAP_ROWS, - MAP_KEYS_PER_ROW, - ), - Arc::clone(&values), - ), - Arc::new(StringArray::from(vec![ - utf8_values[MAP_KEYS_PER_ROW / 2] - .clone(); - MAP_ROWS - ])) as ArrayRef, - ), - ( - "map_extract_1000_utf8_found_last", - build_map_array( - list_array( - Arc::new(StringArray::from(gen_repeat_values(&utf8_values, MAP_ROWS))) - as ArrayRef, - MAP_ROWS, - MAP_KEYS_PER_ROW, - ), - Arc::clone(&values), - ), - Arc::new(StringArray::from(vec![ - utf8_values[MAP_KEYS_PER_ROW - 1] - .clone(); - MAP_ROWS - ])) as ArrayRef, - ), - ( - "map_extract_1000_binary_found_last", - build_map_array( - list_array( - Arc::new(BinaryArray::from_iter_values(gen_repeat_values( - &binary_values, - MAP_ROWS, - ))) as ArrayRef, - MAP_ROWS, - MAP_KEYS_PER_ROW, - ), - Arc::clone(&values), - ), - Arc::new(BinaryArray::from_iter_values(vec![ - binary_values[MAP_KEYS_PER_ROW - 1].clone(); - MAP_ROWS - ])) as ArrayRef, - ), - ( - "map_extract_1000_utf8_view_found_last", - build_map_array( - list_array( - Arc::new(StringViewArray::from(gen_repeat_values( - &utf8_values, - MAP_ROWS, - ))) as ArrayRef, - MAP_ROWS, - MAP_KEYS_PER_ROW, - ), - Arc::clone(&values), - ), - Arc::new(StringViewArray::from(vec![ - utf8_values[MAP_KEYS_PER_ROW - 1] - .clone(); - MAP_ROWS - ])) as ArrayRef, - ), - ( - "map_extract_1000_binary_view_found_last", - build_map_array( - list_array( - Arc::new(BinaryViewArray::from_iter_values(gen_repeat_values( - &binary_values, - MAP_ROWS, - ))) as ArrayRef, - MAP_ROWS, - MAP_KEYS_PER_ROW, - ), - Arc::clone(&values), - ), - Arc::new(BinaryViewArray::from_iter_values(vec![ - binary_values[MAP_KEYS_PER_ROW - 1].clone(); - MAP_ROWS - ])) as ArrayRef, +fn build_string_data( + rng: &mut StdRng, + lengths: &[usize], + mode: HitMode, +) -> (Vec, Vec) { + let mut keys = Vec::with_capacity(lengths.iter().sum()); + let mut queries = Vec::with_capacity(lengths.len()); + + for (row, len) in lengths.iter().copied().enumerate() { + let row_keys = (0..len) + .map(|index| format!("k_{row}_{index}")) + .collect::>(); + let query = match query_index(rng, len, mode) { + Some(index) => row_keys[index].clone(), + None => format!("missing_{row}"), + }; + + keys.extend(row_keys); + queries.push(query); + } + (keys, queries) +} + +fn build_i32_data( + rng: &mut StdRng, + lengths: &[usize], + mode: HitMode, +) -> (Vec, Vec) { + let mut keys = Vec::with_capacity(lengths.iter().sum()); + let mut queries = Vec::with_capacity(lengths.len()); + + for (row, len) in lengths.iter().copied().enumerate() { + let base = (row as i32) * 1_000; + let row_keys = (0..len) + .map(|index| base + index as i32) + .collect::>(); + let query = match query_index(rng, len, mode) { + Some(index) => row_keys[index], + None => base + 10_000, + }; + keys.extend(row_keys); + queries.push(query); + } + (keys, queries) +} + +fn build_binary_data( + rng: &mut StdRng, + lengths: &[usize], + mode: HitMode, +) -> (Vec>, Vec>) { + let mut keys = Vec::with_capacity(lengths.iter().sum()); + let mut queries = Vec::with_capacity(lengths.len()); + + for (row, len) in lengths.iter().copied().enumerate() { + let row_keys = (0..len) + .map(|index| format!("k_{row}_{index}").into_bytes()) + .collect::>(); + let query = match query_index(rng, len, mode) { + Some(index) => row_keys[index].clone(), + None => format!("missing_{row}").into_bytes(), + }; + + keys.extend(row_keys); + queries.push(query); + } + + (keys, queries) +} + +fn build_utf8_case( + rng: &mut StdRng, + lengths: &[usize], + mode: HitMode, +) -> (ArrayRef, ArrayRef) { + let (keys, queries) = build_string_data(rng, lengths, mode); + ( + list_array(Arc::new(StringArray::from(keys)) as ArrayRef, lengths), + Arc::new(StringArray::from(queries)) as ArrayRef, + ) +} + +fn build_utf8_view_case( + rng: &mut StdRng, + lengths: &[usize], + mode: HitMode, +) -> (ArrayRef, ArrayRef) { + let (keys, queries) = build_string_data(rng, lengths, mode); + ( + list_array(Arc::new(StringViewArray::from(keys)) as ArrayRef, lengths), + Arc::new(StringViewArray::from(queries)) as ArrayRef, + ) +} + +fn build_int32_case( + rng: &mut StdRng, + lengths: &[usize], + mode: HitMode, +) -> (ArrayRef, ArrayRef) { + let (keys, queries) = build_i32_data(rng, lengths, mode); + ( + list_array(Arc::new(Int32Array::from(keys)) as ArrayRef, lengths), + Arc::new(Int32Array::from(queries)) as ArrayRef, + ) +} + +fn build_binary_case( + rng: &mut StdRng, + lengths: &[usize], + mode: HitMode, +) -> (ArrayRef, ArrayRef) { + let (keys, queries) = build_binary_data(rng, lengths, mode); + ( + list_array( + Arc::new(BinaryArray::from_iter_values(keys)) as ArrayRef, + lengths, ), - ( - "map_extract_1000_int32_found_last", - build_map_array( - list_array( - Arc::new(Int32Array::from(gen_repeat_values( - &primitive_values, - MAP_ROWS, - ))) as ArrayRef, - MAP_ROWS, - MAP_KEYS_PER_ROW, - ), - Arc::clone(&values), - ), - Arc::new(Int32Array::from(vec![ - primitive_values[MAP_KEYS_PER_ROW - 1]; - MAP_ROWS - ])) as ArrayRef, + Arc::new(BinaryArray::from_iter_values(queries)) as ArrayRef, + ) +} + +fn build_binary_view_case( + rng: &mut StdRng, + lengths: &[usize], + mode: HitMode, +) -> (ArrayRef, ArrayRef) { + let (keys, queries) = build_binary_data(rng, lengths, mode); + ( + list_array( + Arc::new(BinaryViewArray::from_iter_values(keys)) as ArrayRef, + lengths, ), - ]; + Arc::new(BinaryViewArray::from_iter_values(queries)) as ArrayRef, + ) +} + +fn criterion_benchmark(c: &mut Criterion) { + let mut rng = StdRng::seed_from_u64(SEED); + let mut map_extract_cases = Vec::new(); + + macro_rules! add_cases { + ($type_name:literal, $case_builder:expr) => { + for max_len in MAX_MAP_LENGTHS { + let lengths = (0..MAP_ROWS) + .map(|_| rng.random_range(1..=max_len)) + .collect::>(); + let values = { + let values = (0..lengths.iter().sum::()) + .map(|value| value as i32) + .collect::>(); + list_array(Arc::new(Int32Array::from(values)) as ArrayRef, &lengths) + }; + + for mode in HIT_MODES { + let (keys, query_keys) = $case_builder(&mut rng, &lengths, mode); + map_extract_cases.push(( + format!( + "map_extract_{}_max_len{max_len}_{}", + $type_name, + mode.name() + ), + build_map_array(keys, Arc::clone(&values)), + ColumnarValue::Array(query_keys), + )); + } + } + }; + } + + add_cases!("utf8", build_utf8_case); + add_cases!("int32", build_int32_case); + add_cases!("string_view", build_utf8_view_case); + add_cases!("binary", build_binary_case); + add_cases!("binary_view", build_binary_view_case); - for (name, map_array, query_keys) in map_extract_cases { - bench_map_extract_case(c, name, map_array, query_keys); + for (name, map_array, key_arg) in map_extract_cases { + bench_map_extract_case(c, &name, map_array, key_arg); } } From 7e6eb12671c2f2982e63642d08daf64243b8ebc8 Mon Sep 17 00:00:00 2001 From: lyne7-sc <734432041@qq.com> Date: Tue, 7 Apr 2026 23:07:45 +0800 Subject: [PATCH 3/3] remove benchmark (moved to separate PR) --- datafusion/functions-nested/Cargo.toml | 4 - .../functions-nested/benches/map_extract.rs | 322 ------------------ 2 files changed, 326 deletions(-) delete mode 100644 datafusion/functions-nested/benches/map_extract.rs diff --git a/datafusion/functions-nested/Cargo.toml b/datafusion/functions-nested/Cargo.toml index 3d44101c620ff..6e96a44fc98c4 100644 --- a/datafusion/functions-nested/Cargo.toml +++ b/datafusion/functions-nested/Cargo.toml @@ -94,10 +94,6 @@ name = "array_slice" harness = false name = "map" -[[bench]] -harness = false -name = "map_extract" - [[bench]] harness = false name = "array_remove" diff --git a/datafusion/functions-nested/benches/map_extract.rs b/datafusion/functions-nested/benches/map_extract.rs deleted file mode 100644 index 7c890b0196915..0000000000000 --- a/datafusion/functions-nested/benches/map_extract.rs +++ /dev/null @@ -1,322 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use arrow::array::{ - ArrayRef, BinaryArray, BinaryViewArray, Int32Array, ListArray, StringArray, - StringViewArray, -}; -use arrow::buffer::OffsetBuffer; -use arrow::datatypes::Field; -use criterion::{Criterion, criterion_group, criterion_main}; -use datafusion_common::config::ConfigOptions; -use datafusion_expr::{ColumnarValue, ScalarFunctionArgs}; -use datafusion_functions_nested::map::map_udf; -use datafusion_functions_nested::map_extract::map_extract_udf; -use rand::Rng; -use rand::SeedableRng; -use rand::rngs::StdRng; -use std::cmp::max; -use std::hint::black_box; -use std::sync::Arc; - -const SEED: u64 = 42; -const MAP_ROWS: usize = 1000; -const MAX_MAP_LENGTHS: [usize; 2] = [8, 16]; -const HIT_MODES: [HitMode; 3] = - [HitMode::FoundEarly, HitMode::FoundRandom, HitMode::NotFound]; - -#[derive(Clone, Copy)] -enum HitMode { - FoundEarly, - FoundRandom, - NotFound, -} - -impl HitMode { - fn name(self) -> &'static str { - match self { - Self::FoundEarly => "found_early", - Self::FoundRandom => "found_random", - Self::NotFound => "not_found", - } - } -} - -fn list_array(values: ArrayRef, lengths: &[usize]) -> ArrayRef { - Arc::new(ListArray::new( - Arc::new(Field::new_list_field(values.data_type().clone(), true)), - OffsetBuffer::from_lengths(lengths.iter().copied()), - values, - None, - )) -} - -fn build_map_array(keys: ArrayRef, values: ArrayRef) -> ArrayRef { - let number_rows = keys.len(); - let keys_arg = ColumnarValue::Array(keys); - let values_arg = ColumnarValue::Array(values); - let return_type = map_udf() - .return_type(&[keys_arg.data_type(), values_arg.data_type()]) - .expect("should get return type"); - let arg_fields = vec![ - Field::new("keys", keys_arg.data_type(), true).into(), - Field::new("values", values_arg.data_type(), true).into(), - ]; - let return_field = Field::new("map", return_type, true).into(); - let config_options = Arc::new(ConfigOptions::default()); - let map_args = ScalarFunctionArgs { - args: vec![keys_arg, values_arg], - arg_fields, - number_rows, - return_field, - config_options, - }; - - match map_udf() - .invoke_with_args(map_args) - .expect("map should work on valid values") - { - ColumnarValue::Array(array) => array, - other => panic!("expected array result, got {other:?}"), - } -} - -fn bench_map_extract_case( - c: &mut Criterion, - name: &str, - map_array: ArrayRef, - key_arg: ColumnarValue, -) { - let number_rows = map_array.len(); - let map_arg = ColumnarValue::Array(map_array); - let return_type = map_extract_udf() - .return_type(&[map_arg.data_type(), key_arg.data_type()]) - .expect("should get return type"); - let arg_fields = vec![ - Field::new("map", map_arg.data_type(), true).into(), - Field::new("key", key_arg.data_type(), true).into(), - ]; - let return_field = Field::new("result", return_type, true).into(); - let config_options = Arc::new(ConfigOptions::default()); - - c.bench_function(name, |b| { - b.iter(|| { - black_box( - map_extract_udf() - .invoke_with_args(ScalarFunctionArgs { - args: vec![map_arg.clone(), key_arg.clone()], - arg_fields: arg_fields.clone(), - number_rows, - return_field: Arc::clone(&return_field), - config_options: Arc::clone(&config_options), - }) - .expect("map_extract should work on valid values"), - ); - }); - }); -} - -fn query_index(rng: &mut StdRng, len: usize, mode: HitMode) -> Option { - match mode { - HitMode::FoundEarly => Some(rng.random_range(0..max(1, len / 5))), - HitMode::FoundRandom => Some(rng.random_range(0..len)), - HitMode::NotFound => None, - } -} - -fn build_string_data( - rng: &mut StdRng, - lengths: &[usize], - mode: HitMode, -) -> (Vec, Vec) { - let mut keys = Vec::with_capacity(lengths.iter().sum()); - let mut queries = Vec::with_capacity(lengths.len()); - - for (row, len) in lengths.iter().copied().enumerate() { - let row_keys = (0..len) - .map(|index| format!("k_{row}_{index}")) - .collect::>(); - let query = match query_index(rng, len, mode) { - Some(index) => row_keys[index].clone(), - None => format!("missing_{row}"), - }; - - keys.extend(row_keys); - queries.push(query); - } - (keys, queries) -} - -fn build_i32_data( - rng: &mut StdRng, - lengths: &[usize], - mode: HitMode, -) -> (Vec, Vec) { - let mut keys = Vec::with_capacity(lengths.iter().sum()); - let mut queries = Vec::with_capacity(lengths.len()); - - for (row, len) in lengths.iter().copied().enumerate() { - let base = (row as i32) * 1_000; - let row_keys = (0..len) - .map(|index| base + index as i32) - .collect::>(); - let query = match query_index(rng, len, mode) { - Some(index) => row_keys[index], - None => base + 10_000, - }; - keys.extend(row_keys); - queries.push(query); - } - (keys, queries) -} - -fn build_binary_data( - rng: &mut StdRng, - lengths: &[usize], - mode: HitMode, -) -> (Vec>, Vec>) { - let mut keys = Vec::with_capacity(lengths.iter().sum()); - let mut queries = Vec::with_capacity(lengths.len()); - - for (row, len) in lengths.iter().copied().enumerate() { - let row_keys = (0..len) - .map(|index| format!("k_{row}_{index}").into_bytes()) - .collect::>(); - let query = match query_index(rng, len, mode) { - Some(index) => row_keys[index].clone(), - None => format!("missing_{row}").into_bytes(), - }; - - keys.extend(row_keys); - queries.push(query); - } - - (keys, queries) -} - -fn build_utf8_case( - rng: &mut StdRng, - lengths: &[usize], - mode: HitMode, -) -> (ArrayRef, ArrayRef) { - let (keys, queries) = build_string_data(rng, lengths, mode); - ( - list_array(Arc::new(StringArray::from(keys)) as ArrayRef, lengths), - Arc::new(StringArray::from(queries)) as ArrayRef, - ) -} - -fn build_utf8_view_case( - rng: &mut StdRng, - lengths: &[usize], - mode: HitMode, -) -> (ArrayRef, ArrayRef) { - let (keys, queries) = build_string_data(rng, lengths, mode); - ( - list_array(Arc::new(StringViewArray::from(keys)) as ArrayRef, lengths), - Arc::new(StringViewArray::from(queries)) as ArrayRef, - ) -} - -fn build_int32_case( - rng: &mut StdRng, - lengths: &[usize], - mode: HitMode, -) -> (ArrayRef, ArrayRef) { - let (keys, queries) = build_i32_data(rng, lengths, mode); - ( - list_array(Arc::new(Int32Array::from(keys)) as ArrayRef, lengths), - Arc::new(Int32Array::from(queries)) as ArrayRef, - ) -} - -fn build_binary_case( - rng: &mut StdRng, - lengths: &[usize], - mode: HitMode, -) -> (ArrayRef, ArrayRef) { - let (keys, queries) = build_binary_data(rng, lengths, mode); - ( - list_array( - Arc::new(BinaryArray::from_iter_values(keys)) as ArrayRef, - lengths, - ), - Arc::new(BinaryArray::from_iter_values(queries)) as ArrayRef, - ) -} - -fn build_binary_view_case( - rng: &mut StdRng, - lengths: &[usize], - mode: HitMode, -) -> (ArrayRef, ArrayRef) { - let (keys, queries) = build_binary_data(rng, lengths, mode); - ( - list_array( - Arc::new(BinaryViewArray::from_iter_values(keys)) as ArrayRef, - lengths, - ), - Arc::new(BinaryViewArray::from_iter_values(queries)) as ArrayRef, - ) -} - -fn criterion_benchmark(c: &mut Criterion) { - let mut rng = StdRng::seed_from_u64(SEED); - let mut map_extract_cases = Vec::new(); - - macro_rules! add_cases { - ($type_name:literal, $case_builder:expr) => { - for max_len in MAX_MAP_LENGTHS { - let lengths = (0..MAP_ROWS) - .map(|_| rng.random_range(1..=max_len)) - .collect::>(); - let values = { - let values = (0..lengths.iter().sum::()) - .map(|value| value as i32) - .collect::>(); - list_array(Arc::new(Int32Array::from(values)) as ArrayRef, &lengths) - }; - - for mode in HIT_MODES { - let (keys, query_keys) = $case_builder(&mut rng, &lengths, mode); - map_extract_cases.push(( - format!( - "map_extract_{}_max_len{max_len}_{}", - $type_name, - mode.name() - ), - build_map_array(keys, Arc::clone(&values)), - ColumnarValue::Array(query_keys), - )); - } - } - }; - } - - add_cases!("utf8", build_utf8_case); - add_cases!("int32", build_int32_case); - add_cases!("string_view", build_utf8_view_case); - add_cases!("binary", build_binary_case); - add_cases!("binary_view", build_binary_view_case); - - for (name, map_array, key_arg) in map_extract_cases { - bench_map_extract_case(c, &name, map_array, key_arg); - } -} - -criterion_group!(benches, criterion_benchmark); -criterion_main!(benches);