Skip to content

Commit d4ee901

Browse files
Implement FixedSizeBinary zero-copy reinterpretation optimization
FixedSizeBinary(N) arrays share the same contiguous buffer layout as primitive arrays, so for power-of-2 widths up to 16 bytes (1, 2, 4, 8, 16) we can zero-copy reinterpret them and use the optimized primitive filters (bitmap, branchless, hash) instead of falling through to the NestedTypeFilter fallback. All other widths still use the NestedTypeFilter fallback.
1 parent 85f2d9a commit d4ee901

3 files changed

Lines changed: 196 additions & 10 deletions

File tree

datafusion/physical-expr/src/expressions/in_list.rs

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -853,6 +853,135 @@ mod tests {
853853
])
854854
}
855855

856+
/// Evaluates an IN list over a nullable `FixedSizeBinary(16)` column when the
/// list array is built directly (not sliced).
///
/// The list holds two values; the batch holds one value that is in the list,
/// one that is not, and one null. The expected output is
/// `[true, false, NULL]`.
#[test]
857+
fn test_in_list_fixed_size_binary_canonical_path() -> Result<()> {
858+
let width = 16;
859+
// Three distinct 16-byte values, each filled with a single repeated byte.
let matching = vec![0x10; width as usize];
860+
let also_in_list = vec![0x20; width as usize];
861+
let not_in_list = vec![0x30; width as usize];
862+
863+
let schema = Schema::new(vec![Field::new(
864+
"a",
865+
DataType::FixedSizeBinary(width),
866+
true,
867+
)]);
868+
let col_a = col("a", &schema)?;
869+
// The IN list contains `matching` and `also_in_list`, with no nulls.
let list_array = Arc::new(FixedSizeBinaryArray::from(vec![
870+
matching.as_slice(),
871+
also_in_list.as_slice(),
872+
])) as ArrayRef;
873+
// NOTE(review): the trailing `false` is presumably the `negated` flag — confirm
// against `InListExpr::try_new_from_array`.
let expr = Arc::new(InListExpr::try_new_from_array(
874+
Arc::clone(&col_a),
875+
list_array,
876+
false,
877+
)?) as Arc<dyn PhysicalExpr>;
878+
879+
// Probe rows: a value in the list, a value not in the list, and a null.
let batch_array = Arc::new(FixedSizeBinaryArray::try_from_sparse_iter_with_size(
880+
vec![
881+
Some(matching.as_slice()),
882+
Some(not_in_list.as_slice()),
883+
None,
884+
]
885+
.into_iter(),
886+
width,
887+
)?) as ArrayRef;
888+
let batch = RecordBatch::try_new(Arc::new(schema), vec![batch_array])?;
889+
890+
let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
891+
let result = as_boolean_array(&result);
892+
assert_eq!(
893+
result,
894+
&BooleanArray::from(vec![Some(true), Some(false), None])
895+
);
896+
897+
Ok(())
898+
}
899+
900+
/// Evaluates an IN list over a non-nullable `FixedSizeBinary(16)` column when
/// the list array is a slice of a larger parent array.
///
/// The parent holds three values but the list is `parent.slice(1, 1)`, so only
/// `in_slice` is a list member. The batch probes `in_slice` and `before_slice`;
/// the latter sits in the parent before the slice start and must not match.
/// The expected output is `[true, false]`.
#[test]
901+
fn test_in_list_fixed_size_binary_offset_path() -> Result<()> {
902+
let width = 16;
903+
let before_slice = vec![0x01; width as usize];
904+
let in_slice = vec![0x02; width as usize];
905+
let not_in_slice = vec![0x03; width as usize];
906+
907+
let schema = Schema::new(vec![Field::new(
908+
"a",
909+
DataType::FixedSizeBinary(width),
910+
false,
911+
)]);
912+
let col_a = col("a", &schema)?;
913+
let parent = Arc::new(FixedSizeBinaryArray::from(vec![
914+
before_slice.as_slice(),
915+
in_slice.as_slice(),
916+
not_in_slice.as_slice(),
917+
])) as ArrayRef;
918+
// Keep only the middle element, so the list starts one element into the parent.
let sliced_list = parent.slice(1, 1);
919+
let expr = Arc::new(InListExpr::try_new_from_array(
920+
Arc::clone(&col_a),
921+
sliced_list,
922+
false,
923+
)?) as Arc<dyn PhysicalExpr>;
924+
925+
// Probe with the sliced-in value and with a value that lies outside the slice.
let batch_array = Arc::new(FixedSizeBinaryArray::from(vec![
926+
in_slice.as_slice(),
927+
before_slice.as_slice(),
928+
])) as ArrayRef;
929+
let batch = RecordBatch::try_new(Arc::new(schema), vec![batch_array])?;
930+
931+
let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
932+
let result = as_boolean_array(&result);
933+
assert_eq!(result, &BooleanArray::from(vec![Some(true), Some(false)]));
934+
935+
Ok(())
936+
}
937+
938+
/// Evaluates an IN list over a nullable `FixedSizeBinary(16)` column when the
/// list array is a slice of a parent array and the slice contains a null.
///
/// The parent is `[before_slice, NULL, in_slice]` and the list is
/// `parent.slice(1, 2)`, i.e. `[NULL, in_slice]`. The batch probes `in_slice`,
/// `not_in_slice` and a null; the expected output is `[true, NULL, NULL]`.
#[test]
939+
fn test_in_list_fixed_size_binary_offset_path_with_nulls() -> Result<()> {
940+
let width = 16;
941+
let before_slice = vec![0xAA; width as usize];
942+
let in_slice = vec![0xBB; width as usize];
943+
let not_in_slice = vec![0xCC; width as usize];
944+
945+
let schema = Schema::new(vec![Field::new(
946+
"a",
947+
DataType::FixedSizeBinary(width),
948+
true,
949+
)]);
950+
let col_a = col("a", &schema)?;
951+
let parent = Arc::new(FixedSizeBinaryArray::try_from_sparse_iter_with_size(
952+
vec![
953+
Some(before_slice.as_slice()),
954+
None,
955+
Some(in_slice.as_slice()),
956+
]
957+
.into_iter(),
958+
width,
959+
)?) as ArrayRef;
960+
// Drop the first element: the list becomes [NULL, in_slice] at a non-zero start.
let sliced_list = parent.slice(1, 2);
961+
let expr = Arc::new(InListExpr::try_new_from_array(
962+
Arc::clone(&col_a),
963+
sliced_list,
964+
false,
965+
)?) as Arc<dyn PhysicalExpr>;
966+
967+
let batch_array = Arc::new(FixedSizeBinaryArray::try_from_sparse_iter_with_size(
968+
vec![
969+
Some(in_slice.as_slice()),
970+
Some(not_in_slice.as_slice()),
971+
None,
972+
]
973+
.into_iter(),
974+
width,
975+
)?) as ArrayRef;
976+
let batch = RecordBatch::try_new(Arc::new(schema), vec![batch_array])?;
977+
978+
let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
979+
let result = as_boolean_array(&result);
980+
// The non-matching row is expected to be NULL rather than false — presumably
// because the list itself contains a NULL (SQL three-valued logic); confirm.
assert_eq!(result, &BooleanArray::from(vec![Some(true), None, None]));
981+
982+
Ok(())
983+
}
984+
856985
/// Test IN LIST for date types (Date32, Date64).
857986
///
858987
/// Test data: 0 (in list), 2 (not in list), [1, 3] (other list values)

