Skip to content

Commit 401ef47

Browse files
Dandandanclaude
andcommitted
Use BooleanBuffer bitwise assign instead of arrow::compute::and
Replace `arrow::compute::and()` calls with `BooleanBuffer` bitwise operations (`&`, `&=`) where possible. This avoids allocating intermediate BooleanArrays with null handling overhead. - catalog/table.rs: accumulate filter masks with `BooleanBuffer &=` - joins/utils.rs: fold equality results with `BooleanBuffer &=` - metadata.rs: combine eq/exactness masks with `BooleanBuffer &` - correlation.rs: combine is_not_null results with `BooleanBuffer &` - approx_percentile_cont_with_weight.rs: same pattern Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 1e93a67 commit 401ef47

5 files changed

Lines changed: 68 additions & 37 deletions

File tree

datafusion/catalog/src/memory/table.rs

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,9 @@ use crate::TableProvider;
2727
use arrow::array::{
2828
Array, ArrayRef, BooleanArray, RecordBatch as ArrowRecordBatch, UInt64Array,
2929
};
30+
use arrow::buffer::BooleanBuffer;
31+
use arrow::compute::filter_record_batch;
3032
use arrow::compute::kernels::zip::zip;
31-
use arrow::compute::{and, filter_record_batch};
3233
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
3334
use arrow::record_batch::RecordBatch;
3435
use datafusion_common::error::Result;
@@ -515,31 +516,37 @@ fn evaluate_filters_to_mask(
515516
return Ok(None);
516517
}
517518

518-
let mut combined_mask: Option<BooleanArray> = None;
519+
let mut combined_buf: Option<BooleanBuffer> = None;
519520

520521
for filter_expr in filters {
521522
let physical_expr =
522523
create_physical_expr(filter_expr, df_schema, execution_props)?;
523524

524525
let result = physical_expr.evaluate(batch)?;
525526
let array = result.into_array(batch.num_rows())?;
526-
let bool_array = array
527-
.as_any()
528-
.downcast_ref::<BooleanArray>()
529-
.ok_or_else(|| {
530-
datafusion_common::DataFusionError::Internal(
531-
"Filter did not evaluate to boolean".to_string(),
532-
)
533-
})?
534-
.clone();
535-
536-
combined_mask = Some(match combined_mask {
537-
Some(existing) => and(&existing, &bool_array)?,
538-
None => bool_array,
539-
});
527+
let bool_array =
528+
array
529+
.as_any()
530+
.downcast_ref::<BooleanArray>()
531+
.ok_or_else(|| {
532+
datafusion_common::DataFusionError::Internal(
533+
"Filter did not evaluate to boolean".to_string(),
534+
)
535+
})?;
536+
537+
// Convert to BooleanBuffer, treating null as false
538+
let buf = match bool_array.nulls() {
539+
Some(nulls) => bool_array.values() & nulls.inner(),
540+
None => bool_array.values().clone(),
541+
};
542+
543+
match &mut combined_buf {
544+
Some(existing) => *existing &= &buf,
545+
None => combined_buf = Some(buf),
546+
}
540547
}
541548

542-
Ok(combined_mask)
549+
Ok(combined_buf.map(|buf| BooleanArray::new(buf, None)))
543550
}
544551

545552
/// Returns a single row with the count of affected rows.

datafusion/datasource-parquet/src/metadata.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ use crate::{
2222
ObjectStoreFetch, apply_file_schema_type_coercions, coerce_int96_to_resolution,
2323
};
2424
use arrow::array::{Array, ArrayRef, BooleanArray};
25-
use arrow::compute::and;
2625
use arrow::compute::kernels::cmp::eq;
2726
use arrow::compute::sum;
2827
use arrow::datatypes::{DataType, Schema, SchemaRef, TimeUnit};
@@ -649,8 +648,16 @@ fn has_any_exact_match(
649648

650649
let scalar_array = value.to_scalar().ok()?;
651650
let eq_mask = eq(&scalar_array, &array).ok()?;
652-
let combined_mask = and(&eq_mask, exactness).ok()?;
653-
Some(combined_mask.true_count() > 0)
651+
// Combine the two masks using BooleanBuffer bitwise AND, treating null as false
652+
let eq_buf = match eq_mask.nulls() {
653+
Some(nulls) => eq_mask.values() & nulls.inner(),
654+
None => eq_mask.values().clone(),
655+
};
656+
let exact_buf = match exactness.nulls() {
657+
Some(nulls) => exactness.values() & nulls.inner(),
658+
None => exactness.values().clone(),
659+
};
660+
Some(BooleanArray::new(&eq_buf & &exact_buf, None).has_true())
654661
}
655662

656663
/// Wrapper to implement [`FileMetadata`] for [`ParquetMetaData`].

datafusion/functions-aggregate/src/approx_percentile_cont_with_weight.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ use std::hash::Hash;
2020
use std::mem::size_of_val;
2121
use std::sync::Arc;
2222

