Skip to content

Commit 5ece6c6

Browse files
Implement FixedSizeBinary zero-copy reinterpretation optimization
FixedSizeBinary(N) arrays share the same contiguous buffer layout as primitive arrays, so for power-of-2 widths (1, 2, 4, 8, 16) we can zero-copy reinterpret them and use the optimized primitive filters (bitmap, branchless, hash) instead of falling through to the NestedTypeFilter fallback.
1 parent 4b34f4a commit 5ece6c6

3 files changed

Lines changed: 196 additions & 10 deletions

File tree

datafusion/physical-expr/src/expressions/in_list.rs

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -858,6 +858,135 @@ mod tests {
858858
])
859859
}
860860

861+
#[test]
862+
fn test_in_list_fixed_size_binary_canonical_path() -> Result<()> {
863+
let width = 16;
864+
let matching = vec![0x10; width as usize];
865+
let also_in_list = vec![0x20; width as usize];
866+
let not_in_list = vec![0x30; width as usize];
867+
868+
let schema = Schema::new(vec![Field::new(
869+
"a",
870+
DataType::FixedSizeBinary(width),
871+
true,
872+
)]);
873+
let col_a = col("a", &schema)?;
874+
let list_array = Arc::new(FixedSizeBinaryArray::from(vec![
875+
matching.as_slice(),
876+
also_in_list.as_slice(),
877+
])) as ArrayRef;
878+
let expr = Arc::new(InListExpr::try_new_from_array(
879+
Arc::clone(&col_a),
880+
list_array,
881+
false,
882+
)?) as Arc<dyn PhysicalExpr>;
883+
884+
let batch_array = Arc::new(FixedSizeBinaryArray::try_from_sparse_iter_with_size(
885+
vec![
886+
Some(matching.as_slice()),
887+
Some(not_in_list.as_slice()),
888+
None,
889+
]
890+
.into_iter(),
891+
width,
892+
)?) as ArrayRef;
893+
let batch = RecordBatch::try_new(Arc::new(schema), vec![batch_array])?;
894+
895+
let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
896+
let result = as_boolean_array(&result);
897+
assert_eq!(
898+
result,
899+
&BooleanArray::from(vec![Some(true), Some(false), None])
900+
);
901+
902+
Ok(())
903+
}
904+
905+
#[test]
906+
fn test_in_list_fixed_size_binary_offset_path() -> Result<()> {
907+
let width = 16;
908+
let before_slice = vec![0x01; width as usize];
909+
let in_slice = vec![0x02; width as usize];
910+
let not_in_slice = vec![0x03; width as usize];
911+
912+
let schema = Schema::new(vec![Field::new(
913+
"a",
914+
DataType::FixedSizeBinary(width),
915+
false,
916+
)]);
917+
let col_a = col("a", &schema)?;
918+
let parent = Arc::new(FixedSizeBinaryArray::from(vec![
919+
before_slice.as_slice(),
920+
in_slice.as_slice(),
921+
not_in_slice.as_slice(),
922+
])) as ArrayRef;
923+
let sliced_list = parent.slice(1, 1);
924+
let expr = Arc::new(InListExpr::try_new_from_array(
925+
Arc::clone(&col_a),
926+
sliced_list,
927+
false,
928+
)?) as Arc<dyn PhysicalExpr>;
929+
930+
let batch_array = Arc::new(FixedSizeBinaryArray::from(vec![
931+
in_slice.as_slice(),
932+
before_slice.as_slice(),
933+
])) as ArrayRef;
934+
let batch = RecordBatch::try_new(Arc::new(schema), vec![batch_array])?;
935+
936+
let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
937+
let result = as_boolean_array(&result);
938+
assert_eq!(result, &BooleanArray::from(vec![Some(true), Some(false)]));
939+
940+
Ok(())
941+
}
942+
943+
#[test]
944+
fn test_in_list_fixed_size_binary_offset_path_with_nulls() -> Result<()> {
945+
let width = 16;
946+
let before_slice = vec![0xAA; width as usize];
947+
let in_slice = vec![0xBB; width as usize];
948+
let not_in_slice = vec![0xCC; width as usize];
949+
950+
let schema = Schema::new(vec![Field::new(
951+
"a",
952+
DataType::FixedSizeBinary(width),
953+
true,
954+
)]);
955+
let col_a = col("a", &schema)?;
956+
let parent = Arc::new(FixedSizeBinaryArray::try_from_sparse_iter_with_size(
957+
vec![
958+
Some(before_slice.as_slice()),
959+
None,
960+
Some(in_slice.as_slice()),
961+
]
962+
.into_iter(),
963+
width,
964+
)?) as ArrayRef;
965+
let sliced_list = parent.slice(1, 2);
966+
let expr = Arc::new(InListExpr::try_new_from_array(
967+
Arc::clone(&col_a),
968+
sliced_list,
969+
false,
970+
)?) as Arc<dyn PhysicalExpr>;
971+
972+
let batch_array = Arc::new(FixedSizeBinaryArray::try_from_sparse_iter_with_size(
973+
vec![
974+
Some(in_slice.as_slice()),
975+
Some(not_in_slice.as_slice()),
976+
None,
977+
]
978+
.into_iter(),
979+
width,
980+
)?) as ArrayRef;
981+
let batch = RecordBatch::try_new(Arc::new(schema), vec![batch_array])?;
982+
983+
let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
984+
let result = as_boolean_array(&result);
985+
assert_eq!(result, &BooleanArray::from(vec![Some(true), None, None]));
986+
987+
Ok(())
988+
}
989+
861990
/// Test IN LIST for date types (Date32, Date64).
862991
///
863992
/// Test data: 0 (in list), 2 (not in list), [1, 3] (other list values)