datafusion/physical-expr/src/expressions/in_list/strategy.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
//! - 8-byte types (Int64/Float64): branchless (≤16) or hash (>16)
2626
//! - 16-byte types (Decimal128): branchless (≤4) or hash (>4)
2727
//! - Utf8View (short strings): branchless (≤4) or hash (>4)
28+
//! - FixedSizeBinary(N) for N ∈ {1,2,4,8,16}: reinterpreted as N-byte primitive
2829
//! - Byte arrays (Utf8, Binary, etc.): ByteArrayFilter / ByteViewFilter
2930
//! - Other types: NestedTypeFilter (fallback for List, Struct, Map, etc.)
3031
@@ -146,6 +147,11 @@ pub(crate) fn instantiate_static_filter(
146147
};
147148
}
148149

150+
// FixedSizeBinary: widths 1, 2, 4, 8 and 16 are reinterpreted as primitives; any other width falls back to NestedTypeFilter inside the helper
151+
if let &DataType::FixedSizeBinary(byte_width) = dt {
152+
return instantiate_fixed_size_binary_filter(in_array, byte_width);
153+
}
154+
149155
let strategy = select_strategy(dt, len);
150156

151157
match (dt, strategy) {
@@ -195,6 +201,49 @@ pub(crate) fn instantiate_static_filter(
195201
}
196202
}
197203