23-
use arrow::compute::{and, filter, is_not_null};
23+
use arrow::array::BooleanArray;
24+
use arrow::compute::{filter, is_not_null};
2425
use arrow::datatypes::FieldRef;
2526
use arrow::{array::ArrayRef, datatypes::DataType};
2627
use datafusion_common::ScalarValue;
@@ -280,7 +281,10 @@ impl Accumulator for ApproxPercentileWithWeightAccumulator {
280281
match (means.null_count() > 0, weights.null_count() > 0) {
281282
// Both have nulls
282283
(true, true) => {
283-
let predicate = and(&is_not_null(&means)?, &is_not_null(&weights)?)?;
284+
let predicate = BooleanArray::new(
285+
is_not_null(&means)?.values() & is_not_null(&weights)?.values(),
286+
None,
287+
);
284288
means = filter(&means, &predicate)?;
285289
weights = filter(&weights, &predicate)?;
286290
}

datafusion/functions-aggregate/src/correlation.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use arrow::array::{
2525
Array, AsArray, BooleanArray, Float64Array, NullBufferBuilder, UInt64Array,
2626
downcast_array,
2727
};
28-
use arrow::compute::{and, filter, is_not_null};
28+
use arrow::compute::{filter, is_not_null};
2929
use arrow::datatypes::{FieldRef, Float64Type, UInt64Type};
3030
use arrow::{
3131
array::ArrayRef,
@@ -174,7 +174,10 @@ impl Accumulator for CorrelationAccumulator {
174174
// calculation logic in children accumulators, and calling only
175175
// calculation part from Correlation
176176
let values = if values[0].null_count() != 0 || values[1].null_count() != 0 {
177-
let mask = and(&is_not_null(&values[0])?, &is_not_null(&values[1])?)?;
177+
let mask = BooleanArray::new(
178+
is_not_null(&values[0])?.values() & is_not_null(&values[1])?.values(),
179+
None,
180+
);
178181
let values1 = filter(&values[0], &mask)?;
179182
let values2 = filter(&values[1], &mask)?;
180183

@@ -267,7 +270,10 @@ impl Accumulator for CorrelationAccumulator {
267270

268271
fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
269272
let values = if values[0].null_count() != 0 || values[1].null_count() != 0 {
270-
let mask = and(&is_not_null(&values[0])?, &is_not_null(&values[1])?)?;
273+
let mask = BooleanArray::new(
274+
is_not_null(&values[0])?.values() & is_not_null(&values[1])?.values(),
275+
None,
276+
);
271277
let values1 = filter(&values[0], &mask)?;
272278
let values2 = filter(&values[1], &mask)?;
273279

datafusion/physical-plan/src/joins/utils.rs

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ use arrow::array::{
5454
};
5555
use arrow::buffer::{BooleanBuffer, NullBuffer};
5656
use arrow::compute::kernels::cmp::eq;
57-
use arrow::compute::{self, FilterBuilder, and, take};
57+
use arrow::compute::{self, FilterBuilder, take};
5858
use arrow::datatypes::{
5959
ArrowNativeType, Field, Schema, SchemaBuilder, UInt32Type, UInt64Type,
6060
};
@@ -1788,19 +1788,26 @@ pub(super) fn equal_rows_arr(
17881788
let arr_left = take(first_left.as_ref(), indices_left, None)?;
17891789
let arr_right = take(first_right.as_ref(), indices_right, None)?;
17901790

1791-
let mut equal: BooleanArray = eq_dyn_null(&arr_left, &arr_right, null_equality)?;
1791+
let first_eq = eq_dyn_null(&arr_left, &arr_right, null_equality)?;
17921792

1793-
// Use map and try_fold to iterate over the remaining pairs of arrays.
1794-
// In each iteration, take is used on the pair of arrays and their equality is determined.
1795-
// The results are then folded (combined) using the and function to get a final equality result.
1796-
equal = iter
1797-
.map(|(left, right)| {
1798-
let arr_left = take(left.as_ref(), indices_left, None)?;
1799-
let arr_right = take(right.as_ref(), indices_right, None)?;
1800-
eq_dyn_null(arr_left.as_ref(), arr_right.as_ref(), null_equality)
1801-
})
1802-
.try_fold(equal, |acc, equal2| and(&acc, &equal2?))?;
1793+
// Accumulate equality results using BooleanBuffer bitwise AND, treating null as false.
1794+
// This avoids allocating intermediate BooleanArrays with null handling.
1795+
let to_filter_buf = |arr: &BooleanArray| -> BooleanBuffer {
1796+
match arr.nulls() {
1797+
Some(nulls) => arr.values() & nulls.inner(),
1798+
None => arr.values().clone(),
1799+
}
1800+
};
1801+
1802+
let mut equal_buf = to_filter_buf(&first_eq);
1803+
for (left, right) in iter {
1804+
let arr_left = take(left.as_ref(), indices_left, None)?;
1805+
let arr_right = take(right.as_ref(), indices_right, None)?;
1806+
let eq_result = eq_dyn_null(&arr_left, &arr_right, null_equality)?;
1807+
equal_buf &= &to_filter_buf(&eq_result);
1808+
}
18031809

1810+
let equal = BooleanArray::new(equal_buf, None);
18041811
let filter_builder = FilterBuilder::new(&equal).optimize().build();
18051812

18061813
let left_filtered = filter_builder.filter(indices_left)?;

0 commit comments

Comments
 (0)