datafusion/physical-expr/src/expressions/in_list/strategy.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
//! - 8-byte types (Int64/Float64): branchless (≤16) or hash (>16)
2626
//! - 16-byte types (Decimal128): branchless (≤4) or hash (>4)
2727
//! - Utf8View (short strings): branchless (≤4) or hash (>4)
28+
//! - FixedSizeBinary(N) for N ∈ {1,2,4,8,16}: reinterpreted as N-byte primitive
2829
//! - Byte arrays (Utf8, Binary, etc.): ByteArrayFilter / ByteViewFilter
2930
//! - Other types: ArrayStaticFilter (fallback for List, Struct, Map, etc.)
3031
@@ -146,6 +147,11 @@ pub(super) fn instantiate_static_filter(
146147
};
147148
}
148149

150+
// FixedSizeBinary with power-of-2 width: reinterpret as primitive
151+
if let &DataType::FixedSizeBinary(byte_width) = dt {
152+
return instantiate_fixed_size_binary_filter(in_array, byte_width);
153+
}
154+
149155
let strategy = select_strategy(dt, len);
150156

151157
match (dt, strategy) {
@@ -195,6 +201,49 @@ pub(super) fn instantiate_static_filter(
195201
}
196202
}
197203

204+
// =============================================================================
205+
// FIXED-SIZE BINARY DISPATCH
206+
// =============================================================================
207+
208+
/// Creates the optimal filter for FixedSizeBinary(N) arrays.
209+
///
210+
/// For power-of-2 widths (1, 2, 4, 8, 16), FixedSizeBinary has the same
211+
/// contiguous buffer layout as primitive arrays, so we zero-copy reinterpret
212+
/// and use the optimized primitive filters (bitmap, branchless, hash).
213+
/// Non-power-of-2 widths fall back to ArrayStaticFilter.
214+
fn instantiate_fixed_size_binary_filter(
215+
in_array: ArrayRef,
216+
byte_width: i32,
217+
) -> Result<Arc<dyn StaticFilter + Send + Sync>> {
218+
let len = in_array.len() - in_array.null_count();
219+
match byte_width {
220+
1 => make_bitmap_filter::<U8Config>(&in_array),
221+
2 => make_bitmap_filter::<U16Config>(&in_array),
222+
4 => {
223+
if len <= BRANCHLESS_MAX_4B {
224+
make_branchless_filter::<UInt32Type>(&in_array, 4)
225+
} else {
226+
make_direct_probe_filter_reinterpreted::<UInt32Type>(&in_array)
227+
}
228+
}
229+
8 => {
230+
if len <= BRANCHLESS_MAX_8B {
231+
make_branchless_filter::<UInt64Type>(&in_array, 8)
232+
} else {
233+
make_direct_probe_filter_reinterpreted::<UInt64Type>(&in_array)
234+
}
235+
}
236+
16 => {
237+
if len <= BRANCHLESS_MAX_16B {
238+
make_branchless_filter::<Decimal128Type>(&in_array, 16)
239+
} else {
240+
make_direct_probe_filter_reinterpreted::<Decimal128Type>(&in_array)
241+
}
242+
}
243+
_ => Ok(Arc::new(ArrayStaticFilter::try_new(in_array)?)),
244+
}
245+
}
246+
198247
// =============================================================================
199248
// TYPE DISPATCH
200249
// =============================================================================

