Skip to content

Commit 4e2c0f1

Browse files
authored
perf: improve performance of array_union/array_intersect with batched row conversion (#20243)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #. ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> The current implementation of `array_union` and `array_intersect` performs `RowConverter::convert_columns()` on a per-row basis, which introduces avoidable overhead due to repeated conversions and intermediate allocations. This PR improves performance by: 1. converting all list values to rows in a batch 2. reusing hash sets across iterations 3. removing the `sorted().dedup()` pattern in favor of hash-based set operations ## What changes are included in this PR? Refactored the internal set operation implementation to use batch row conversion and a single-pass construction of result arrays. ### Benchmarks ``` group before optimized ----- ------ --------- array_intersect/high_overlap/10 2.99 1442.0±99.94µs ? ?/sec 1.00 481.6±21.45µs ? ?/sec array_intersect/high_overlap/100 1.90 9.5±0.63ms ? ?/sec 1.00 5.0±0.09ms ? ?/sec array_intersect/high_overlap/50 2.01 5.3±0.41ms ? ?/sec 1.00 2.6±0.05ms ? ?/sec array_intersect/low_overlap/10 3.47 1288.1±72.39µs ? ?/sec 1.00 371.4±14.08µs ? ?/sec array_intersect/low_overlap/100 2.35 9.2±0.43ms ? ?/sec 1.00 3.9±0.08ms ? ?/sec array_intersect/low_overlap/50 2.45 5.1±0.41ms ? ?/sec 1.00 2.1±0.07ms ? ?/sec array_union/high_overlap/10 4.01 1593.1±292.17µs ? ?/sec 1.00 396.9±13.43µs ? ?/sec array_union/high_overlap/100 2.54 9.8±0.18ms ? ?/sec 1.00 3.9±0.11ms ? ?/sec array_union/high_overlap/50 2.65 5.4±0.10ms ? ?/sec 1.00 2.0±0.07ms ? 
?/sec array_union/low_overlap/10 3.74 1622.7±96.50µs ? ?/sec 1.00 434.3±17.87µs ? ?/sec array_union/low_overlap/100 2.39 10.3±0.92ms ? ?/sec 1.00 4.3±0.11ms ? ?/sec array_union/low_overlap/50 2.63 5.8±0.27ms ? ?/sec 1.00 2.2±0.11ms ? ?/sec ``` <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? Yes. Existing SQL logic tests updated to reflect new output order. <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 4. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? Yes. The output order may differ from the previous implementation. Previously, results were implicitly sorted due to the use of `sorted().dedup()`. The new implementation preserves the order of first appearance within each list. This is a user-visible behavioral change, but it is consistent with typical SQL set operation semantics, which do not guarantee a specific output order. <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent 17416bf commit 4e2c0f1

4 files changed

Lines changed: 281 additions & 54 deletions

File tree

datafusion/functions-nested/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,3 +92,7 @@ name = "array_remove"
9292
[[bench]]
9393
harness = false
9494
name = "array_repeat"
95+
96+
[[bench]]
97+
harness = false
98+
name = "array_set_ops"
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#[macro_use]
19+
extern crate criterion;
20+
21+
use arrow::array::{ArrayRef, Int64Array, ListArray};
22+
use arrow::buffer::OffsetBuffer;
23+
use arrow::datatypes::{DataType, Field};
24+
use criterion::{BenchmarkId, Criterion};
25+
use datafusion_common::config::ConfigOptions;
26+
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};
27+
use datafusion_functions_nested::set_ops::{ArrayIntersect, ArrayUnion};
28+
use rand::SeedableRng;
29+
use rand::prelude::SliceRandom;
30+
use rand::rngs::StdRng;
31+
use std::collections::HashSet;
32+
use std::hint::black_box;
33+
use std::sync::Arc;
34+
35+
const NUM_ROWS: usize = 1000;
36+
const ARRAY_SIZES: &[usize] = &[10, 50, 100];
37+
const SEED: u64 = 42;
38+
39+
/// Top-level criterion entry point: runs the union benchmark group first,
/// then the intersect group (order preserved so report ordering is stable).
fn criterion_benchmark(c: &mut Criterion) {
    bench_array_union(c);
    bench_array_intersect(c);
}
43+
44+
fn invoke_udf(udf: &impl ScalarUDFImpl, array1: &ArrayRef, array2: &ArrayRef) {
45+
black_box(
46+
udf.invoke_with_args(ScalarFunctionArgs {
47+
args: vec![
48+
ColumnarValue::Array(array1.clone()),
49+
ColumnarValue::Array(array2.clone()),
50+
],
51+
arg_fields: vec![
52+
Field::new("arr1", array1.data_type().clone(), false).into(),
53+
Field::new("arr2", array2.data_type().clone(), false).into(),
54+
],
55+
number_rows: NUM_ROWS,
56+
return_field: Field::new("result", array1.data_type().clone(), false).into(),
57+
config_options: Arc::new(ConfigOptions::default()),
58+
})
59+
.unwrap(),
60+
);
61+
}
62+
63+
fn bench_array_union(c: &mut Criterion) {
64+
let mut group = c.benchmark_group("array_union");
65+
let udf = ArrayUnion::new();
66+
67+
for (overlap_label, overlap_ratio) in &[("high_overlap", 0.8), ("low_overlap", 0.2)] {
68+
for &array_size in ARRAY_SIZES {
69+
let (array1, array2) =
70+
create_arrays_with_overlap(NUM_ROWS, array_size, *overlap_ratio);
71+
group.bench_with_input(
72+
BenchmarkId::new(*overlap_label, array_size),
73+
&array_size,
74+
|b, _| b.iter(|| invoke_udf(&udf, &array1, &array2)),
75+
);
76+
}
77+
}
78+
79+
group.finish();
80+
}
81+
82+
fn bench_array_intersect(c: &mut Criterion) {
83+
let mut group = c.benchmark_group("array_intersect");
84+
let udf = ArrayIntersect::new();
85+
86+
for (overlap_label, overlap_ratio) in &[("high_overlap", 0.8), ("low_overlap", 0.2)] {
87+
for &array_size in ARRAY_SIZES {
88+
let (array1, array2) =
89+
create_arrays_with_overlap(NUM_ROWS, array_size, *overlap_ratio);
90+
group.bench_with_input(
91+
BenchmarkId::new(*overlap_label, array_size),
92+
&array_size,
93+
|b, _| b.iter(|| invoke_udf(&udf, &array1, &array2)),
94+
);
95+
}
96+
}
97+
98+
group.finish();
99+
}
100+
101+
fn create_arrays_with_overlap(
102+
num_rows: usize,
103+
array_size: usize,
104+
overlap_ratio: f64,
105+
) -> (ArrayRef, ArrayRef) {
106+
assert!((0.0..=1.0).contains(&overlap_ratio));
107+
let overlap_count = ((array_size as f64) * overlap_ratio).round() as usize;
108+
109+
let mut rng = StdRng::seed_from_u64(SEED);
110+
111+
let mut values1 = Vec::with_capacity(num_rows * array_size);
112+
let mut values2 = Vec::with_capacity(num_rows * array_size);
113+
114+
for row in 0..num_rows {
115+
let base = (row as i64) * (array_size as i64) * 2;
116+
117+
for i in 0..array_size {
118+
values1.push(base + i as i64);
119+
}
120+
121+
let mut positions: Vec<usize> = (0..array_size).collect();
122+
positions.shuffle(&mut rng);
123+
124+
let overlap_positions: HashSet<_> =
125+
positions[..overlap_count].iter().copied().collect();
126+
127+
for i in 0..array_size {
128+
if overlap_positions.contains(&i) {
129+
values2.push(base + i as i64);
130+
} else {
131+
values2.push(base + array_size as i64 + i as i64);
132+
}
133+
}
134+
}
135+
136+
let values1 = Int64Array::from(values1);
137+
let values2 = Int64Array::from(values2);
138+
139+
let field = Arc::new(Field::new("item", DataType::Int64, true));
140+
141+
let offsets = (0..=num_rows)
142+
.map(|i| (i * array_size) as i32)
143+
.collect::<Vec<i32>>();
144+
145+
let array1 = Arc::new(
146+
ListArray::try_new(
147+
field.clone(),
148+
OffsetBuffer::new(offsets.clone().into()),
149+
Arc::new(values1),
150+
None,
151+
)
152+
.unwrap(),
153+
);
154+
155+
let array2 = Arc::new(
156+
ListArray::try_new(
157+
field,
158+
OffsetBuffer::new(offsets.into()),
159+
Arc::new(values2),
160+
None,
161+
)
162+
.unwrap(),
163+
);
164+
165+
(array1, array2)
166+
}
167+
168+
criterion_group!(benches, criterion_benchmark);
169+
criterion_main!(benches);

datafusion/functions-nested/src/set_ops.rs

Lines changed: 102 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -184,11 +184,17 @@ impl ScalarUDFImpl for ArrayUnion {
184184
)
185185
)]
186186
#[derive(Debug, PartialEq, Eq, Hash)]
187-
pub(super) struct ArrayIntersect {
187+
pub struct ArrayIntersect {
188188
signature: Signature,
189189
aliases: Vec<String>,
190190
}
191191

192+
impl Default for ArrayIntersect {
193+
fn default() -> Self {
194+
Self::new()
195+
}
196+
}
197+
192198
impl ArrayIntersect {
193199
pub fn new() -> Self {
194200
Self {
@@ -358,69 +364,117 @@ fn generic_set_lists<OffsetSize: OffsetSizeTrait>(
358364
"{set_op:?} is not implemented for '{l:?}' and '{r:?}'"
359365
);
360366

361-
let mut offsets = vec![OffsetSize::usize_as(0)];
362-
let mut new_arrays = vec![];
367+
// Convert all values to rows in batch for performance.
363368
let converter = RowConverter::new(vec![SortField::new(l.value_type())])?;
364-
for (l_arr, r_arr) in l.iter().zip(r.iter()) {
365-
let last_offset = *offsets.last().unwrap();
369+
let rows_l = converter.convert_columns(&[Arc::clone(l.values())])?;
370+
let rows_r = converter.convert_columns(&[Arc::clone(r.values())])?;
366371

367-
let (l_values, r_values) = match (l_arr, r_arr) {
368-
(Some(l_arr), Some(r_arr)) => (
369-
converter.convert_columns(&[l_arr])?,
370-
converter.convert_columns(&[r_arr])?,
371-
),
372-
_ => {
373-
offsets.push(last_offset);
374-
continue;
375-
}
376-
};
372+
match set_op {
373+
SetOp::Union => generic_set_loop::<OffsetSize, true>(
374+
l, r, &rows_l, &rows_r, field, &converter,
375+
),
376+
SetOp::Intersect => generic_set_loop::<OffsetSize, false>(
377+
l, r, &rows_l, &rows_r, field, &converter,
378+
),
379+
}
380+
}
377381

378-
let l_iter = l_values.iter().sorted().dedup();
379-
let values_set: HashSet<_> = l_iter.clone().collect();
380-
let mut rows = if set_op == SetOp::Union {
381-
l_iter.collect()
382-
} else {
383-
vec![]
384-
};
382+
/// Inner loop for set operations, parameterized by const generic to
383+
/// avoid branching inside the hot loop.
384+
fn generic_set_loop<OffsetSize: OffsetSizeTrait, const IS_UNION: bool>(
385+
l: &GenericListArray<OffsetSize>,
386+
r: &GenericListArray<OffsetSize>,
387+
rows_l: &arrow::row::Rows,
388+
rows_r: &arrow::row::Rows,
389+
field: Arc<Field>,
390+
converter: &RowConverter,
391+
) -> Result<ArrayRef> {
392+
let l_offsets = l.value_offsets();
393+
let r_offsets = r.value_offsets();
394+
395+
let mut result_offsets = Vec::with_capacity(l.len() + 1);
396+
result_offsets.push(OffsetSize::usize_as(0));
397+
let initial_capacity = if IS_UNION {
398+
// Union can include all elements from both sides
399+
rows_l.num_rows()
400+
} else {
401+
// Intersect result is bounded by the smaller side
402+
rows_l.num_rows().min(rows_r.num_rows())
403+
};
404+
405+
let mut final_rows = Vec::with_capacity(initial_capacity);
406+
407+
// Reuse hash sets across iterations
408+
let mut seen = HashSet::new();
409+
let mut lookup_set = HashSet::new();
410+
for i in 0..l.len() {
411+
let last_offset = *result_offsets.last().unwrap();
385412

386-
for r_val in r_values.iter().sorted().dedup() {
387-
match set_op {
388-
SetOp::Union => {
389-
if !values_set.contains(&r_val) {
390-
rows.push(r_val);
391-
}
413+
if l.is_null(i) || r.is_null(i) {
414+
result_offsets.push(last_offset);
415+
continue;
416+
}
417+
418+
let l_start = l_offsets[i].as_usize();
419+
let l_end = l_offsets[i + 1].as_usize();
420+
let r_start = r_offsets[i].as_usize();
421+
let r_end = r_offsets[i + 1].as_usize();
422+
423+
seen.clear();
424+
425+
if IS_UNION {
426+
for idx in l_start..l_end {
427+
let row = rows_l.row(idx);
428+
if seen.insert(row) {
429+
final_rows.push(row);
392430
}
393-
SetOp::Intersect => {
394-
if values_set.contains(&r_val) {
395-
rows.push(r_val);
396-
}
431+
}
432+
for idx in r_start..r_end {
433+
let row = rows_r.row(idx);
434+
if seen.insert(row) {
435+
final_rows.push(row);
397436
}
398437
}
399-
}
400-
401-
offsets.push(last_offset + OffsetSize::usize_as(rows.len()));
402-
let arrays = converter.convert_rows(rows)?;
403-
let array = match arrays.first() {
404-
Some(array) => Arc::clone(array),
405-
None => {
406-
return internal_err!("{set_op}: failed to get array from rows");
438+
} else {
439+
let l_len = l_end - l_start;
440+
let r_len = r_end - r_start;
441+
442+
// Select shorter side for lookup, longer side for probing
443+
let (lookup_rows, lookup_range, probe_rows, probe_range) = if l_len < r_len {
444+
(rows_l, l_start..l_end, rows_r, r_start..r_end)
445+
} else {
446+
(rows_r, r_start..r_end, rows_l, l_start..l_end)
447+
};
448+
lookup_set.clear();
449+
lookup_set.reserve(lookup_range.len());
450+
451+
// Build lookup table
452+
for idx in lookup_range {
453+
lookup_set.insert(lookup_rows.row(idx));
407454
}
408-
};
409455

410-
new_arrays.push(array);
456+
// Probe and emit distinct intersected rows
457+
for idx in probe_range {
458+
let row = probe_rows.row(idx);
459+
if lookup_set.contains(&row) && seen.insert(row) {
460+
final_rows.push(row);
461+
}
462+
}
463+
}
464+
result_offsets.push(last_offset + OffsetSize::usize_as(seen.len()));
411465
}
412466

413-
let offsets = OffsetBuffer::new(offsets.into());
414-
let new_arrays_ref: Vec<_> = new_arrays.iter().map(|v| v.as_ref()).collect();
415-
let values = if new_arrays_ref.is_empty() {
467+
let final_values = if final_rows.is_empty() {
416468
new_empty_array(&l.value_type())
417469
} else {
418-
compute::concat(&new_arrays_ref)?
470+
let arrays = converter.convert_rows(final_rows)?;
471+
Arc::clone(&arrays[0])
419472
};
473+
420474
let arr = GenericListArray::<OffsetSize>::try_new(
421475
field,
422-
offsets,
423-
values,
476+
OffsetBuffer::new(result_offsets.into()),
477+
final_values,
424478
NullBuffer::union(l.nulls(), r.nulls()),
425479
)?;
426480
Ok(Arc::new(arr))

0 commit comments

Comments
 (0)