Skip to content

Commit eb0632f

Browse files
committed
bitmap_instead_of_hll_smaller_datatypes
1 parent c09aaa0 commit eb0632f

2 files changed

Lines changed: 117 additions & 23 deletions

File tree

datafusion/functions-aggregate/benches/approx_distinct.rs

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
use std::sync::Arc;
1919

2020
use arrow::array::{
21-
ArrayRef, BooleanArray, Int8Array, Int16Array, Int64Array, StringArray,
22-
StringViewArray, UInt8Array, UInt16Array,
21+
ArrayRef, Int8Array, Int16Array, Int64Array, StringArray, StringViewArray,
22+
UInt8Array, UInt16Array,
2323
};
2424
use arrow::datatypes::{DataType, Field, Schema};
2525
use criterion::{Criterion, criterion_group, criterion_main};
@@ -91,13 +91,6 @@ fn create_i16_array(n_distinct: usize) -> Int16Array {
9191
.collect()
9292
}
9393

94-
fn create_bool_array() -> BooleanArray {
95-
let mut rng = StdRng::seed_from_u64(42);
96-
(0..BATCH_SIZE)
97-
.map(|_| Some(rng.random_bool(0.5)))
98-
.collect()
99-
}
100-
10194
/// Creates a pool of `n_distinct` random strings of the given length.
10295
fn create_string_pool(n_distinct: usize, string_length: usize) -> Vec<String> {
10396
let mut rng = StdRng::seed_from_u64(42);
@@ -221,17 +214,6 @@ fn approx_distinct_benchmark(c: &mut Criterion) {
221214
.unwrap()
222215
})
223216
});
224-
225-
// // Boolean - commented out for main comparison (not supported on main)
226-
// let values = Arc::new(create_bool_array()) as ArrayRef;
227-
// c.bench_function("approx_distinct bool bitmap", |b| {
228-
// b.iter(|| {
229-
// let mut accumulator = prepare_accumulator(DataType::Boolean);
230-
// accumulator
231-
// .update_batch(std::slice::from_ref(&values))
232-
// .unwrap()
233-
// })
234-
// });
235217
}
236218

237219
criterion_group!(benches, approx_distinct_benchmark);

datafusion/functions-aggregate/src/approx_distinct.rs

Lines changed: 115 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,118 @@ impl Accumulator for BoolDistinctAccumulator {
215215
}
216216
}
217217

218+
/// Accumulator for u8 distinct counting using a bool array
219+
#[derive(Debug)]
220+
struct BoolArray256Accumulator {
221+
seen: Box<[bool; 256]>,
222+
}
223+
224+
impl BoolArray256Accumulator {
225+
fn new() -> Self {
226+
Self {
227+
seen: Box::new([false; 256]),
228+
}
229+
}
230+
231+
#[inline]
232+
fn count(&self) -> u64 {
233+
self.seen.iter().filter(|&&b| b).count() as u64
234+
}
235+
}
236+
237+
impl Accumulator for BoolArray256Accumulator {
238+
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
239+
let array = values[0].as_primitive::<UInt8Type>();
240+
for value in array.iter().flatten() {
241+
self.seen[value as usize] = true;
242+
}
243+
Ok(())
244+
}
245+
246+
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
247+
let array = downcast_value!(states[0], BinaryArray);
248+
for data in array.iter().flatten() {
249+
if data.len() == 256 {
250+
for (i, &b) in data.iter().enumerate() {
251+
if b != 0 {
252+
self.seen[i] = true;
253+
}
254+
}
255+
}
256+
}
257+
Ok(())
258+
}
259+
260+
fn state(&mut self) -> Result<Vec<ScalarValue>> {
261+
let bytes: Vec<u8> = self.seen.iter().map(|&b| b as u8).collect();
262+
Ok(vec![ScalarValue::Binary(Some(bytes))])
263+
}
264+
265+
fn evaluate(&mut self) -> Result<ScalarValue> {
266+
Ok(ScalarValue::UInt64(Some(self.count())))
267+
}
268+
269+
fn size(&self) -> usize {
270+
size_of::<Self>() + 256
271+
}
272+
}
273+
274+
/// Accumulator for i8 distinct counting using a bool array
275+
#[derive(Debug)]
276+
struct BoolArray256AccumulatorI8 {
277+
seen: Box<[bool; 256]>,
278+
}
279+
280+
impl BoolArray256AccumulatorI8 {
281+
fn new() -> Self {
282+
Self {
283+
seen: Box::new([false; 256]),
284+
}
285+
}
286+
287+
#[inline]
288+
fn count(&self) -> u64 {
289+
self.seen.iter().filter(|&&b| b).count() as u64
290+
}
291+
}
292+
293+
impl Accumulator for BoolArray256AccumulatorI8 {
294+
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
295+
let array = values[0].as_primitive::<Int8Type>();
296+
for value in array.iter().flatten() {
297+
self.seen[value as u8 as usize] = true;
298+
}
299+
Ok(())
300+
}
301+
302+
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
303+
let array = downcast_value!(states[0], BinaryArray);
304+
for data in array.iter().flatten() {
305+
if data.len() == 256 {
306+
for (i, &b) in data.iter().enumerate() {
307+
if b != 0 {
308+
self.seen[i] = true;
309+
}
310+
}
311+
}
312+
}
313+
Ok(())
314+
}
315+
316+
fn state(&mut self) -> Result<Vec<ScalarValue>> {
317+
let bytes: Vec<u8> = self.seen.iter().map(|&b| b as u8).collect();
318+
Ok(vec![ScalarValue::Binary(Some(bytes))])
319+
}
320+
321+
fn evaluate(&mut self) -> Result<ScalarValue> {
322+
Ok(ScalarValue::UInt64(Some(self.count())))
323+
}
324+
325+
fn size(&self) -> usize {
326+
size_of::<Self>() + 256
327+
}
328+
}
329+
218330
/// Accumulator for u16 distinct counting using a 65536-bit bitmap
219331
#[derive(Debug)]
220332
struct Bitmap65536Accumulator {
@@ -546,12 +658,12 @@ impl AggregateUDFImpl for ApproxDistinct {
546658
let data_type = acc_args.expr_fields[0].data_type();
547659

548660
let accumulator: Box<dyn Accumulator> = match data_type {
549-
// Benchmarked HLL to be faster than bitmap for u8/i8
550-
DataType::UInt8 => Box::new(NumericHLLAccumulator::<UInt8Type>::new()),
661+
// Testing bool array for u8
662+
DataType::UInt8 => Box::new(BoolArray256Accumulator::new()),
551663
DataType::UInt16 => Box::new(Bitmap65536Accumulator::new()),
552664
DataType::UInt32 => Box::new(NumericHLLAccumulator::<UInt32Type>::new()),
553665
DataType::UInt64 => Box::new(NumericHLLAccumulator::<UInt64Type>::new()),
554-
DataType::Int8 => Box::new(NumericHLLAccumulator::<Int8Type>::new()),
666+
DataType::Int8 => Box::new(BoolArray256AccumulatorI8::new()),
555667
DataType::Int16 => Box::new(Bitmap65536AccumulatorI16::new()),
556668
DataType::Int32 => Box::new(NumericHLLAccumulator::<Int32Type>::new()),
557669
DataType::Int64 => Box::new(NumericHLLAccumulator::<Int64Type>::new()),

0 commit comments

Comments
 (0)