Skip to content

Commit e4df471

Browse files
refactor(in_list): extract StaticFilter trait and ArrayStaticFilter
Moves the `StaticFilter` trait and its generic hash-based implementation (`ArrayStaticFilter`) into a dedicated `array_filter` submodule. This is the first step toward modularizing the `in_list` expression code. `ArrayStaticFilter` hashes each value and confirms candidate matches with an Arrow comparator (`make_comparator`), giving expected O(1) membership tests. It serves as the fallback for types without specialized filter implementations (e.g., structs, lists).
1 parent 57830f5 commit e4df471

2 files changed

Lines changed: 162 additions & 136 deletions

File tree

datafusion/physical-expr/src/expressions/in_list.rs

Lines changed: 4 additions & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
//! Implementation of `InList` expressions: [`InListExpr`]
1919
20+
mod array_filter;
21+
2022
use std::any::Any;
2123
use std::fmt::Debug;
2224
use std::hash::{Hash, Hasher};
@@ -28,27 +30,15 @@ use crate::physical_expr::physical_exprs_bag_equal;
2830
use arrow::array::*;
2931
use arrow::buffer::{BooleanBuffer, NullBuffer};
3032
use arrow::compute::kernels::boolean::{not, or_kleene};
31-
use arrow::compute::{SortOptions, take};
33+
use arrow::compute::{take, SortOptions};
3234
use arrow::datatypes::*;
33-
use arrow::util::bit_iterator::BitIndexIterator;
34-
use datafusion_common::hash_utils::with_hashes;
3535
use datafusion_common::{
3636
DFSchema, HashSet, Result, ScalarValue, assert_or_internal_err, exec_datafusion_err,
3737
exec_err,
3838
};
3939
use datafusion_expr::{ColumnarValue, expr_vec_fmt};
4040

41-
use ahash::RandomState;
42-
use datafusion_common::HashMap;
43-
use hashbrown::hash_map::RawEntryMut;
44-
45-
/// Interface shared by the InList static filters: report how many nulls the
/// list holds and test arrays for membership in the list.
46-
trait StaticFilter {
47-
/// Number of null entries among the filter's list values (for
/// `ArrayStaticFilter` this is `in_array.null_count()`).
fn null_count(&self) -> usize;
48-
49-
/// Checks if values in `v` are contained in the filter.
///
/// Returns one entry per element of `v`. In the implementation in this file,
/// `negated` flips the non-null results, which gives `NOT IN` semantics.
50-
fn contains(&self, v: &dyn Array, negated: bool) -> Result<BooleanArray>;
51-
}
41+
use array_filter::{ArrayStaticFilter, StaticFilter};
5242

5343
/// InList
5444
pub struct InListExpr {
@@ -68,76 +58,6 @@ impl Debug for InListExpr {
6858
}
6959
}
7060

