From ca7e0a82865af30041a82609b5151266fc1f75d2 Mon Sep 17 00:00:00 2001 From: Raushan Prabhakar Date: Thu, 23 Apr 2026 20:50:27 +0530 Subject: [PATCH] Adding Use of arrow's has_true() / has_false() --- datafusion/datasource-parquet/src/metadata.rs | 14 +++++-------- datafusion/functions-nested/src/array_has.rs | 2 +- datafusion/functions-nested/src/replace.rs | 2 +- .../physical-expr-common/src/physical_expr.rs | 10 ++++----- datafusion/physical-expr-common/src/utils.rs | 6 +++--- .../physical-expr/src/expressions/binary.rs | 8 +++---- .../physical-expr/src/expressions/case.rs | 21 ++++++------------- .../physical-expr/src/expressions/in_list.rs | 2 +- .../src/joins/nested_loop_join.rs | 9 +++----- .../src/joins/sort_merge_join/filter.rs | 2 +- datafusion/physical-plan/src/topk/mod.rs | 4 ++-- .../src/function/array/array_contains.rs | 2 +- 12 files changed, 32 insertions(+), 50 deletions(-) diff --git a/datafusion/datasource-parquet/src/metadata.rs b/datafusion/datasource-parquet/src/metadata.rs index 3d4d051abbb26..70ac3fe4987c0 100644 --- a/datafusion/datasource-parquet/src/metadata.rs +++ b/datafusion/datasource-parquet/src/metadata.rs @@ -520,12 +520,10 @@ fn summarize_column_statistics( // handle the common special case when all row groups have exact statistics let exactness = &is_max_value_exact_stat; - if !exactness.is_empty() - && exactness.null_count() == 0 - && exactness.true_count() == exactness.len() + if !exactness.is_empty() && exactness.null_count() == 0 && !exactness.has_false() { accumulators.is_max_value_exact[logical_schema_index] = Some(true); - } else if exactness.true_count() == 0 { + } else if !exactness.has_true() { accumulators.is_max_value_exact[logical_schema_index] = Some(false); } else { let val = max_acc.evaluate()?; @@ -539,12 +537,10 @@ fn summarize_column_statistics( // handle the common special case when all row groups have exact statistics let exactness = &is_min_value_exact_stat; - if !exactness.is_empty() - && exactness.null_count() == 0 - && exactness.true_count() == exactness.len() + if !exactness.is_empty() && exactness.null_count() == 0 && !exactness.has_false() { accumulators.is_min_value_exact[logical_schema_index] = Some(true); - } else if exactness.true_count() == 0 { + } else if !exactness.has_true() { accumulators.is_min_value_exact[logical_schema_index] = Some(false); } else { let val = min_acc.evaluate()?; @@ -675,7 +671,7 @@ fn has_any_exact_match( let scalar_array = value.to_scalar().ok()?; let eq_mask = eq(&scalar_array, &array).ok()?; let combined_mask = and(&eq_mask, exactness).ok()?; - Some(combined_mask.true_count() > 0) + Some(combined_mask.has_true()) } /// Wrapper to implement [`FileMetadata`] for [`ParquetMetaData`]. diff --git a/datafusion/functions-nested/src/array_has.rs b/datafusion/functions-nested/src/array_has.rs index d5b2e3e6cf67c..04818258f040b 100644 --- a/datafusion/functions-nested/src/array_has.rs +++ b/datafusion/functions-nested/src/array_has.rs @@ -338,7 +338,7 @@ fn array_has_dispatch_for_array<'a>( let is_nested = arr.data_type().is_nested(); let needle_row = Scalar::new(needle.slice(i, 1)); let eq_array = compare_with_eq(&arr, &needle_row, is_nested)?; - result.append(eq_array.true_count() > 0); + result.append(eq_array.has_true()); } Ok(Arc::new(BooleanArray::new(result.finish(), combined_nulls))) diff --git a/datafusion/functions-nested/src/replace.rs b/datafusion/functions-nested/src/replace.rs index 3de66810fcaf0..7effd13b696ad 100644 --- a/datafusion/functions-nested/src/replace.rs +++ b/datafusion/functions-nested/src/replace.rs @@ -347,7 +347,7 @@ fn general_replace( let mut counter = 0; // All elements are false, no need to replace, just copy original data - if eq_array.false_count() == eq_array.len() { + if !eq_array.has_true() { mutable.extend( original_idx.to_usize().unwrap(), start.to_usize().unwrap(), diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index a42a1560cb769..d91f91b6fb2ca 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -23,7 +23,7 @@ use std::sync::Arc; use crate::utils::scatter; -use arrow::array::{ArrayRef, BooleanArray, new_empty_array}; +use arrow::array::{Array, ArrayRef, BooleanArray, new_empty_array}; use arrow::compute::filter_record_batch; use arrow::datatypes::{DataType, Field, FieldRef, Schema}; use arrow::record_batch::RecordBatch; @@ -109,17 +109,15 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { ); } - let selection_count = selection.true_count(); - // First, check if we can avoid filtering altogether. - if selection_count == row_count { + if selection.null_count() == 0 && !selection.has_false() { // All values from the `selection` filter are true and match the input batch. // No need to perform any filtering. return self.evaluate(batch); } // Next, prepare the result array for each 'true' row in the selection vector. - let filtered_result = if selection_count == 0 { + let filtered_result = if !selection.has_true() { // Do not call `evaluate` when the selection is empty. // `evaluate_selection` is used to conditionally evaluate expressions. // When the expression in question is fallible, evaluating it with an empty @@ -132,7 +130,7 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { // If we reach this point, there's no other option than to filter the batch. // This is a fairly costly operation since it requires creating partial copies // (worst case of length `row_count - 1`) of all the arrays in the record batch. - // The resulting `filtered_batch` will contain `selection_count` rows. + // The resulting `filtered_batch` will contain one row per true in `selection`. let filtered_batch = filter_record_batch(batch, selection)?; self.evaluate(&filtered_batch)? }; diff --git a/datafusion/physical-expr-common/src/utils.rs b/datafusion/physical-expr-common/src/utils.rs index 6ca00a4460aaf..e469885f83316 100644 --- a/datafusion/physical-expr-common/src/utils.rs +++ b/datafusion/physical-expr-common/src/utils.rs @@ -83,18 +83,18 @@ pub fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result { }; let output_len = mask.len(); - let count = mask.true_count(); // Fast path: no true values mean all-null object - if count == 0 { + if !mask.has_true() { return Ok(new_null_array(truthy.data_type(), output_len)); } // Fast path: all true means output = truthy - if count == output_len { + if mask.null_count() == 0 && !mask.has_false() { return Ok(truthy.slice(0, truthy.len())); } + let count = mask.true_count(); let selectivity = count as f64 / output_len as f64; let mask_buffer = mask.values(); diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index 268a350a4da2c..ae93b1d03ac5b 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -289,11 +289,11 @@ impl PhysicalExpr for BinaryExpr { ColumnarValue::Array(array) => { // When the array on the right is all true or all false, skip the scatter process let boolean_array = array.as_boolean(); - let true_count = boolean_array.true_count(); - let length = boolean_array.len(); - if true_count == length { + if boolean_array.null_count() == 0 && !boolean_array.has_false() { return Ok(lhs); - } else if true_count == 0 && boolean_array.null_count() == 0 { + } else if boolean_array.null_count() == 0 + && !boolean_array.has_true() + { // If the right-hand array is returned at this point,the lengths will be inconsistent; // returning a scalar can avoid this issue return Ok(ColumnarValue::Scalar(ScalarValue::Boolean( diff --git a/datafusion/physical-expr/src/expressions/case.rs b/datafusion/physical-expr/src/expressions/case.rs index 25aedecc2c544..d06efb0786c46 100644 --- a/datafusion/physical-expr/src/expressions/case.rs +++ b/datafusion/physical-expr/src/expressions/case.rs @@ -793,17 +793,13 @@ impl CaseBody { } }?; - // `true_count` ignores `true` values where the validity bit is not set, so there's - // no need to call `prep_null_mask_filter`. - let when_true_count = when_value.true_count(); - // If the 'when' predicate did not match any rows, continue to the next branch immediately - if when_true_count == 0 { + if !when_value.has_true() { continue; } // If the 'when' predicate matched all remaining rows, there is no need to filter - if when_true_count == remainder_batch.num_rows() { + if when_value.null_count() == 0 && !when_value.has_false() { let then_expression = &self.when_then_expr[i].1; let then_value = then_expression.evaluate(&remainder_batch)?; result_builder.add_branch_result(&remainder_rows, then_value)?; @@ -882,17 +878,13 @@ impl CaseBody { internal_datafusion_err!("WHEN expression did not return a BooleanArray") })?; - // `true_count` ignores `true` values where the validity bit is not set, so there's - // no need to call `prep_null_mask_filter`. - let when_true_count = when_value.true_count(); - // If the 'when' predicate did not match any rows, continue to the next branch immediately - if when_true_count == 0 { + if !when_value.has_true() { continue; } // If the 'when' predicate matched all remaining rows, there is no need to filter - if when_true_count == remainder_batch.num_rows() { + if when_value.null_count() == 0 && !when_value.has_false() { let then_expression = &self.when_then_expr[i].1; let then_value = then_expression.evaluate(&remainder_batch)?; result_builder.add_branch_result(&remainder_rows, then_value)?; @@ -1144,11 +1136,10 @@ impl CaseExpr { ) })?; - let true_count = when_value.true_count(); - if true_count == when_value.len() { + if when_value.null_count() == 0 && !when_value.has_false() { // All input rows are true, just call the 'then' expression self.body.when_then_expr[0].1.evaluate(batch) - } else if true_count == 0 { + } else if !when_value.has_true() { // All input rows are false/null, just call the 'else' expression match &self.body.else_expr { Some(else_expr) => else_expr.evaluate(batch), diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs index 2377edf9375cf..c3656a4423138 100644 --- a/datafusion/physical-expr/src/expressions/in_list.rs +++ b/datafusion/physical-expr/src/expressions/in_list.rs @@ -854,7 +854,7 @@ impl PhysicalExpr for InListExpr { for expr in self.list.iter().skip(1) { // Short-circuit: if every non-null row is already true, // no further list items can change the result. - if found.null_count() == 0 && found.true_count() == num_rows { + if found.null_count() == 0 && !found.has_false() { break; } found = or_kleene(&found, &compare_one(expr)?)?; diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index c686d9d5a05ff..add4508c87bd0 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -2105,7 +2105,7 @@ impl NestedLoopJoinStream { return Ok(None); } - if cur_right_bitmap.true_count() == 0 { + if !cur_right_bitmap.has_true() { // If none of the pairs has passed the join predicate/filter Ok(None) } else { @@ -2300,11 +2300,8 @@ impl NestedLoopJoinStream { ) -> Result<()> { let left_data = self.get_left_data()?; - // number of successfully joined pairs from (l_index x cur_right_batch) - let joined_len = r_matched_bitmap.true_count(); - // 1. Maybe update the left bitmap - if need_produce_result_in_final(self.join_type) && (joined_len > 0) { + if need_produce_result_in_final(self.join_type) && r_matched_bitmap.has_true() { let mut bitmap = left_data.bitmap().lock(); bitmap.set_bit(l_index, true); } @@ -2696,7 +2693,7 @@ fn build_unmatched_batch( not(&batch_bitmap)? }; - if bitmap.true_count() == 0 { + if !bitmap.has_true() { return Ok(None); } diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/filter.rs b/datafusion/physical-plan/src/joins/sort_merge_join/filter.rs index 3a5f0d0f0598f..4fc6cccaa8838 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/filter.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/filter.rs @@ -331,7 +331,7 @@ pub fn filter_record_batch_by_join_type( .unwrap(); // All rows passed the filter — no null-joining needed - if kept_corrected.true_count() == kept_corrected.len() { + if !kept_corrected.has_false() { return Ok(kept_batch); } diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs index 2c6d3bcad1a65..4a38bdab8d15e 100644 --- a/datafusion/physical-plan/src/topk/mod.rs +++ b/datafusion/physical-plan/src/topk/mod.rs @@ -254,11 +254,11 @@ impl TopK { let num_rows = batch.num_rows(); let array = filtered.into_array(num_rows)?; let mut filter = array.as_boolean().clone(); - let true_count = filter.true_count(); - if true_count == 0 { + if !filter.has_true() { // nothing to filter, so no need to update return Ok(()); } + let true_count = filter.true_count(); // only update the keys / rows if the filter does not match all rows if true_count < num_rows { // Indices in `set_indices` should be correct if filter contains nulls diff --git a/datafusion/spark/src/function/array/array_contains.rs b/datafusion/spark/src/function/array/array_contains.rs index e0f99f7dd326e..9ae23468dad02 100644 --- a/datafusion/spark/src/function/array/array_contains.rs +++ b/datafusion/spark/src/function/array/array_contains.rs @@ -84,7 +84,7 @@ fn apply_spark_null_semantics( haystack_arg: &ColumnarValue, ) -> Result { // happy path - if result.false_count() == 0 || haystack_arg.data_type() == DataType::Null { + if !result.has_false() || haystack_arg.data_type() == DataType::Null { return Ok(result.clone()); }