204+
// =============================================================================
205+
// FIXED-SIZE BINARY DISPATCH
206+
// =============================================================================
207+
208+
/// Creates the optimal filter for FixedSizeBinary(N) arrays.
209+
///
210+
/// For widths of 1, 2, 4, 8 and 16 bytes, FixedSizeBinary has the same
211+
/// contiguous buffer layout as primitive arrays, so we zero-copy reinterpret
212+
/// and use the optimized primitive filters (bitmap, branchless, hash).
213+
/// Every other width (including larger powers of two such as 32) falls back to NestedTypeFilter.
214+
fn instantiate_fixed_size_binary_filter(
215+
in_array: ArrayRef,
216+
byte_width: i32,
217+
) -> Result<Arc<dyn StaticFilter + Send + Sync>> {
218+
// Number of non-null list entries; compared against the BRANCHLESS_MAX_*
// thresholds below to choose between the branchless and probe-based filters.
let len = in_array.len() - in_array.null_count();
219+
match byte_width {
220+
// 1- and 2-byte widths always use the bitmap filter, regardless of `len`.
1 => make_bitmap_filter::<U8Config>(&in_array),
221+
2 => make_bitmap_filter::<U16Config>(&in_array),
222+
4 => {
223+
if len <= BRANCHLESS_MAX_4B {
224+
make_branchless_filter::<UInt32Type>(&in_array, 4)
225+
} else {
226+
make_direct_probe_filter_reinterpreted::<UInt32Type>(&in_array)
227+
}
228+
}
229+
8 => {
230+
if len <= BRANCHLESS_MAX_8B {
231+
make_branchless_filter::<UInt64Type>(&in_array, 8)
232+
} else {
233+
make_direct_probe_filter_reinterpreted::<UInt64Type>(&in_array)
234+
}
235+
}
236+
// 16-byte values reuse Decimal128Type as the 16-byte primitive.
16 => {
237+
if len <= BRANCHLESS_MAX_16B {
238+
make_branchless_filter::<Decimal128Type>(&in_array, 16)
239+
} else {
240+
make_direct_probe_filter_reinterpreted::<Decimal128Type>(&in_array)
241+
}
242+
}
243+
// Any other width: generic fallback filter.
_ => Ok(Arc::new(NestedTypeFilter::try_new(in_array)?)),
244+
}
245+
}
246+
198247
// =============================================================================
199248
// TYPE DISPATCH
200249
// =============================================================================

datafusion/physical-expr/src/expressions/in_list/transform.rs

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,16 @@
2626
//! multiple types (Int64, Float64, Timestamp, Duration) that share the same
2727
//! byte width, reducing code duplication.
2828
29+
use std::hash::BuildHasher;
2930
use std::marker::PhantomData;
3031
use std::mem::size_of;
3132
use std::sync::Arc;
3233