71-
/// Static filter for InList that keeps the list values (`in_array`) together with
/// a hash set of row indices into that array, for expected O(1) membership lookups.
72-
#[derive(Debug, Clone)]
73-
struct ArrayStaticFilter {
74-
/// The IN-list values; the keys of `map` are row indices into this array.
in_array: ArrayRef,
75-
/// Hasher state used when building `map` in `try_new` and again when hashing
/// the probed values in `contains`.
state: RandomState,
76-
/// Used to provide a lookup from value to in list index
77-
///
78-
/// Note: usize::hash is not used (the hasher type parameter is `()`); instead the
79-
/// raw entry API is used to store entries w.r.t. the hash of the value they index
80-
map: HashMap<usize, (), ()>,
81-
}
82-
83-
impl StaticFilter for ArrayStaticFilter {
84-
/// Null count of the stored IN-list array.
fn null_count(&self) -> usize {
85-
self.in_array.null_count()
86-
}
87-
88-
/// Checks if values in `v` are contained in the `in_array` using this hash set for lookup.
///
/// Per element of `v` the result is:
/// - null when the element itself is null,
/// - `!negated` when an equal value is found in `in_array`,
/// - null when no equal value is found but `in_array` contains nulls,
/// - `negated` otherwise.
///
/// If either side has `DataType::Null`, every result is null. Dictionary
/// inputs are evaluated on their values array and mapped back through the keys.
89-
fn contains(&self, v: &dyn Array, negated: bool) -> Result<BooleanArray> {
90-
// Null type comparisons always return null (SQL three-valued logic)
91-
if v.data_type() == &DataType::Null
92-
|| self.in_array.data_type() == &DataType::Null
93-
{
94-
let nulls = NullBuffer::new_null(v.len());
95-
// All-unset values buffer; every slot is masked by the all-null buffer above.
return Ok(BooleanArray::new(
96-
BooleanBuffer::new_unset(v.len()),
97-
Some(nulls),
98-
));
99-
}
100-
101-
// Dictionary-encoded input: test each dictionary value once (recursive call),
// then gather the per-row answers through the keys with `take`.
downcast_dictionary_array! {
102-
v => {
103-
let values_contains = self.contains(v.values().as_ref(), negated)?;
104-
let result = take(&values_contains, v.keys(), None)?;
105-
return Ok(downcast_array(result.as_ref()))
106-
}
107-
_ => {}
108-
}
109-
110-
let needle_nulls = v.logical_nulls();
111-
let needle_nulls = needle_nulls.as_ref();
112-
let haystack_has_nulls = self.in_array.null_count() != 0;
113-
114-
// Hash the probed values with the same `RandomState` that `try_new` stored
// alongside `map`.
with_hashes([v], &self.state, |hashes| {
115-
// `cmp(i, j)` compares element `i` of `v` with element `j` of `in_array`;
// it confirms that a hash hit is a real value match.
let cmp = make_comparator(v, &self.in_array, SortOptions::default())?;
116-
Ok((0..v.len())
117-
.map(|i| {
118-
// SQL three-valued logic: null IN (...) is always null
119-
if needle_nulls.is_some_and(|nulls| nulls.is_null(i)) {
120-
return None;
121-
}
122-
123-
let hash = hashes[i];
124-
let contains = self
125-
.map
126-
.raw_entry()
127-
.from_hash(hash, |idx| cmp(i, *idx).is_eq())
128-
.is_some();
129-
130-
match contains {
131-
// Found: true for IN, false for NOT IN.
true => Some(!negated),
132-
// Not found, but the list holds a null: the answer is unknown (null).
false if haystack_has_nulls => None,
133-
// Not found and the list has no nulls: false for IN, true for NOT IN.
false => Some(negated),
134-
}
135-
})
136-
.collect())
137-
})
138-
}
139-
}
140-
14161
fn instantiate_static_filter(
14262
in_array: ArrayRef,
14363
) -> Result<Arc<dyn StaticFilter + Send + Sync>> {
@@ -161,58 +81,6 @@ fn instantiate_static_filter(
16181
}
16282
}
16383