datafusion/physical-expr/src/expressions/in_list/transform.rs

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,16 @@
2626
//! multiple types (Int64, Float64, Timestamp, Duration) that share the same
2727
//! byte width, reducing code duplication.
2828
29+
use std::hash::BuildHasher;
2930
use std::marker::PhantomData;
3031
use std::mem::size_of;
3132
use std::sync::Arc;
3233

33-
use ahash::RandomState;
3434
use arrow::array::{Array, ArrayRef, AsArray, BooleanArray, PrimitiveArray};
3535
use arrow::buffer::ScalarBuffer;
36-
use arrow::datatypes::{ArrowPrimitiveType, ByteViewType, Decimal128Type};
36+
use arrow::datatypes::{ArrowPrimitiveType, ByteViewType, DataType, Decimal128Type};
3737
use arrow::util::bit_iterator::BitIndexIterator;
38+
use datafusion_common::hash_utils::RandomState;
3839
use datafusion_common::hash_utils::with_hashes;
3940
use datafusion_common::{Result, exec_datafusion_err};
4041
use hashbrown::HashTable;
@@ -88,11 +89,17 @@ where
8889
}
8990

9091
fn contains(&self, v: &dyn Array, negated: bool) -> Result<BooleanArray> {
92+
fn matches_reinterpreted_width(dt: &DataType, width: usize) -> bool {
93+
dt.primitive_width() == Some(width)
94+
|| matches!(dt, DataType::FixedSizeBinary(byte_width) if *byte_width as usize == width)
95+
}
96+
9197
arrow::array::downcast_dictionary_array! {
9298
v => {
93-
if v.values().data_type().primitive_width()
94-
!= Some(size_of::<T::Native>())
95-
{
99+
if !matches_reinterpreted_width(
100+
v.values().data_type(),
101+
size_of::<T::Native>(),
102+
) {
96103
return Err(exec_datafusion_err!(
97104
"Failed to downcast array to primitive type"
98105
));
@@ -105,7 +112,7 @@ where
105112
_ => {}
106113
}
107114

108-
if v.data_type().primitive_width() != Some(size_of::<T::Native>()) {
115+
if !matches_reinterpreted_width(v.data_type(), size_of::<T::Native>()) {
109116
return Err(exec_datafusion_err!(
110117
"Failed to downcast array to primitive type"
111118
));
@@ -151,8 +158,9 @@ impl StaticFilter for Utf8ViewHashFilter {
151158
pub(crate) fn reinterpret_any_primitive_to<T: ArrowPrimitiveType>(
152159
array: &dyn Array,
153160
) -> ArrayRef {
154-
let values = array.to_data().buffers()[0].clone();
155-
let buffer: ScalarBuffer<T::Native> = values.into();
161+
let data = array.to_data();
162+
let values = data.buffers()[0].clone();
163+
let buffer = ScalarBuffer::<T::Native>::new(values, data.offset(), data.len());
156164
Arc::new(PrimitiveArray::<T>::new(buffer, array.nulls().cloned()))
157165
}
158166

@@ -417,7 +425,7 @@ where
417425
let views: &[i128] = bv.views().inner().typed_data();
418426

419427
let mut masked_views = Vec::new();
420-
let state = RandomState::new();
428+
let state = RandomState::default();
421429
let mut long_value_table = HashTable::new();
422430

423431
// Build hash table for long strings using batch hashing
@@ -617,7 +625,7 @@ impl<O: arrow::array::OffsetSizeTrait + 'static> Utf8TwoStageFilter<O> {
617625

618626
let len = arr.len();
619627
let mut encoded_values = Vec::with_capacity(len);
620-
let state = RandomState::new();
628+
let state = RandomState::default();
621629
let mut long_string_table = HashTable::new();
622630
let mut all_short = true;
623631

0 commit comments

Comments
 (0)