diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/boolean.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/boolean.rs index 91a39f28f33c1..5fdbe434f9f30 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/boolean.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/boolean.rs @@ -22,7 +22,6 @@ use crate::aggregates::group_values::multi_group_by::{GroupColumn, nulls_equal_t use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder; use arrow::array::{Array as _, ArrayRef, AsArray, BooleanArray, BooleanBufferBuilder}; use datafusion_common::Result; -use itertools::izip; /// An implementation of [`GroupColumn`] for booleans /// @@ -81,19 +80,14 @@ impl GroupColumn for BooleanGroupValueBuilder { lhs_rows: &[usize], array: &ArrayRef, rhs_rows: &[usize], - equal_to_results: &mut [bool], + equal_to_results: &mut BooleanBufferBuilder, ) { let array = array.as_boolean(); - let iter = izip!( - lhs_rows.iter(), - rhs_rows.iter(), - equal_to_results.iter_mut(), - ); - - for (&lhs_row, &rhs_row, equal_to_result) in iter { - // Has found not equal to in previous column, don't need to check - if !*equal_to_result { + for (idx, (&lhs_row, &rhs_row)) in + lhs_rows.iter().zip(rhs_rows.iter()).enumerate() + { + if !equal_to_results.get_bit(idx) { continue; } @@ -101,12 +95,16 @@ impl GroupColumn for BooleanGroupValueBuilder { let exist_null = self.nulls.is_null(lhs_row); let input_null = array.is_null(rhs_row); if let Some(result) = nulls_equal_to(exist_null, input_null) { - *equal_to_result = result; + if !result { + equal_to_results.set_bit(idx, false); + } continue; } } - *equal_to_result = self.buffer.get_bit(lhs_row) == array.value(rhs_row); + if self.buffer.get_bit(lhs_row) != array.value(rhs_row) { + equal_to_results.set_bit(idx, false); + } } } @@ -195,10 +193,20 @@ impl GroupColumn for BooleanGroupValueBuilder { #[cfg(test)] mod tests { - use arrow::array::NullBufferBuilder; + use arrow::array::{BooleanBufferBuilder, NullBufferBuilder}; use super::*; + fn make_true_buffer(n: usize) -> BooleanBufferBuilder { + let mut buf = BooleanBufferBuilder::new(n); + buf.append_n(n, true); + buf + } + + fn to_vec(buf: &BooleanBufferBuilder) -> Vec { + (0..buf.len()).map(|i| buf.get_bit(i)).collect() + } + #[test] fn test_nullable_boolean_equal_to() { let append = |builder: &mut BooleanGroupValueBuilder, @@ -209,16 +217,18 @@ mod tests { } }; - let equal_to = |builder: &BooleanGroupValueBuilder, - lhs_rows: &[usize], - input_array: &ArrayRef, - rhs_rows: &[usize], - equal_to_results: &mut Vec| { - let iter = lhs_rows.iter().zip(rhs_rows.iter()); - for (idx, (&lhs_row, &rhs_row)) in iter.enumerate() { - equal_to_results[idx] = builder.equal_to(lhs_row, input_array, rhs_row); - } - }; + let equal_to = + |builder: &BooleanGroupValueBuilder, + lhs_rows: &[usize], + input_array: &ArrayRef, + rhs_rows: &[usize], + equal_to_results: &mut BooleanBufferBuilder| { + let iter = lhs_rows.iter().zip(rhs_rows.iter()); + for (idx, (&lhs_row, &rhs_row)) in iter.enumerate() { + equal_to_results + .set_bit(idx, builder.equal_to(lhs_row, input_array, rhs_row)); + } + }; test_nullable_boolean_equal_to_internal(append, equal_to); } @@ -233,18 +243,19 @@ mod tests { .unwrap(); }; - let equal_to = |builder: &BooleanGroupValueBuilder, - lhs_rows: &[usize], - input_array: &ArrayRef, - rhs_rows: &[usize], - equal_to_results: &mut Vec| { - builder.vectorized_equal_to( - lhs_rows, - input_array, - rhs_rows, - equal_to_results, - ); - }; + let equal_to = + |builder: &BooleanGroupValueBuilder, + lhs_rows: &[usize], + input_array: &ArrayRef, + rhs_rows: &[usize], + equal_to_results: &mut BooleanBufferBuilder| { + builder.vectorized_equal_to( + lhs_rows, + input_array, + rhs_rows, + equal_to_results, + ); + }; test_nullable_boolean_equal_to_internal(append, equal_to); } @@ -257,7 +268,7 @@ mod tests { &[usize], &ArrayRef, &[usize], - &mut Vec, + &mut BooleanBufferBuilder, ), { // Will cover such cases: @@ -268,7 +279,7 @@ mod tests { // - exist not null, input not null; values not equal // - exist not null, input not null; values equal - // Define PrimitiveGroupValueBuilder + // Define BooleanGroupValueBuilder let mut builder = BooleanGroupValueBuilder::::new(); let builder_array = Arc::new(BooleanArray::from(vec![ None, @@ -294,7 +305,7 @@ mod tests { // explicitly build a null buffer where one of the null values also happens to match let mut nulls = NullBufferBuilder::new(6); nulls.append_non_null(); - nulls.append_null(); // this sets Some(false) to null above + nulls.append_null(); nulls.append_null(); nulls.append_null(); nulls.append_non_null(); @@ -302,7 +313,7 @@ mod tests { let input_array = Arc::new(BooleanArray::new(values, nulls.finish())) as ArrayRef; // Check - let mut equal_to_results = vec![true; builder.len()]; + let mut equal_to_results = make_true_buffer(builder.len()); equal_to( &builder, &[0, 1, 2, 3, 4, 5], @@ -310,13 +321,14 @@ mod tests { &[0, 1, 2, 3, 4, 5], &mut equal_to_results, ); - - assert!(!equal_to_results[0]); - assert!(equal_to_results[1]); - assert!(equal_to_results[2]); - assert!(!equal_to_results[3]); - assert!(!equal_to_results[4]); - assert!(equal_to_results[5]); + let results = to_vec(&equal_to_results); + + assert!(!results[0]); + assert!(results[1]); + assert!(results[2]); + assert!(!results[3]); + assert!(!results[4]); + assert!(results[5]); } #[test] @@ -329,16 +341,18 @@ mod tests { } }; - let equal_to = |builder: &BooleanGroupValueBuilder, - lhs_rows: &[usize], - input_array: &ArrayRef, - rhs_rows: &[usize], - equal_to_results: &mut Vec| { - let iter = lhs_rows.iter().zip(rhs_rows.iter()); - for (idx, (&lhs_row, &rhs_row)) in iter.enumerate() { - equal_to_results[idx] = builder.equal_to(lhs_row, input_array, rhs_row); - } - }; + let equal_to = + |builder: &BooleanGroupValueBuilder, + lhs_rows: &[usize], + input_array: &ArrayRef, + rhs_rows: &[usize], + equal_to_results: &mut BooleanBufferBuilder| { + let iter = lhs_rows.iter().zip(rhs_rows.iter()); + for (idx, (&lhs_row, &rhs_row)) in iter.enumerate() { + equal_to_results + .set_bit(idx, builder.equal_to(lhs_row, input_array, rhs_row)); + } + }; test_not_nullable_boolean_equal_to_internal(append, equal_to); } @@ -353,18 +367,19 @@ mod tests { .unwrap(); }; - let equal_to = |builder: &BooleanGroupValueBuilder, - lhs_rows: &[usize], - input_array: &ArrayRef, - rhs_rows: &[usize], - equal_to_results: &mut Vec| { - builder.vectorized_equal_to( - lhs_rows, - input_array, - rhs_rows, - equal_to_results, - ); - }; + let equal_to = + |builder: &BooleanGroupValueBuilder, + lhs_rows: &[usize], + input_array: &ArrayRef, + rhs_rows: &[usize], + equal_to_results: &mut BooleanBufferBuilder| { + builder.vectorized_equal_to( + lhs_rows, + input_array, + rhs_rows, + equal_to_results, + ); + }; test_not_nullable_boolean_equal_to_internal(append, equal_to); } @@ -377,14 +392,14 @@ mod tests { &[usize], &ArrayRef, &[usize], - &mut Vec, + &mut BooleanBufferBuilder, ), { // Will cover such cases: // - values equal // - values not equal - // Define PrimitiveGroupValueBuilder + // Define BooleanGroupValueBuilder let mut builder = BooleanGroupValueBuilder::::new(); let builder_array = Arc::new(BooleanArray::from(vec![ Some(false), @@ -403,7 +418,7 @@ mod tests { ])) as ArrayRef; // Check - let mut equal_to_results = vec![true; builder.len()]; + let mut equal_to_results = make_true_buffer(builder.len()); equal_to( &builder, &[0, 1, 2, 3], @@ -411,11 +426,12 @@ mod tests { &[0, 1, 2, 3], &mut equal_to_results, ); + let results = to_vec(&equal_to_results); - assert!(equal_to_results[0]); - assert!(!equal_to_results[1]); - assert!(!equal_to_results[2]); - assert!(equal_to_results[3]); + assert!(results[0]); + assert!(!results[1]); + assert!(!results[2]); + assert!(results[3]); } #[test] @@ -432,19 +448,20 @@ mod tests { .vectorized_append(&all_nulls_input_array, &[0, 1, 2, 3, 4]) .unwrap(); - let mut equal_to_results = vec![true; all_nulls_input_array.len()]; + let mut equal_to_results = make_true_buffer(all_nulls_input_array.len()); builder.vectorized_equal_to( &[0, 1, 2, 3, 4], &all_nulls_input_array, &[0, 1, 2, 3, 4], &mut equal_to_results, ); + let results = to_vec(&equal_to_results); - assert!(equal_to_results[0]); - assert!(equal_to_results[1]); - assert!(equal_to_results[2]); - assert!(equal_to_results[3]); - assert!(equal_to_results[4]); + assert!(results[0]); + assert!(results[1]); + assert!(results[2]); + assert!(results[3]); + assert!(results[4]); // All not nulls input array let all_not_nulls_input_array = Arc::new(BooleanArray::from(vec![ @@ -458,18 +475,19 @@ mod tests { .vectorized_append(&all_not_nulls_input_array, &[0, 1, 2, 3, 4]) .unwrap(); - let mut equal_to_results = vec![true; all_not_nulls_input_array.len()]; + let mut equal_to_results = make_true_buffer(all_not_nulls_input_array.len()); builder.vectorized_equal_to( &[5, 6, 7, 8, 9], &all_not_nulls_input_array, &[0, 1, 2, 3, 4], &mut equal_to_results, ); + let results = to_vec(&equal_to_results); - assert!(equal_to_results[0]); - assert!(equal_to_results[1]); - assert!(equal_to_results[2]); - assert!(equal_to_results[3]); - assert!(equal_to_results[4]); + assert!(results[0]); + assert!(results[1]); + assert!(results[2]); + assert!(results[3]); + assert!(results[4]); } } diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs index cd173741b6464..e407be5e390dc 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs @@ -20,15 +20,14 @@ use crate::aggregates::group_values::multi_group_by::{ }; use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder; use arrow::array::{ - Array, ArrayRef, AsArray, BufferBuilder, GenericBinaryArray, GenericByteArray, - GenericStringArray, OffsetSizeTrait, types::GenericStringType, + Array, ArrayRef, AsArray, BooleanBufferBuilder, BufferBuilder, GenericBinaryArray, + GenericByteArray, GenericStringArray, OffsetSizeTrait, types::GenericStringType, }; use arrow::buffer::{OffsetBuffer, ScalarBuffer}; use arrow::datatypes::{ByteArrayType, DataType, GenericBinaryType}; use datafusion_common::utils::proxy::VecAllocExt; use datafusion_common::{Result, exec_datafusion_err}; use datafusion_physical_expr_common::binary_map::{INITIAL_BUFFER_CAPACITY, OutputType}; -use itertools::izip; use std::mem::size_of; use std::sync::Arc; use std::vec; @@ -106,25 +105,22 @@ where lhs_rows: &[usize], array: &ArrayRef, rhs_rows: &[usize], - equal_to_results: &mut [bool], + equal_to_results: &mut BooleanBufferBuilder, ) where B: ByteArrayType, { let array = array.as_bytes::(); - let iter = izip!( - lhs_rows.iter(), - rhs_rows.iter(), - equal_to_results.iter_mut(), - ); - - for (&lhs_row, &rhs_row, equal_to_result) in iter { - // Has found not equal to, don't need to check - if !*equal_to_result { + for (idx, (&lhs_row, &rhs_row)) in + lhs_rows.iter().zip(rhs_rows.iter()).enumerate() + { + if !equal_to_results.get_bit(idx) { continue; } - *equal_to_result = self.do_equal_to_inner(lhs_row, array, rhs_row); + if !self.do_equal_to_inner(lhs_row, array, rhs_row) { + equal_to_results.set_bit(idx, false); + } } } @@ -275,7 +271,7 @@ where lhs_rows: &[usize], array: &ArrayRef, rhs_rows: &[usize], - equal_to_results: &mut [bool], + equal_to_results: &mut BooleanBufferBuilder, ) { // Sanity array type match self.output_type { @@ -433,12 +429,22 @@ mod tests { use std::sync::Arc; use crate::aggregates::group_values::multi_group_by::bytes::ByteGroupValueBuilder; - use arrow::array::{ArrayRef, NullBufferBuilder, StringArray}; + use arrow::array::{ArrayRef, BooleanBufferBuilder, NullBufferBuilder, StringArray}; use datafusion_common::DataFusionError; use datafusion_physical_expr::binary_map::OutputType; use super::GroupColumn; + fn make_true_buffer(n: usize) -> BooleanBufferBuilder { + let mut buf = BooleanBufferBuilder::new(n); + buf.append_n(n, true); + buf + } + + fn to_vec(buf: &BooleanBufferBuilder) -> Vec { + (0..buf.len()).map(|i| buf.get_bit(i)).collect() + } + #[test] fn test_byte_group_value_builder_overflow() { let mut builder = ByteGroupValueBuilder::::new(OutputType::Utf8); @@ -516,16 +522,18 @@ mod tests { } }; - let equal_to = |builder: &ByteGroupValueBuilder, - lhs_rows: &[usize], - input_array: &ArrayRef, - rhs_rows: &[usize], - equal_to_results: &mut Vec| { - let iter = lhs_rows.iter().zip(rhs_rows.iter()); - for (idx, (&lhs_row, &rhs_row)) in iter.enumerate() { - equal_to_results[idx] = builder.equal_to(lhs_row, input_array, rhs_row); - } - }; + let equal_to = + |builder: &ByteGroupValueBuilder, + lhs_rows: &[usize], + input_array: &ArrayRef, + rhs_rows: &[usize], + equal_to_results: &mut BooleanBufferBuilder| { + let iter = lhs_rows.iter().zip(rhs_rows.iter()); + for (idx, (&lhs_row, &rhs_row)) in iter.enumerate() { + equal_to_results + .set_bit(idx, builder.equal_to(lhs_row, input_array, rhs_row)); + } + }; test_byte_equal_to_internal(append, equal_to); } @@ -540,18 +548,19 @@ mod tests { .unwrap(); }; - let equal_to = |builder: &ByteGroupValueBuilder, - lhs_rows: &[usize], - input_array: &ArrayRef, - rhs_rows: &[usize], - equal_to_results: &mut Vec| { - builder.vectorized_equal_to( - lhs_rows, - input_array, - rhs_rows, - equal_to_results, - ); - }; + let equal_to = + |builder: &ByteGroupValueBuilder, + lhs_rows: &[usize], + input_array: &ArrayRef, + rhs_rows: &[usize], + equal_to_results: &mut BooleanBufferBuilder| { + builder.vectorized_equal_to( + lhs_rows, + input_array, + rhs_rows, + equal_to_results, + ); + }; test_byte_equal_to_internal(append, equal_to); } @@ -575,19 +584,20 @@ mod tests { .vectorized_append(&all_nulls_input_array, &[0, 1, 2, 3, 4]) .unwrap(); - let mut equal_to_results = vec![true; all_nulls_input_array.len()]; + let mut equal_to_results = make_true_buffer(all_nulls_input_array.len()); builder.vectorized_equal_to( &[0, 1, 2, 3, 4], &all_nulls_input_array, &[0, 1, 2, 3, 4], &mut equal_to_results, ); + let results = to_vec(&equal_to_results); - assert!(equal_to_results[0]); - assert!(equal_to_results[1]); - assert!(equal_to_results[2]); - assert!(equal_to_results[3]); - assert!(equal_to_results[4]); + assert!(results[0]); + assert!(results[1]); + assert!(results[2]); + assert!(results[3]); + assert!(results[4]); // All not nulls input array let all_not_nulls_input_array = Arc::new(StringArray::from(vec![ @@ -601,19 +611,20 @@ mod tests { .vectorized_append(&all_not_nulls_input_array, &[0, 1, 2, 3, 4]) .unwrap(); - let mut equal_to_results = vec![true; all_not_nulls_input_array.len()]; + let mut equal_to_results = make_true_buffer(all_not_nulls_input_array.len()); builder.vectorized_equal_to( &[5, 6, 7, 8, 9], &all_not_nulls_input_array, &[0, 1, 2, 3, 4], &mut equal_to_results, ); + let results = to_vec(&equal_to_results); - assert!(equal_to_results[0]); - assert!(equal_to_results[1]); - assert!(equal_to_results[2]); - assert!(equal_to_results[3]); - assert!(equal_to_results[4]); + assert!(results[0]); + assert!(results[1]); + assert!(results[2]); + assert!(results[3]); + assert!(results[4]); } fn test_byte_equal_to_internal(mut append: A, mut equal_to: E) @@ -624,7 +635,7 @@ mod tests { &[usize], &ArrayRef, &[usize], - &mut Vec, + &mut BooleanBufferBuilder, ), { // Will cover such cases: @@ -658,10 +669,10 @@ mod tests { ]) .into_parts(); - // explicitly build a boolean buffer where one of the null values also happens to match + // explicitly build a null buffer where one of the null values also happens to match let mut nulls = NullBufferBuilder::new(6); nulls.append_non_null(); - nulls.append_null(); // this sets Some("bar") to null above + nulls.append_null(); nulls.append_null(); nulls.append_null(); nulls.append_non_null(); @@ -670,7 +681,7 @@ mod tests { Arc::new(StringArray::new(offsets, buffer, nulls.finish())) as ArrayRef; // Check - let mut equal_to_results = vec![true; builder.len()]; + let mut equal_to_results = make_true_buffer(builder.len()); equal_to( &builder, &[0, 1, 2, 3, 4, 5], @@ -678,12 +689,13 @@ mod tests { &[0, 1, 2, 3, 4, 5], &mut equal_to_results, ); - - assert!(!equal_to_results[0]); - assert!(equal_to_results[1]); - assert!(equal_to_results[2]); - assert!(!equal_to_results[3]); - assert!(!equal_to_results[4]); - assert!(equal_to_results[5]); + let results = to_vec(&equal_to_results); + + assert!(!results[0]); + assert!(results[1]); + assert!(results[2]); + assert!(!results[3]); + assert!(!results[4]); + assert!(results[5]); } } diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs index a91dd3115d879..9267cf4f27f35 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs @@ -19,11 +19,13 @@ use crate::aggregates::group_values::multi_group_by::{ GroupColumn, Nulls, nulls_equal_to, }; use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder; -use arrow::array::{Array, ArrayRef, AsArray, ByteView, GenericByteViewArray, make_view}; +use arrow::array::{ + Array, ArrayRef, AsArray, BooleanBufferBuilder, ByteView, GenericByteViewArray, + make_view, +}; use arrow::buffer::{Buffer, ScalarBuffer}; use arrow::datatypes::ByteViewType; use datafusion_common::Result; -use itertools::izip; use std::marker::PhantomData; use std::mem::{replace, size_of}; use std::sync::Arc; @@ -126,22 +128,19 @@ impl ByteViewGroupValueBuilder { lhs_rows: &[usize], array: &GenericByteViewArray, rhs_rows: &[usize], - equal_to_results: &mut [bool], + equal_to_results: &mut BooleanBufferBuilder, ) { - let iter = izip!( - lhs_rows.iter(), - rhs_rows.iter(), - equal_to_results.iter_mut(), - ); - - for (&lhs_row, &rhs_row, equal_to_result) in iter { - // Has found not equal to, don't need to check - if !*equal_to_result { + for (idx, (&lhs_row, &rhs_row)) in + lhs_rows.iter().zip(rhs_rows.iter()).enumerate() + { + if !equal_to_results.get_bit(idx) { continue; } - *equal_to_result = - self.do_equal_to_inner::(lhs_row, array, rhs_row); + if !self.do_equal_to_inner::(lhs_row, array, rhs_row) + { + equal_to_results.set_bit(idx, false); + } } } @@ -513,7 +512,7 @@ impl GroupColumn for ByteViewGroupValueBuilder { group_indices: &[usize], array: &ArrayRef, rows: &[usize], - equal_to_results: &mut [bool], + equal_to_results: &mut BooleanBufferBuilder, ) { let has_nulls = array.null_count() != 0; let array = array.as_byte_view::(); @@ -584,11 +583,23 @@ mod tests { use std::sync::Arc; use crate::aggregates::group_values::multi_group_by::bytes_view::ByteViewGroupValueBuilder; - use arrow::array::{ArrayRef, AsArray, NullBufferBuilder, StringViewArray}; + use arrow::array::{ + ArrayRef, AsArray, BooleanBufferBuilder, NullBufferBuilder, StringViewArray, + }; use arrow::datatypes::StringViewType; use super::GroupColumn; + fn make_true_buffer(n: usize) -> BooleanBufferBuilder { + let mut buf = BooleanBufferBuilder::new(n); + buf.append_n(n, true); + buf + } + + fn to_vec(buf: &BooleanBufferBuilder) -> Vec { + (0..buf.len()).map(|i| buf.get_bit(i)).collect() + } + #[test] fn test_byte_view_append_val() { let mut builder = @@ -623,16 +634,18 @@ mod tests { } }; - let equal_to = |builder: &ByteViewGroupValueBuilder, - lhs_rows: &[usize], - input_array: &ArrayRef, - rhs_rows: &[usize], - equal_to_results: &mut Vec| { - let iter = lhs_rows.iter().zip(rhs_rows.iter()); - for (idx, (&lhs_row, &rhs_row)) in iter.enumerate() { - equal_to_results[idx] = builder.equal_to(lhs_row, input_array, rhs_row); - } - }; + let equal_to = + |builder: &ByteViewGroupValueBuilder, + lhs_rows: &[usize], + input_array: &ArrayRef, + rhs_rows: &[usize], + equal_to_results: &mut BooleanBufferBuilder| { + let iter = lhs_rows.iter().zip(rhs_rows.iter()); + for (idx, (&lhs_row, &rhs_row)) in iter.enumerate() { + equal_to_results + .set_bit(idx, builder.equal_to(lhs_row, input_array, rhs_row)); + } + }; test_byte_view_equal_to_internal(append, equal_to); } @@ -647,18 +660,19 @@ mod tests { .unwrap(); }; - let equal_to = |builder: &ByteViewGroupValueBuilder, - lhs_rows: &[usize], - input_array: &ArrayRef, - rhs_rows: &[usize], - equal_to_results: &mut Vec| { - builder.vectorized_equal_to( - lhs_rows, - input_array, - rhs_rows, - equal_to_results, - ); - }; + let equal_to = + |builder: &ByteViewGroupValueBuilder, + lhs_rows: &[usize], + input_array: &ArrayRef, + rhs_rows: &[usize], + equal_to_results: &mut BooleanBufferBuilder| { + builder.vectorized_equal_to( + lhs_rows, + input_array, + rhs_rows, + equal_to_results, + ); + }; test_byte_view_equal_to_internal(append, equal_to); } @@ -683,19 +697,20 @@ mod tests { .vectorized_append(&all_nulls_input_array, &[0, 1, 2, 3, 4]) .unwrap(); - let mut equal_to_results = vec![true; all_nulls_input_array.len()]; + let mut equal_to_results = make_true_buffer(all_nulls_input_array.len()); builder.vectorized_equal_to( &[0, 1, 2, 3, 4], &all_nulls_input_array, &[0, 1, 2, 3, 4], &mut equal_to_results, ); + let results = to_vec(&equal_to_results); - assert!(equal_to_results[0]); - assert!(equal_to_results[1]); - assert!(equal_to_results[2]); - assert!(equal_to_results[3]); - assert!(equal_to_results[4]); + assert!(results[0]); + assert!(results[1]); + assert!(results[2]); + assert!(results[3]); + assert!(results[4]); // All not nulls input array let all_not_nulls_input_array = Arc::new(StringViewArray::from(vec![ @@ -709,19 +724,20 @@ mod tests { .vectorized_append(&all_not_nulls_input_array, &[0, 1, 2, 3, 4]) .unwrap(); - let mut equal_to_results = vec![true; all_not_nulls_input_array.len()]; + let mut equal_to_results = make_true_buffer(all_not_nulls_input_array.len()); builder.vectorized_equal_to( &[5, 6, 7, 8, 9], &all_not_nulls_input_array, &[0, 1, 2, 3, 4], &mut equal_to_results, ); + let results = to_vec(&equal_to_results); - assert!(equal_to_results[0]); - assert!(equal_to_results[1]); - assert!(equal_to_results[2]); - assert!(equal_to_results[3]); - assert!(equal_to_results[4]); + assert!(results[0]); + assert!(results[1]); + assert!(results[2]); + assert!(results[3]); + assert!(results[4]); } fn test_byte_view_equal_to_internal(mut append: A, mut equal_to: E) @@ -732,7 +748,7 @@ mod tests { &[usize], &ArrayRef, &[usize], - &mut Vec, + &mut BooleanBufferBuilder, ), { // Will cover such cases: @@ -779,7 +795,7 @@ mod tests { // Define input array let (views, buffer, _nulls) = StringViewArray::from(vec![ Some("foo"), - Some("bar"), // set to null + Some("bar"), None, None, Some("baz"), @@ -793,10 +809,10 @@ mod tests { ]) .into_parts(); - // explicitly build a boolean buffer where one of the null values also happens to match + // explicitly build a null buffer where one of the null values also happens to match let mut nulls = NullBufferBuilder::new(9); nulls.append_non_null(); - nulls.append_null(); // this sets Some("bar") to null above + nulls.append_null(); nulls.append_null(); nulls.append_null(); nulls.append_non_null(); @@ -811,7 +827,7 @@ mod tests { Arc::new(StringViewArray::new(views, buffer, nulls.finish())) as ArrayRef; // Check - let mut equal_to_results = vec![true; input_array.len()]; + let mut equal_to_results = make_true_buffer(input_array.len()); equal_to( &builder, &[0, 1, 2, 3, 4, 5, 6, 7, 7, 7, 8, 8], @@ -819,19 +835,20 @@ mod tests { &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11], &mut equal_to_results, ); - - assert!(!equal_to_results[0]); - assert!(equal_to_results[1]); - assert!(equal_to_results[2]); - assert!(!equal_to_results[3]); - assert!(!equal_to_results[4]); - assert!(!equal_to_results[5]); - assert!(equal_to_results[6]); - assert!(!equal_to_results[7]); - assert!(!equal_to_results[8]); - assert!(equal_to_results[9]); - assert!(!equal_to_results[10]); - assert!(equal_to_results[11]); + let results = to_vec(&equal_to_results); + + assert!(!results[0]); + assert!(results[1]); + assert!(results[2]); + assert!(!results[3]); + assert!(!results[4]); + assert!(!results[5]); + assert!(results[6]); + assert!(!results[7]); + assert!(!results[8]); + assert!(results[9]); + assert!(!results[10]); + assert!(results[11]); } #[test] diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs index cc4576eabddbd..2115e9a34da64 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs @@ -29,7 +29,7 @@ use crate::aggregates::group_values::multi_group_by::{ boolean::BooleanGroupValueBuilder, bytes::ByteGroupValueBuilder, bytes_view::ByteViewGroupValueBuilder, primitive::PrimitiveGroupValueBuilder, }; -use arrow::array::{Array, ArrayRef}; +use arrow::array::{Array, ArrayRef, BooleanBufferBuilder}; use arrow::compute::cast; use arrow::datatypes::{ BinaryViewType, DataType, Date32Type, Date64Type, Decimal128Type, Float32Type, @@ -82,7 +82,7 @@ pub trait GroupColumn: Send + Sync { lhs_rows: &[usize], array: &ArrayRef, rhs_rows: &[usize], - equal_to_results: &mut [bool], + equal_to_results: &mut BooleanBufferBuilder, ); /// The vectorized version `append_val` @@ -224,7 +224,6 @@ pub struct GroupValuesColumn { /// Buffers to store intermediate results in `vectorized_append` /// and `vectorized_equal_to`, for reducing memory allocation -#[derive(Default)] struct VectorizedOperationBuffers { /// The `vectorized append` row indices buffer append_row_indices: Vec, @@ -235,8 +234,8 @@ struct VectorizedOperationBuffers { /// The `vectorized_equal_to` group indices buffer equal_to_group_indices: Vec, - /// The `vectorized_equal_to` result buffer - equal_to_results: Vec, + /// The `vectorized_equal_to` result buffer (bitmask) + equal_to_results: BooleanBufferBuilder, /// The buffer for storing row indices found not equal to /// exist groups in `group_values` in `vectorized_equal_to`. @@ -244,12 +243,23 @@ struct VectorizedOperationBuffers { remaining_row_indices: Vec, } +impl Default for VectorizedOperationBuffers { + fn default() -> Self { + Self { + append_row_indices: Vec::new(), + equal_to_row_indices: Vec::new(), + equal_to_group_indices: Vec::new(), + equal_to_results: BooleanBufferBuilder::new(0), + remaining_row_indices: Vec::new(), + } + } +} + impl VectorizedOperationBuffers { fn clear(&mut self) { self.append_row_indices.clear(); self.equal_to_row_indices.clear(); self.equal_to_group_indices.clear(); - self.equal_to_results.clear(); self.remaining_row_indices.clear(); } } @@ -615,15 +625,16 @@ impl GroupValuesColumn { // 1. Perform `vectorized_equal_to` for `rows` in `vectorized_equal_to_group_indices` // and `group_indices` in `vectorized_equal_to_group_indices` - let mut equal_to_results = - mem::take(&mut self.vectorized_operation_buffers.equal_to_results); - equal_to_results.clear(); - equal_to_results.resize( - self.vectorized_operation_buffers - .equal_to_group_indices - .len(), - true, + let n = self + .vectorized_operation_buffers + .equal_to_group_indices + .len(); + let mut equal_to_results = mem::replace( + &mut self.vectorized_operation_buffers.equal_to_results, + BooleanBufferBuilder::new(0), ); + equal_to_results.truncate(0); + equal_to_results.append_n(n, true); for (col_idx, group_col) in self.group_values.iter().enumerate() { group_col.vectorized_equal_to( @@ -643,7 +654,7 @@ impl GroupValuesColumn { .iter() .enumerate() { - let equal_to_result = equal_to_results[idx]; + let equal_to_result = equal_to_results.get_bit(idx); // Equal to case, set the `group_indices` to `rows` in `groups` if equal_to_result { diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs index 31126348b3fd4..bdc06fa553de5 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs @@ -20,12 +20,15 @@ use crate::aggregates::group_values::multi_group_by::{ }; use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder; use arrow::array::ArrowNativeTypeOp; -use arrow::array::{Array, ArrayRef, ArrowPrimitiveType, PrimitiveArray, cast::AsArray}; +use arrow::array::{ + Array, ArrayRef, ArrowPrimitiveType, BooleanBufferBuilder, PrimitiveArray, + cast::AsArray, +}; use arrow::buffer::ScalarBuffer; use arrow::datatypes::DataType; +use arrow::util::bit_util::apply_bitwise_binary_op; use datafusion_common::Result; use datafusion_execution::memory_pool::proxy::VecAllocExt; -use itertools::izip; use std::iter; use std::sync::Arc; @@ -62,43 +65,45 @@ where lhs_rows: &[usize], array: &ArrayRef, rhs_rows: &[usize], - equal_to_results: &mut [bool], + equal_to_results: &mut BooleanBufferBuilder, ) { assert!( !NULLABLE || (array.null_count() == 0 && !self.nulls.might_have_nulls()), "called with nullable input" ); let array_values = array.as_primitive::().values(); + let n = lhs_rows.len(); - let iter = izip!( - lhs_rows.iter(), - rhs_rows.iter(), - equal_to_results.iter_mut(), - ); + // Build a packed comparison bitmask, then AND it into equal_to_results + let num_bytes = n.div_ceil(8); + let mut cmp_buf = vec![0u8; num_bytes]; - for (&lhs_row, &rhs_row, equal_to_result) in iter { - let result = { - // Getting unchecked not only for bound checks but because the bound checks are - // what prevents auto-vectorization - let left = if cfg!(debug_assertions) { - self.group_values[lhs_row] - } else { - // SAFETY: indices are guaranteed to be in bounds - unsafe { *self.group_values.get_unchecked(lhs_row) } - }; - let right = if cfg!(debug_assertions) { - array_values[rhs_row] - } else { - // SAFETY: indices are guaranteed to be in bounds - unsafe { *array_values.get_unchecked(rhs_row) } - }; - - // Always evaluate, to allow for auto-vectorization - left.is_eq(right) + for (i, (&lhs_row, &rhs_row)) in lhs_rows.iter().zip(rhs_rows.iter()).enumerate() + { + let left = if cfg!(debug_assertions) { + self.group_values[lhs_row] + } else { + unsafe { *self.group_values.get_unchecked(lhs_row) } }; - - *equal_to_result = result && *equal_to_result; + let right = if cfg!(debug_assertions) { + array_values[rhs_row] + } else { + unsafe { *array_values.get_unchecked(rhs_row) } + }; + if left.is_eq(right) { + cmp_buf[i / 8] |= 1 << (i % 8); + } } + + // AND the comparison result into the existing equal_to_results bitmask + apply_bitwise_binary_op( + equal_to_results.as_slice_mut(), + 0, + &cmp_buf, + 0, + n, + |a, b| a & b, + ); } pub fn vectorized_equal_nullable( @@ -106,33 +111,30 @@ where lhs_rows: &[usize], array: &ArrayRef, rhs_rows: &[usize], - equal_to_results: &mut [bool], + equal_to_results: &mut BooleanBufferBuilder, ) { assert!(NULLABLE, "called with non-nullable input"); let array = array.as_primitive::(); - let iter = izip!( - lhs_rows.iter(), - rhs_rows.iter(), - equal_to_results.iter_mut(), - ); - - for (&lhs_row, &rhs_row, equal_to_result) in iter { - // Has found not equal to in previous column, don't need to check - if !*equal_to_result { + for (idx, (&lhs_row, &rhs_row)) in + lhs_rows.iter().zip(rhs_rows.iter()).enumerate() + { + if !equal_to_results.get_bit(idx) { continue; } - // Perf: skip null check (by short circuit) if input is not nullable let exist_null = self.nulls.is_null(lhs_row); let input_null = array.is_null(rhs_row); if let Some(result) = nulls_equal_to(exist_null, input_null) { - *equal_to_result = result; + if !result { + equal_to_results.set_bit(idx, false); + } continue; } - // Otherwise, we need to check their values - *equal_to_result = self.group_values[lhs_row].is_eq(array.value(rhs_row)); + if !self.group_values[lhs_row].is_eq(array.value(rhs_row)) { + equal_to_results.set_bit(idx, false); + } } } } @@ -176,7 +178,7 @@ impl GroupColumn lhs_rows: &[usize], array: &ArrayRef, rhs_rows: &[usize], - equal_to_results: &mut [bool], + equal_to_results: &mut BooleanBufferBuilder, ) { if !NULLABLE || (array.null_count() == 0 && !self.nulls.might_have_nulls()) { self.vectorized_equal_to_non_nullable( @@ -281,11 +283,23 @@ mod tests { use std::sync::Arc; use crate::aggregates::group_values::multi_group_by::primitive::PrimitiveGroupValueBuilder; - use arrow::array::{ArrayRef, Float32Array, Int64Array, NullBufferBuilder}; + use arrow::array::{ + ArrayRef, BooleanBufferBuilder, Float32Array, Int64Array, NullBufferBuilder, + }; use arrow::datatypes::{DataType, Float32Type, Int64Type}; use super::GroupColumn; + fn make_true_buffer(n: usize) -> BooleanBufferBuilder { + let mut buf = BooleanBufferBuilder::new(n); + buf.append_n(n, true); + buf + } + + fn to_vec(buf: &BooleanBufferBuilder) -> Vec { + (0..buf.len()).map(|i| buf.get_bit(i)).collect() + } + #[test] fn test_nullable_primitive_equal_to() { let append = |builder: &mut PrimitiveGroupValueBuilder, @@ -296,16 +310,18 @@ mod tests { } }; - let equal_to = |builder: &PrimitiveGroupValueBuilder, - lhs_rows: &[usize], - input_array: &ArrayRef, - rhs_rows: &[usize], - equal_to_results: &mut Vec| { - let iter = lhs_rows.iter().zip(rhs_rows.iter()); - for (idx, (&lhs_row, &rhs_row)) in iter.enumerate() { - equal_to_results[idx] = builder.equal_to(lhs_row, input_array, rhs_row); - } - }; + let equal_to = + |builder: &PrimitiveGroupValueBuilder, + lhs_rows: &[usize], + input_array: &ArrayRef, + rhs_rows: &[usize], + equal_to_results: &mut BooleanBufferBuilder| { + let iter = lhs_rows.iter().zip(rhs_rows.iter()); + for (idx, (&lhs_row, &rhs_row)) in iter.enumerate() { + equal_to_results + .set_bit(idx, builder.equal_to(lhs_row, input_array, rhs_row)); + } + }; test_nullable_primitive_equal_to_internal(append, equal_to); } @@ -320,18 +336,19 @@ mod tests { .unwrap(); }; - let equal_to = |builder: &PrimitiveGroupValueBuilder, - lhs_rows: &[usize], - input_array: &ArrayRef, - rhs_rows: &[usize], - equal_to_results: &mut Vec| { - builder.vectorized_equal_to( - lhs_rows, - input_array, - rhs_rows, - equal_to_results, - ); - }; + let equal_to = + |builder: &PrimitiveGroupValueBuilder, + lhs_rows: &[usize], + input_array: &ArrayRef, + rhs_rows: &[usize], + equal_to_results: &mut BooleanBufferBuilder| { + builder.vectorized_equal_to( + lhs_rows, + input_array, + rhs_rows, + equal_to_results, + ); + }; test_nullable_primitive_equal_to_internal(append, equal_to); } @@ -344,7 +361,7 @@ mod tests { &[usize], &ArrayRef, &[usize], - &mut Vec, + &mut BooleanBufferBuilder, ), { // Will cover such cases: @@ -384,7 +401,7 @@ mod tests { // explicitly build a null buffer where one of the null values also happens to match let mut nulls = NullBufferBuilder::new(6); nulls.append_non_null(); - nulls.append_null(); // this sets Some(2) to null above + nulls.append_null(); nulls.append_null(); nulls.append_non_null(); nulls.append_null(); @@ -393,7 +410,7 @@ mod tests { let input_array = Arc::new(Float32Array::new(values, nulls.finish())) as ArrayRef; // Check - let mut equal_to_results = vec![true; builder.len()]; + let mut equal_to_results = make_true_buffer(builder.len()); equal_to( &builder, &[0, 1, 2, 3, 4, 5, 6], @@ -401,14 +418,15 @@ mod tests { &[0, 1, 2, 3, 4, 5, 6], &mut equal_to_results, ); - - assert!(!equal_to_results[0]); - assert!(equal_to_results[1]); - assert!(equal_to_results[2]); - assert!(equal_to_results[3]); - assert!(!equal_to_results[4]); - assert!(equal_to_results[5]); - assert!(!equal_to_results[6]); + let results = to_vec(&equal_to_results); + + assert!(!results[0]); + assert!(results[1]); + assert!(results[2]); + assert!(results[3]); + assert!(!results[4]); + assert!(results[5]); + assert!(!results[6]); } #[test] @@ -421,16 +439,18 @@ mod tests { } }; - let equal_to = |builder: &PrimitiveGroupValueBuilder, - lhs_rows: &[usize], - input_array: &ArrayRef, - rhs_rows: &[usize], - equal_to_results: &mut Vec| { - let iter = lhs_rows.iter().zip(rhs_rows.iter()); - for (idx, (&lhs_row, &rhs_row)) in iter.enumerate() { - equal_to_results[idx] = builder.equal_to(lhs_row, input_array, rhs_row); - } - }; + let equal_to = + |builder: &PrimitiveGroupValueBuilder, + lhs_rows: &[usize], + input_array: &ArrayRef, + rhs_rows: &[usize], + equal_to_results: &mut BooleanBufferBuilder| { + let iter = lhs_rows.iter().zip(rhs_rows.iter()); + for (idx, (&lhs_row, &rhs_row)) in iter.enumerate() { + equal_to_results + .set_bit(idx, builder.equal_to(lhs_row, input_array, rhs_row)); + } + }; test_not_nullable_primitive_equal_to_internal(append, equal_to); } @@ -445,18 +465,19 @@ mod tests { .unwrap(); }; - let equal_to = |builder: &PrimitiveGroupValueBuilder, - lhs_rows: &[usize], - input_array: &ArrayRef, - rhs_rows: &[usize], - equal_to_results: &mut Vec| { - builder.vectorized_equal_to( - lhs_rows, - input_array, - rhs_rows, - equal_to_results, - ); - }; + let equal_to = + |builder: &PrimitiveGroupValueBuilder, + lhs_rows: &[usize], + input_array: &ArrayRef, + rhs_rows: &[usize], + equal_to_results: &mut BooleanBufferBuilder| { + builder.vectorized_equal_to( + lhs_rows, + input_array, + rhs_rows, + equal_to_results, + ); + }; test_not_nullable_primitive_equal_to_internal(append, equal_to); } @@ -469,7 +490,7 @@ mod tests { &[usize], &ArrayRef, &[usize], - &mut Vec, + &mut BooleanBufferBuilder, ), { // Will cover such cases: @@ -487,7 +508,7 @@ mod tests { let input_array = Arc::new(Int64Array::from(vec![Some(0), Some(2)])) as ArrayRef; // Check - let mut equal_to_results = vec![true; builder.len()]; + let mut equal_to_results = make_true_buffer(builder.len()); equal_to( &builder, &[0, 1], @@ -495,9 +516,10 @@ mod tests { &[0, 1], &mut equal_to_results, ); + let results = to_vec(&equal_to_results); - assert!(equal_to_results[0]); - assert!(!equal_to_results[1]); + assert!(results[0]); + assert!(!results[1]); } #[test] @@ -520,19 +542,20 @@ mod tests { .vectorized_append(&all_nulls_input_array, &[0, 1, 2, 3, 4]) .unwrap(); - let mut equal_to_results = vec![true; all_nulls_input_array.len()]; + let mut equal_to_results = make_true_buffer(all_nulls_input_array.len()); builder.vectorized_equal_to( &[0, 1, 2, 3, 4], &all_nulls_input_array, &[0, 1, 2, 3, 4], &mut equal_to_results, ); + let results = to_vec(&equal_to_results); - assert!(equal_to_results[0]); - assert!(equal_to_results[1]); - assert!(equal_to_results[2]); - assert!(equal_to_results[3]); - assert!(equal_to_results[4]); + assert!(results[0]); + assert!(results[1]); + assert!(results[2]); + assert!(results[3]); + assert!(results[4]); // All not nulls input array let all_not_nulls_input_array = Arc::new(Int64Array::from(vec![ @@ -546,18 +569,19 @@ mod tests { .vectorized_append(&all_not_nulls_input_array, &[0, 1, 2, 3, 4]) .unwrap(); - let mut equal_to_results = vec![true; all_not_nulls_input_array.len()]; + let mut equal_to_results = make_true_buffer(all_not_nulls_input_array.len()); builder.vectorized_equal_to( &[5, 6, 7, 8, 9], &all_not_nulls_input_array, &[0, 1, 2, 3, 4], &mut equal_to_results, ); + let results = to_vec(&equal_to_results); - assert!(equal_to_results[0]); - assert!(equal_to_results[1]); - assert!(equal_to_results[2]); - assert!(equal_to_results[3]); - assert!(equal_to_results[4]); + assert!(results[0]); + assert!(results[1]); + assert!(results[2]); + assert!(results[3]); + assert!(results[4]); } }