Skip to content

Commit c09aaa0

Browse files
committed
bitmap_instead_of_hll_smaller_datatypes
1 parent 8ce93d2 commit c09aaa0

1 file changed

Lines changed: 11 additions & 160 deletions

File tree

datafusion/functions-aggregate/src/approx_distinct.rs

Lines changed: 11 additions & 160 deletions
Original file line numberDiff line numberDiff line change
@@ -215,153 +215,6 @@ impl Accumulator for BoolDistinctAccumulator {
215215
}
216216
}
217217

218-
#[derive(Debug)]
219-
struct Bitmap256Accumulator {
220-
/// 256 bits = 4 x u64, tracks values 0-255
221-
bitmap: [u64; 4],
222-
}
223-
224-
impl Bitmap256Accumulator {
225-
fn new() -> Self {
226-
Self { bitmap: [0; 4] }
227-
}
228-
229-
#[inline]
230-
fn set_bit(&mut self, value: u8) {
231-
let word = (value >> 6) as usize;
232-
let bit = value & 63;
233-
self.bitmap[word] |= 1u64 << bit;
234-
}
235-
236-
#[inline]
237-
fn count(&self) -> u64 {
238-
self.bitmap.iter().map(|w| w.count_ones() as u64).sum()
239-
}
240-
241-
fn merge(&mut self, other: &[u64; 4]) {
242-
for i in 0..4 {
243-
self.bitmap[i] |= other[i];
244-
}
245-
}
246-
}
247-
248-
impl Accumulator for Bitmap256Accumulator {
249-
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
250-
let array = values[0].as_primitive::<UInt8Type>();
251-
for value in array.iter().flatten() {
252-
self.set_bit(value);
253-
}
254-
Ok(())
255-
}
256-
257-
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
258-
let array = downcast_value!(states[0], BinaryArray);
259-
for data in array.iter().flatten() {
260-
if data.len() == 32 {
261-
// Convert &[u8] to [u64; 4]
262-
let mut other = [0u64; 4];
263-
for i in 0..4 {
264-
let offset = i * 8;
265-
other[i] =
266-
u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
267-
}
268-
self.merge(&other);
269-
}
270-
}
271-
Ok(())
272-
}
273-
274-
fn state(&mut self) -> Result<Vec<ScalarValue>> {
275-
// Serialize [u64; 4] as 32 bytes
276-
let mut bytes = Vec::with_capacity(32);
277-
for word in &self.bitmap {
278-
bytes.extend_from_slice(&word.to_le_bytes());
279-
}
280-
Ok(vec![ScalarValue::Binary(Some(bytes))])
281-
}
282-
283-
fn evaluate(&mut self) -> Result<ScalarValue> {
284-
Ok(ScalarValue::UInt64(Some(self.count())))
285-
}
286-
287-
fn size(&self) -> usize {
288-
size_of::<Self>()
289-
}
290-
}
291-
292-
#[derive(Debug)]
293-
struct Bitmap256AccumulatorI8 {
294-
bitmap: [u64; 4],
295-
}
296-
297-
impl Bitmap256AccumulatorI8 {
298-
fn new() -> Self {
299-
Self { bitmap: [0; 4] }
300-
}
301-
302-
#[inline]
303-
fn set_bit(&mut self, value: i8) {
304-
// Convert i8 to u8 by reinterpreting bits
305-
let idx = value as u8;
306-
let word = (idx >> 6) as usize;
307-
let bit = idx & 63;
308-
self.bitmap[word] |= 1u64 << bit;
309-
}
310-
311-
#[inline]
312-
fn count(&self) -> u64 {
313-
self.bitmap.iter().map(|w| w.count_ones() as u64).sum()
314-
}
315-
316-
fn merge(&mut self, other: &[u64; 4]) {
317-
for i in 0..4 {
318-
self.bitmap[i] |= other[i];
319-
}
320-
}
321-
}
322-
323-
impl Accumulator for Bitmap256AccumulatorI8 {
324-
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
325-
let array = values[0].as_primitive::<Int8Type>();
326-
for value in array.iter().flatten() {
327-
self.set_bit(value);
328-
}
329-
Ok(())
330-
}
331-
332-
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
333-
let array = downcast_value!(states[0], BinaryArray);
334-
for data in array.iter().flatten() {
335-
if data.len() == 32 {
336-
let mut other = [0u64; 4];
337-
for i in 0..4 {
338-
let offset = i * 8;
339-
other[i] =
340-
u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
341-
}
342-
self.merge(&other);
343-
}
344-
}
345-
Ok(())
346-
}
347-
348-
fn state(&mut self) -> Result<Vec<ScalarValue>> {
349-
let mut bytes = Vec::with_capacity(32);
350-
for word in &self.bitmap {
351-
bytes.extend_from_slice(&word.to_le_bytes());
352-
}
353-
Ok(vec![ScalarValue::Binary(Some(bytes))])
354-
}
355-
356-
fn evaluate(&mut self) -> Result<ScalarValue> {
357-
Ok(ScalarValue::UInt64(Some(self.count())))
358-
}
359-
360-
fn size(&self) -> usize {
361-
size_of::<Self>()
362-
}
363-
}
364-
365218
/// Accumulator for u16 distinct counting using a 65536-bit bitmap
366219
#[derive(Debug)]
367220
struct Bitmap65536Accumulator {
@@ -389,8 +242,8 @@ impl Bitmap65536Accumulator {
389242
}
390243

391244
fn merge(&mut self, other: &[u64; 1024]) {
392-
for i in 0..1024 {
393-
self.bitmap[i] |= other[i];
245+
for (dst, src) in self.bitmap.iter_mut().zip(other.iter()) {
246+
*dst |= src;
394247
}
395248
}
396249
}
@@ -409,9 +262,9 @@ impl Accumulator for Bitmap65536Accumulator {
409262
for data in array.iter().flatten() {
410263
if data.len() == 8192 {
411264
let mut other = [0u64; 1024];
412-
for i in 0..1024 {
265+
for (i, word) in other.iter_mut().enumerate() {
413266
let offset = i * 8;
414-
other[i] =
267+
*word =
415268
u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
416269
}
417270
self.merge(&other);
@@ -464,8 +317,8 @@ impl Bitmap65536AccumulatorI16 {
464317
}
465318

466319
fn merge(&mut self, other: &[u64; 1024]) {
467-
for i in 0..1024 {
468-
self.bitmap[i] |= other[i];
320+
for (dst, src) in self.bitmap.iter_mut().zip(other.iter()) {
321+
*dst |= src;
469322
}
470323
}
471324
}
@@ -484,9 +337,9 @@ impl Accumulator for Bitmap65536AccumulatorI16 {
484337
for data in array.iter().flatten() {
485338
if data.len() == 8192 {
486339
let mut other = [0u64; 1024];
487-
for i in 0..1024 {
340+
for (i, word) in other.iter_mut().enumerate() {
488341
let offset = i * 8;
489-
other[i] =
342+
*word =
490343
u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
491344
}
492345
self.merge(&other);
@@ -693,14 +546,12 @@ impl AggregateUDFImpl for ApproxDistinct {
693546
let data_type = acc_args.expr_fields[0].data_type();
694547

695548
let accumulator: Box<dyn Accumulator> = match data_type {
696-
// TODO u8, i8, u16, i16 shall really be done using bitmap, not HLL
697-
// TODO support for boolean (trivial case)
698-
// https://github.com/apache/datafusion/issues/1109
699-
DataType::UInt8 => Box::new(Bitmap256Accumulator::new()),
549+
// Benchmarked HLL to be faster than bitmap for u8/i8
550+
DataType::UInt8 => Box::new(NumericHLLAccumulator::<UInt8Type>::new()),
700551
DataType::UInt16 => Box::new(Bitmap65536Accumulator::new()),
701552
DataType::UInt32 => Box::new(NumericHLLAccumulator::<UInt32Type>::new()),
702553
DataType::UInt64 => Box::new(NumericHLLAccumulator::<UInt64Type>::new()),
703-
DataType::Int8 => Box::new(Bitmap256AccumulatorI8::new()),
554+
DataType::Int8 => Box::new(NumericHLLAccumulator::<Int8Type>::new()),
704555
DataType::Int16 => Box::new(Bitmap65536AccumulatorI16::new()),
705556
DataType::Int32 => Box::new(NumericHLLAccumulator::<Int32Type>::new()),
706557
DataType::Int64 => Box::new(NumericHLLAccumulator::<Int64Type>::new()),

0 commit comments

Comments
 (0)