Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 5 additions & 9 deletions datafusion/datasource-parquet/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -520,12 +520,10 @@ fn summarize_column_statistics(

// handle the common special case when all row groups have exact statistics
let exactness = &is_max_value_exact_stat;
if !exactness.is_empty()
&& exactness.null_count() == 0
&& exactness.true_count() == exactness.len()
if !exactness.is_empty() && exactness.null_count() == 0 && !exactness.has_false()
{
accumulators.is_max_value_exact[logical_schema_index] = Some(true);
} else if exactness.true_count() == 0 {
} else if !exactness.has_true() {
accumulators.is_max_value_exact[logical_schema_index] = Some(false);
} else {
let val = max_acc.evaluate()?;
Expand All @@ -539,12 +537,10 @@ fn summarize_column_statistics(

// handle the common special case when all row groups have exact statistics
let exactness = &is_min_value_exact_stat;
if !exactness.is_empty()
&& exactness.null_count() == 0
&& exactness.true_count() == exactness.len()
if !exactness.is_empty() && exactness.null_count() == 0 && !exactness.has_false()
{
accumulators.is_min_value_exact[logical_schema_index] = Some(true);
} else if exactness.true_count() == 0 {
} else if !exactness.has_true() {
accumulators.is_min_value_exact[logical_schema_index] = Some(false);
} else {
let val = min_acc.evaluate()?;
Expand Down Expand Up @@ -675,7 +671,7 @@ fn has_any_exact_match(
let scalar_array = value.to_scalar().ok()?;
let eq_mask = eq(&scalar_array, &array).ok()?;
let combined_mask = and(&eq_mask, exactness).ok()?;
Some(combined_mask.true_count() > 0)
Some(combined_mask.has_true())
}

/// Wrapper to implement [`FileMetadata`] for [`ParquetMetaData`].
Expand Down
2 changes: 1 addition & 1 deletion datafusion/functions-nested/src/array_has.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ fn array_has_dispatch_for_array<'a>(
let is_nested = arr.data_type().is_nested();
let needle_row = Scalar::new(needle.slice(i, 1));
let eq_array = compare_with_eq(&arr, &needle_row, is_nested)?;
result.append(eq_array.true_count() > 0);
result.append(eq_array.has_true());
}

Ok(Arc::new(BooleanArray::new(result.finish(), combined_nulls)))
Expand Down
2 changes: 1 addition & 1 deletion datafusion/functions-nested/src/replace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ fn general_replace<O: OffsetSizeTrait>(
let mut counter = 0;

// All elements are false, no need to replace, just copy original data
if eq_array.false_count() == eq_array.len() {
if !eq_array.has_true() {
mutable.extend(
original_idx.to_usize().unwrap(),
start.to_usize().unwrap(),
Expand Down
10 changes: 4 additions & 6 deletions datafusion/physical-expr-common/src/physical_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::sync::Arc;

use crate::utils::scatter;

use arrow::array::{ArrayRef, BooleanArray, new_empty_array};
use arrow::array::{Array, ArrayRef, BooleanArray, new_empty_array};
use arrow::compute::filter_record_batch;
use arrow::datatypes::{DataType, Field, FieldRef, Schema};
use arrow::record_batch::RecordBatch;
Expand Down Expand Up @@ -109,17 +109,15 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
);
}

let selection_count = selection.true_count();

// First, check if we can avoid filtering altogether.
if selection_count == row_count {
if selection.null_count() == 0 && !selection.has_false() {
// All values from the `selection` filter are true and match the input batch.
// No need to perform any filtering.
return self.evaluate(batch);
}

// Next, prepare the result array for each 'true' row in the selection vector.
let filtered_result = if selection_count == 0 {
let filtered_result = if !selection.has_true() {
// Do not call `evaluate` when the selection is empty.
// `evaluate_selection` is used to conditionally evaluate expressions.
// When the expression in question is fallible, evaluating it with an empty
Expand All @@ -132,7 +130,7 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
// If we reach this point, there's no other option than to filter the batch.
// This is a fairly costly operation since it requires creating partial copies
// (worst case of length `row_count - 1`) of all the arrays in the record batch.
// The resulting `filtered_batch` will contain `selection_count` rows.
// The resulting `filtered_batch` will contain one row per true in `selection`.
let filtered_batch = filter_record_batch(batch, selection)?;
self.evaluate(&filtered_batch)?
};
Expand Down
6 changes: 3 additions & 3 deletions datafusion/physical-expr-common/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,18 +83,18 @@ pub fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result<ArrayRef> {
};

let output_len = mask.len();
let count = mask.true_count();

// Fast path: no true values mean all-null object
if count == 0 {
if !mask.has_true() {
return Ok(new_null_array(truthy.data_type(), output_len));
}

// Fast path: all true means output = truthy
if count == output_len {
if mask.null_count() == 0 && !mask.has_false() {
return Ok(truthy.slice(0, truthy.len()));
}

let count = mask.true_count();
let selectivity = count as f64 / output_len as f64;
let mask_buffer = mask.values();

Expand Down
8 changes: 4 additions & 4 deletions datafusion/physical-expr/src/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,11 +289,11 @@ impl PhysicalExpr for BinaryExpr {
ColumnarValue::Array(array) => {
// When the array on the right is all true or all false, skip the scatter process
let boolean_array = array.as_boolean();
let true_count = boolean_array.true_count();
let length = boolean_array.len();
if true_count == length {
if boolean_array.null_count() == 0 && !boolean_array.has_false() {
return Ok(lhs);
} else if true_count == 0 && boolean_array.null_count() == 0 {
} else if boolean_array.null_count() == 0
&& !boolean_array.has_true()
{
// If the right-hand array is returned at this point,the lengths will be inconsistent;
// returning a scalar can avoid this issue
return Ok(ColumnarValue::Scalar(ScalarValue::Boolean(
Expand Down
21 changes: 6 additions & 15 deletions datafusion/physical-expr/src/expressions/case.rs
Original file line number Diff line number Diff line change
Expand Up @@ -793,17 +793,13 @@ impl CaseBody {
}
}?;

// `true_count` ignores `true` values where the validity bit is not set, so there's
// no need to call `prep_null_mask_filter`.
let when_true_count = when_value.true_count();

// If the 'when' predicate did not match any rows, continue to the next branch immediately
if when_true_count == 0 {
if !when_value.has_true() {
continue;
}

// If the 'when' predicate matched all remaining rows, there is no need to filter
if when_true_count == remainder_batch.num_rows() {
if when_value.null_count() == 0 && !when_value.has_false() {
let then_expression = &self.when_then_expr[i].1;
let then_value = then_expression.evaluate(&remainder_batch)?;
result_builder.add_branch_result(&remainder_rows, then_value)?;
Expand Down Expand Up @@ -882,17 +878,13 @@ impl CaseBody {
internal_datafusion_err!("WHEN expression did not return a BooleanArray")
})?;

// `true_count` ignores `true` values where the validity bit is not set, so there's
// no need to call `prep_null_mask_filter`.
let when_true_count = when_value.true_count();

// If the 'when' predicate did not match any rows, continue to the next branch immediately
if when_true_count == 0 {
if !when_value.has_true() {
continue;
}

// If the 'when' predicate matched all remaining rows, there is no need to filter
if when_true_count == remainder_batch.num_rows() {
if when_value.null_count() == 0 && !when_value.has_false() {
let then_expression = &self.when_then_expr[i].1;
let then_value = then_expression.evaluate(&remainder_batch)?;
result_builder.add_branch_result(&remainder_rows, then_value)?;
Expand Down Expand Up @@ -1144,11 +1136,10 @@ impl CaseExpr {
)
})?;

let true_count = when_value.true_count();
if true_count == when_value.len() {
if when_value.null_count() == 0 && !when_value.has_false() {
// All input rows are true, just call the 'then' expression
self.body.when_then_expr[0].1.evaluate(batch)
} else if true_count == 0 {
} else if !when_value.has_true() {
// All input rows are false/null, just call the 'else' expression
match &self.body.else_expr {
Some(else_expr) => else_expr.evaluate(batch),
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/expressions/in_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -854,7 +854,7 @@ impl PhysicalExpr for InListExpr {
for expr in self.list.iter().skip(1) {
// Short-circuit: if every non-null row is already true,
// no further list items can change the result.
if found.null_count() == 0 && found.true_count() == num_rows {
if found.null_count() == 0 && !found.has_false() {
break;
}
found = or_kleene(&found, &compare_one(expr)?)?;
Expand Down
9 changes: 3 additions & 6 deletions datafusion/physical-plan/src/joins/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2105,7 +2105,7 @@ impl NestedLoopJoinStream {
return Ok(None);
}

if cur_right_bitmap.true_count() == 0 {
if !cur_right_bitmap.has_true() {
// If none of the pairs has passed the join predicate/filter
Ok(None)
} else {
Expand Down Expand Up @@ -2300,11 +2300,8 @@ impl NestedLoopJoinStream {
) -> Result<()> {
let left_data = self.get_left_data()?;

// number of successfully joined pairs from (l_index x cur_right_batch)
let joined_len = r_matched_bitmap.true_count();

// 1. Maybe update the left bitmap
if need_produce_result_in_final(self.join_type) && (joined_len > 0) {
if need_produce_result_in_final(self.join_type) && r_matched_bitmap.has_true() {
let mut bitmap = left_data.bitmap().lock();
bitmap.set_bit(l_index, true);
}
Expand Down Expand Up @@ -2696,7 +2693,7 @@ fn build_unmatched_batch(
not(&batch_bitmap)?
};

if bitmap.true_count() == 0 {
if !bitmap.has_true() {
return Ok(None);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ pub fn filter_record_batch_by_join_type(
.unwrap();

// All rows passed the filter — no null-joining needed
if kept_corrected.true_count() == kept_corrected.len() {
if !kept_corrected.has_false() {
return Ok(kept_batch);
}

Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/topk/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,11 +254,11 @@ impl TopK {
let num_rows = batch.num_rows();
let array = filtered.into_array(num_rows)?;
let mut filter = array.as_boolean().clone();
let true_count = filter.true_count();
if true_count == 0 {
if !filter.has_true() {
// nothing to filter, so no need to update
return Ok(());
}
let true_count = filter.true_count();
// only update the keys / rows if the filter does not match all rows
if true_count < num_rows {
// Indices in `set_indices` should be correct if filter contains nulls
Expand Down
2 changes: 1 addition & 1 deletion datafusion/spark/src/function/array/array_contains.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ fn apply_spark_null_semantics(
haystack_arg: &ColumnarValue,
) -> Result<BooleanArray> {
// happy path
if result.false_count() == 0 || haystack_arg.data_type() == DataType::Null {
if !result.has_false() || haystack_arg.data_type() == DataType::Null {
return Ok(result.clone());
}

Expand Down
Loading