33-
use ahash::RandomState;
3434
use arrow::array::{Array, ArrayRef, AsArray, BooleanArray, PrimitiveArray};
3535
use arrow::buffer::ScalarBuffer;
36-
use arrow::datatypes::{ArrowPrimitiveType, ByteViewType, Decimal128Type};
36+
use arrow::datatypes::{ArrowPrimitiveType, ByteViewType, DataType, Decimal128Type};
3737
use arrow::util::bit_iterator::BitIndexIterator;
38+
use datafusion_common::hash_utils::RandomState;
3839
use datafusion_common::hash_utils::with_hashes;
3940
use datafusion_common::{Result, exec_datafusion_err};
4041
use hashbrown::HashTable;
@@ -88,11 +89,17 @@ where
8889
}
8990

9091
fn contains(&self, v: &dyn Array, negated: bool) -> Result<BooleanArray> {
92+
// Returns true when `dt` can be treated as a `width`-byte value: either
// `dt.primitive_width()` reports exactly `width`, or `dt` is a
// `FixedSizeBinary` whose byte width equals `width`.
fn matches_reinterpreted_width(dt: &DataType, width: usize) -> bool {
93+
dt.primitive_width() == Some(width)
94+
|| matches!(dt, DataType::FixedSizeBinary(byte_width) if *byte_width as usize == width)
95+
}
96+
9197
arrow::array::downcast_dictionary_array! {
9298
v => {
93-
if v.values().data_type().primitive_width()
94-
!= Some(size_of::<T::Native>())
95-
{
99+
if !matches_reinterpreted_width(
100+
v.values().data_type(),
101+
size_of::<T::Native>(),
102+
) {
96103
return Err(exec_datafusion_err!(
97104
"Failed to downcast array to primitive type"
98105
));
@@ -105,7 +112,7 @@ where
105112
_ => {}
106113
}
107114

108-
if v.data_type().primitive_width() != Some(size_of::<T::Native>()) {
115+
if !matches_reinterpreted_width(v.data_type(), size_of::<T::Native>()) {
109116
return Err(exec_datafusion_err!(
110117
"Failed to downcast array to primitive type"
111118
));
@@ -151,8 +158,9 @@ impl StaticFilter for Utf8ViewHashFilter {
151158
// Builds a `PrimitiveArray<T>` over the first data buffer of `array`, carrying
// over `array`'s null buffer. The buffer is cloned as a handle, not rebuilt
// element by element.
pub(crate) fn reinterpret_any_primitive_to<T: ArrowPrimitiveType>(
152159
array: &dyn Array,
153160
) -> ArrayRef {
154-
let values = array.to_data().buffers()[0].clone();
155-
let buffer: ScalarBuffer<T::Native> = values.into();
161+
// The `+` lines replace the `-` lines above: instead of converting the whole
// buffer, the new code views `data.len()` elements of `T::Native` starting at
// `data.offset()`.
let data = array.to_data();
162+
let values = data.buffers()[0].clone();
163+
let buffer = ScalarBuffer::<T::Native>::new(values, data.offset(), data.len());
156164
Arc::new(PrimitiveArray::<T>::new(buffer, array.nulls().cloned()))
157165
}
158166

@@ -417,7 +425,7 @@ where
417425
let views: &[i128] = bv.views().inner().typed_data();
418426

419427
let mut masked_views = Vec::new();
420-
let state = RandomState::new();
428+
let state = RandomState::default();
421429
let mut long_value_table = HashTable::new();
422430

423431
// Build hash table for long strings using batch hashing
@@ -617,7 +625,7 @@ impl<O: arrow::array::OffsetSizeTrait + 'static> Utf8TwoStageFilter<O> {
617625

618626
let len = arr.len();
619627
let mut encoded_values = Vec::with_capacity(len);
620-
let state = RandomState::new();
628+
let state = RandomState::default();
621629
let mut long_string_table = HashTable::new();
622630
let mut all_short = true;
623631

0 commit comments

Comments
 (0)