164-
impl ArrayStaticFilter {
165-
/// Builds an [`ArrayStaticFilter`] over `in_array`: the array is hashed and the
166-
/// index of each distinct non-null value is recorded in `map`. The map is always
167-
/// built (left empty for a `Null`-typed array); no threshold is checked here.
168-
///
169-
/// Note: This is split into a separate function as higher-rank trait bounds currently
170-
/// cause type inference to misbehave
171-
fn try_new(in_array: ArrayRef) -> Result<ArrayStaticFilter> {
172-
// Null type has no natural order - return empty hash set.
// `contains` returns all-null for a Null-typed `in_array` before probing the map.
173-
if in_array.data_type() == &DataType::Null {
174-
return Ok(ArrayStaticFilter {
175-
in_array,
176-
state: RandomState::new(),
177-
map: HashMap::with_hasher(()),
178-
});
179-
}
180-
181-
let state = RandomState::new();
182-
let mut map: HashMap<usize, (), ()> = HashMap::with_hasher(());
183-
184-
with_hashes([&in_array], &state, |hashes| -> Result<()> {
185-
// Self-comparator: `cmp(a, b)` compares rows `a` and `b` of `in_array`.
let cmp = make_comparator(&in_array, &in_array, SortOptions::default())?;
186-
187-
// Inserts row `idx` unless a row with an equal value is already stored.
let insert_value = |idx| {
188-
let hash = hashes[idx];
189-
if let RawEntryMut::Vacant(v) = map
190-
.raw_entry_mut()
191-
.from_hash(hash, |x| cmp(*x, idx).is_eq())
192-
{
193-
// The closure supplies the precomputed hash of any stored index
// when the table needs to re-hash existing entries.
v.insert_with_hasher(hash, idx, (), |x| hashes[*x]);
194-
}
195-
};
196-
197-
match in_array.nulls() {
198-
// With a null buffer, visit only positions whose validity bit is set,
// so null rows are never inserted.
Some(nulls) => {
199-
BitIndexIterator::new(nulls.validity(), nulls.offset(), nulls.len())
200-
.for_each(insert_value)
201-
}
202-
// No null buffer: every row is inserted.
None => (0..in_array.len()).for_each(insert_value),
203-
}
204-
205-
Ok(())
206-
})?;
207-
208-
Ok(Self {
209-
in_array,
210-
state,
211-
map,
212-
})
213-
}
214-
}
215-
21684
/// Wrapper for f32 that implements Hash and Eq using bit comparison.
21785
/// This treats NaN values as equal to each other when they have the same bit pattern.
21886
#[derive(Clone, Copy)]
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Generic array-based static filter using hash lookups
19+
20+
use arrow::array::*;
21+
use arrow::buffer::{BooleanBuffer, NullBuffer};
22+
use arrow::compute::{SortOptions, take};
23+
use arrow::datatypes::DataType;
24+
use arrow::util::bit_iterator::BitIndexIterator;
25+
use datafusion_common::hash_utils::with_hashes;
26+
use datafusion_common::{HashMap, Result};
27+
28+
use ahash::RandomState;
29+
use hashbrown::hash_map::RawEntryMut;
30+
31+
/// Interface shared by the InList static filters: report how many nulls the
/// list holds and test arrays for membership in the list.
32+
pub(crate) trait StaticFilter {
33+
/// Number of null entries among the filter's list values (for
/// `ArrayStaticFilter` this is `in_array.null_count()`).
fn null_count(&self) -> usize;
34+
35+
/// Checks if values in `v` are contained in the filter.
///
/// Returns one entry per element of `v`. In the implementation in this file,
/// `negated` flips the non-null results, which gives `NOT IN` semantics.
36+
fn contains(&self, v: &dyn Array, negated: bool) -> Result<BooleanArray>;
37+
}
38+
39+
/// Static filter for InList that keeps the list values (`in_array`) together with
/// a hash set of row indices into that array, for expected O(1) membership lookups.
40+
#[derive(Debug, Clone)]
41+
pub(crate) struct ArrayStaticFilter {
42+
/// The IN-list values; the keys of `map` are row indices into this array.
in_array: ArrayRef,
43+
/// Hasher state used when building `map` in `try_new` and again when hashing
/// the probed values in `contains`.
state: RandomState,
44+
/// Used to provide a lookup from value to in list index
45+
///
46+
/// Note: usize::hash is not used (the hasher type parameter is `()`); instead the
47+
/// raw entry API is used to store entries w.r.t. the hash of the value they index
48+
map: HashMap<usize, (), ()>,
49+
}
50+
51+
impl StaticFilter for ArrayStaticFilter {
52+
/// Null count of the stored IN-list array.
fn null_count(&self) -> usize {
53+
self.in_array.null_count()
54+
}
55+
56+
/// Checks if values in `v` are contained in the `in_array` using this hash set for lookup.
///
/// Per element of `v` the result is:
/// - null when the element itself is null,
/// - `!negated` when an equal value is found in `in_array`,
/// - null when no equal value is found but `in_array` contains nulls,
/// - `negated` otherwise.
///
/// If either side has `DataType::Null`, every result is null. Dictionary
/// inputs are evaluated on their values array and mapped back through the keys.
57+
fn contains(&self, v: &dyn Array, negated: bool) -> Result<BooleanArray> {
58+
// Null type comparisons always return null (SQL three-valued logic)
59+
if v.data_type() == &DataType::Null || self.in_array.data_type() == &DataType::Null
60+
{
61+
let nulls = NullBuffer::new_null(v.len());
62+
// All-unset values buffer; every slot is masked by the all-null buffer above.
return Ok(BooleanArray::new(
63+
BooleanBuffer::new_unset(v.len()),
64+
Some(nulls),
65+
));
66+
}
67+
68+
// Dictionary-encoded input: test each dictionary value once (recursive call),
// then gather the per-row answers through the keys with `take`.
downcast_dictionary_array! {
69+
v => {
70+
let values_contains = self.contains(v.values().as_ref(), negated)?;
71+
let result = take(&values_contains, v.keys(), None)?;
72+
return Ok(downcast_array(result.as_ref()))
73+
}
74+
_ => {}
75+
}
76+
77+
let needle_nulls = v.logical_nulls();
78+
let needle_nulls = needle_nulls.as_ref();
79+
let haystack_has_nulls = self.in_array.null_count() != 0;
80+
81+
// Hash the probed values with the same `RandomState` that `try_new` stored
// alongside `map`.
with_hashes([v], &self.state, |hashes| {
82+
// `cmp(i, j)` compares element `i` of `v` with element `j` of `in_array`;
// it confirms that a hash hit is a real value match.
let cmp = make_comparator(v, &self.in_array, SortOptions::default())?;
83+
Ok((0..v.len())
84+
.map(|i| {
85+
// SQL three-valued logic: null IN (...) is always null
86+
if needle_nulls.is_some_and(|nulls| nulls.is_null(i)) {
87+
return None;
88+
}
89+
90+
let hash = hashes[i];
91+
let contains = self
92+
.map
93+
.raw_entry()
94+
.from_hash(hash, |idx| cmp(i, *idx).is_eq())
95+
.is_some();
96+
97+
match contains {
98+
// Found: true for IN, false for NOT IN.
true => Some(!negated),
99+
// Not found, but the list holds a null: the answer is unknown (null).
false if haystack_has_nulls => None,
100+
// Not found and the list has no nulls: false for IN, true for NOT IN.
false => Some(negated),
101+
}
102+
})
103+
.collect())
104+
})
105+
}
106+
}
107+
108+
impl ArrayStaticFilter {
109+
/// Builds an [`ArrayStaticFilter`] over `in_array`: the array is hashed and the
110+
/// index of each distinct non-null value is recorded in `map`. The map is always
111+
/// built (left empty for a `Null`-typed array); no threshold is checked here.
112+
///
113+
/// Note: This is split into a separate function as higher-rank trait bounds currently
114+
/// cause type inference to misbehave
115+
pub(crate) fn try_new(in_array: ArrayRef) -> Result<ArrayStaticFilter> {
116+
// Null type has no natural order - return empty hash set.
// `contains` returns all-null for a Null-typed `in_array` before probing the map.
117+
if in_array.data_type() == &DataType::Null {
118+
return Ok(ArrayStaticFilter {
119+
in_array,
120+
state: RandomState::new(),
121+
map: HashMap::with_hasher(()),
122+
});
123+
}
124+
125+
let state = RandomState::new();
126+
let mut map: HashMap<usize, (), ()> = HashMap::with_hasher(());
127+
128+
with_hashes([&in_array], &state, |hashes| -> Result<()> {
129+
// Self-comparator: `cmp(a, b)` compares rows `a` and `b` of `in_array`.
let cmp = make_comparator(&in_array, &in_array, SortOptions::default())?;
130+
131+
// Inserts row `idx` unless a row with an equal value is already stored.
let insert_value = |idx| {
132+
let hash = hashes[idx];
133+
if let RawEntryMut::Vacant(v) = map
134+
.raw_entry_mut()
135+
.from_hash(hash, |x| cmp(*x, idx).is_eq())
136+
{
137+
// The closure supplies the precomputed hash of any stored index
// when the table needs to re-hash existing entries.
v.insert_with_hasher(hash, idx, (), |x| hashes[*x]);
138+
}
139+
};
140+
141+
match in_array.nulls() {
142+
// With a null buffer, visit only positions whose validity bit is set,
// so null rows are never inserted.
Some(nulls) => {
143+
BitIndexIterator::new(nulls.validity(), nulls.offset(), nulls.len())
144+
.for_each(insert_value)
145+
}
146+
// No null buffer: every row is inserted.
None => (0..in_array.len()).for_each(insert_value),
147+
}
148+
149+
Ok(())
150+
})?;
151+
152+
Ok(Self {
153+
in_array,
154+
state,
155+
map,
156+
})
157+
}
158+
}

0 commit comments

Comments
 (0)