Skip to content

Commit f03c0d0

Browse files
committed
bitmap_smaller_datatypes
1 parent 8367a75 commit f03c0d0

1 file changed

Lines changed: 358 additions & 5 deletions

File tree

datafusion/functions-aggregate/src/approx_distinct.rs

Lines changed: 358 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
//! Defines physical expressions that can evaluated at runtime during query execution
1919
2020
use crate::hyperloglog::{HLL_HASH_STATE, HyperLogLog};
21-
use arrow::array::{Array, BinaryArray, StringViewArray};
21+
use arrow::array::{Array, AsArray, BinaryArray, BooleanArray, StringViewArray};
2222
use arrow::array::{
2323
GenericBinaryArray, GenericStringArray, OffsetSizeTrait, PrimitiveArray,
2424
};
@@ -160,6 +160,358 @@ where
160160
}
161161
}
162162

163+
#[derive(Debug)]
164+
struct BoolDistinctAccumulator {
165+
seen_true: bool,
166+
seen_false: bool,
167+
}
168+
169+
impl BoolDistinctAccumulator {
170+
fn new() -> Self {
171+
Self {
172+
seen_true: false,
173+
seen_false: false,
174+
}
175+
}
176+
}
177+
178+
impl Accumulator for BoolDistinctAccumulator {
179+
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
180+
let array: &BooleanArray = downcast_value!(values[0], BooleanArray);
181+
for value in array.iter().flatten() {
182+
if value {
183+
self.seen_true = true;
184+
} else {
185+
self.seen_false = true;
186+
}
187+
}
188+
Ok(())
189+
}
190+
191+
fn evaluate(&mut self) -> Result<ScalarValue> {
192+
let count = (self.seen_true as u64) + (self.seen_false as u64);
193+
Ok(ScalarValue::UInt64(Some(count)))
194+
}
195+
196+
fn size(&self) -> usize {
197+
size_of::<Self>()
198+
}
199+
200+
fn state(&mut self) -> Result<Vec<ScalarValue>> {
201+
// Pack into 1 byte: bit 0 = seen_false, bit 1 = seen_true
202+
let packed = (self.seen_false as u8) | ((self.seen_true as u8) << 1);
203+
Ok(vec![ScalarValue::Binary(Some(vec![packed]))])
204+
}
205+
206+
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
207+
let array = downcast_value!(states[0], BinaryArray);
208+
for data in array.iter().flatten() {
209+
if !data.is_empty() {
210+
self.seen_false |= (data[0] & 1) != 0;
211+
self.seen_true |= (data[0] & 2) != 0;
212+
}
213+
}
214+
Ok(())
215+
}
216+
}
217+
218+
#[derive(Debug)]
219+
struct Bitmap256Accumulator {
220+
/// 256 bits = 4 x u64, tracks values 0-255
221+
bitmap: [u64; 4],
222+
}
223+
224+
impl Bitmap256Accumulator {
225+
fn new() -> Self {
226+
Self { bitmap: [0; 4] }
227+
}
228+
229+
#[inline]
230+
fn set_bit(&mut self, value: u8) {
231+
let word = (value / 64) as usize;
232+
let bit = value % 64;
233+
self.bitmap[word] |= 1u64 << bit;
234+
}
235+
236+
#[inline]
237+
fn count(&self) -> u64 {
238+
self.bitmap.iter().map(|w| w.count_ones() as u64).sum()
239+
}
240+
241+
fn merge(&mut self, other: &[u64; 4]) {
242+
for i in 0..4 {
243+
self.bitmap[i] |= other[i];
244+
}
245+
}
246+
}
247+
248+
impl Accumulator for Bitmap256Accumulator {
249+
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
250+
let array = values[0].as_primitive::<UInt8Type>();
251+
for value in array.iter().flatten() {
252+
self.set_bit(value);
253+
}
254+
Ok(())
255+
}
256+
257+
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
258+
let array = downcast_value!(states[0], BinaryArray);
259+
for data in array.iter().flatten() {
260+
if data.len() == 32 {
261+
// Convert &[u8] to [u64; 4]
262+
let mut other = [0u64; 4];
263+
for i in 0..4 {
264+
let offset = i * 8;
265+
other[i] =
266+
u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
267+
}
268+
self.merge(&other);
269+
}
270+
}
271+
Ok(())
272+
}
273+
274+
fn state(&mut self) -> Result<Vec<ScalarValue>> {
275+
// Serialize [u64; 4] as 32 bytes
276+
let mut bytes = Vec::with_capacity(32);
277+
for word in &self.bitmap {
278+
bytes.extend_from_slice(&word.to_le_bytes());
279+
}
280+
Ok(vec![ScalarValue::Binary(Some(bytes))])
281+
}
282+
283+
fn evaluate(&mut self) -> Result<ScalarValue> {
284+
Ok(ScalarValue::UInt64(Some(self.count())))
285+
}
286+
287+
fn size(&self) -> usize {
288+
size_of::<Self>()
289+
}
290+
}
291+
292+
#[derive(Debug)]
293+
struct Bitmap256AccumulatorI8 {
294+
bitmap: [u64; 4],
295+
}
296+
297+
impl Bitmap256AccumulatorI8 {
298+
fn new() -> Self {
299+
Self { bitmap: [0; 4] }
300+
}
301+
302+
#[inline]
303+
fn set_bit(&mut self, value: i8) {
304+
// Convert i8 to u8 by reinterpreting bits
305+
let idx = value as u8;
306+
let word = (idx / 64) as usize;
307+
let bit = idx % 64;
308+
self.bitmap[word] |= 1u64 << bit;
309+
}
310+
311+
#[inline]
312+
fn count(&self) -> u64 {
313+
self.bitmap.iter().map(|w| w.count_ones() as u64).sum()
314+
}
315+
316+
fn merge(&mut self, other: &[u64; 4]) {
317+
for i in 0..4 {
318+
self.bitmap[i] |= other[i];
319+
}
320+
}
321+
}
322+
323+
impl Accumulator for Bitmap256AccumulatorI8 {
324+
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
325+
let array = values[0].as_primitive::<Int8Type>();
326+
for value in array.iter().flatten() {
327+
self.set_bit(value);
328+
}
329+
Ok(())
330+
}
331+
332+
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
333+
let array = downcast_value!(states[0], BinaryArray);
334+
for data in array.iter().flatten() {
335+
if data.len() == 32 {
336+
let mut other = [0u64; 4];
337+
for i in 0..4 {
338+
let offset = i * 8;
339+
other[i] =
340+
u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
341+
}
342+
self.merge(&other);
343+
}
344+
}
345+
Ok(())
346+
}
347+
348+
fn state(&mut self) -> Result<Vec<ScalarValue>> {
349+
let mut bytes = Vec::with_capacity(32);
350+
for word in &self.bitmap {
351+
bytes.extend_from_slice(&word.to_le_bytes());
352+
}
353+
Ok(vec![ScalarValue::Binary(Some(bytes))])
354+
}
355+
356+
fn evaluate(&mut self) -> Result<ScalarValue> {
357+
Ok(ScalarValue::UInt64(Some(self.count())))
358+
}
359+
360+
fn size(&self) -> usize {
361+
size_of::<Self>()
362+
}
363+
}
364+
365+
/// Accumulator for u16 distinct counting using a 65536-bit bitmap
366+
#[derive(Debug)]
367+
struct Bitmap65536Accumulator {
368+
/// 65536 bits = 1024 x u64, tracks values 0-65535
369+
bitmap: Box<[u64; 1024]>,
370+
}
371+
372+
impl Bitmap65536Accumulator {
373+
fn new() -> Self {
374+
Self {
375+
bitmap: Box::new([0; 1024]),
376+
}
377+
}
378+
379+
#[inline]
380+
fn set_bit(&mut self, value: u16) {
381+
let word = (value / 64) as usize;
382+
let bit = value % 64;
383+
self.bitmap[word] |= 1u64 << bit;
384+
}
385+
386+
#[inline]
387+
fn count(&self) -> u64 {
388+
self.bitmap.iter().map(|w| w.count_ones() as u64).sum()
389+
}
390+
391+
fn merge(&mut self, other: &[u64; 1024]) {
392+
for i in 0..1024 {
393+
self.bitmap[i] |= other[i];
394+
}
395+
}
396+
}
397+
398+
impl Accumulator for Bitmap65536Accumulator {
399+
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
400+
let array = values[0].as_primitive::<UInt16Type>();
401+
for value in array.iter().flatten() {
402+
self.set_bit(value);
403+
}
404+
Ok(())
405+
}
406+
407+
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
408+
let array = downcast_value!(states[0], BinaryArray);
409+
for data in array.iter().flatten() {
410+
if data.len() == 8192 {
411+
let mut other = [0u64; 1024];
412+
for i in 0..1024 {
413+
let offset = i * 8;
414+
other[i] =
415+
u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
416+
}
417+
self.merge(&other);
418+
}
419+
}
420+
Ok(())
421+
}
422+
423+
fn state(&mut self) -> Result<Vec<ScalarValue>> {
424+
let mut bytes = Vec::with_capacity(8192);
425+
for word in self.bitmap.iter() {
426+
bytes.extend_from_slice(&word.to_le_bytes());
427+
}
428+
Ok(vec![ScalarValue::Binary(Some(bytes))])
429+
}
430+
431+
fn evaluate(&mut self) -> Result<ScalarValue> {
432+
Ok(ScalarValue::UInt64(Some(self.count())))
433+
}
434+
435+
fn size(&self) -> usize {
436+
size_of::<Self>() + 8192
437+
}
438+
}
439+
440+
/// Accumulator for i16 distinct counting using a 65536-bit bitmap
441+
#[derive(Debug)]
442+
struct Bitmap65536AccumulatorI16 {
443+
bitmap: Box<[u64; 1024]>,
444+
}
445+
446+
impl Bitmap65536AccumulatorI16 {
447+
fn new() -> Self {
448+
Self {
449+
bitmap: Box::new([0; 1024]),
450+
}
451+
}
452+
453+
#[inline]
454+
fn set_bit(&mut self, value: i16) {
455+
let idx = value as u16;
456+
let word = (idx / 64) as usize;
457+
let bit = idx % 64;
458+
self.bitmap[word] |= 1u64 << bit;
459+
}
460+
461+
#[inline]
462+
fn count(&self) -> u64 {
463+
self.bitmap.iter().map(|w| w.count_ones() as u64).sum()
464+
}
465+
466+
fn merge(&mut self, other: &[u64; 1024]) {
467+
for i in 0..1024 {
468+
self.bitmap[i] |= other[i];
469+
}
470+
}
471+
}
472+
473+
impl Accumulator for Bitmap65536AccumulatorI16 {
474+
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
475+
let array = values[0].as_primitive::<Int16Type>();
476+
for value in array.iter().flatten() {
477+
self.set_bit(value);
478+
}
479+
Ok(())
480+
}
481+
482+
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
483+
let array = downcast_value!(states[0], BinaryArray);
484+
for data in array.iter().flatten() {
485+
if data.len() == 8192 {
486+
let mut other = [0u64; 1024];
487+
for i in 0..1024 {
488+
let offset = i * 8;
489+
other[i] =
490+
u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
491+
}
492+
self.merge(&other);
493+
}
494+
}
495+
Ok(())
496+
}
497+
498+
fn state(&mut self) -> Result<Vec<ScalarValue>> {
499+
let mut bytes = Vec::with_capacity(8192);
500+
for word in self.bitmap.iter() {
501+
bytes.extend_from_slice(&word.to_le_bytes());
502+
}
503+
Ok(vec![ScalarValue::Binary(Some(bytes))])
504+
}
505+
506+
fn evaluate(&mut self) -> Result<ScalarValue> {
507+
Ok(ScalarValue::UInt64(Some(self.count())))
508+
}
509+
510+
fn size(&self) -> usize {
511+
size_of::<Self>() + 8192
512+
}
513+
}
514+
163515
macro_rules! default_accumulator_impl {
164516
() => {
165517
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
@@ -344,12 +696,12 @@ impl AggregateUDFImpl for ApproxDistinct {
344696
// TODO u8, i8, u16, i16 shall really be done using bitmap, not HLL
345697
// TODO support for boolean (trivial case)
346698
// https://github.com/apache/datafusion/issues/1109
347-
DataType::UInt8 => Box::new(NumericHLLAccumulator::<UInt8Type>::new()),
348-
DataType::UInt16 => Box::new(NumericHLLAccumulator::<UInt16Type>::new()),
699+
DataType::UInt8 => Box::new(Bitmap256Accumulator::new()),
700+
DataType::UInt16 => Box::new(Bitmap65536Accumulator::new()),
349701
DataType::UInt32 => Box::new(NumericHLLAccumulator::<UInt32Type>::new()),
350702
DataType::UInt64 => Box::new(NumericHLLAccumulator::<UInt64Type>::new()),
351-
DataType::Int8 => Box::new(NumericHLLAccumulator::<Int8Type>::new()),
352-
DataType::Int16 => Box::new(NumericHLLAccumulator::<Int16Type>::new()),
703+
DataType::Int8 => Box::new(Bitmap256AccumulatorI8::new()),
704+
DataType::Int16 => Box::new(Bitmap65536AccumulatorI16::new()),
353705
DataType::Int32 => Box::new(NumericHLLAccumulator::<Int32Type>::new()),
354706
DataType::Int64 => Box::new(NumericHLLAccumulator::<Int64Type>::new()),
355707
DataType::Date32 => Box::new(NumericHLLAccumulator::<Date32Type>::new()),
@@ -383,6 +735,7 @@ impl AggregateUDFImpl for ApproxDistinct {
383735
DataType::Utf8View => Box::new(StringViewHLLAccumulator::new()),
384736
DataType::Binary => Box::new(BinaryHLLAccumulator::<i32>::new()),
385737
DataType::LargeBinary => Box::new(BinaryHLLAccumulator::<i64>::new()),
738+
DataType::Boolean => Box::new(BoolDistinctAccumulator::new()),
386739
DataType::Null => {
387740
Box::new(NoopAccumulator::new(ScalarValue::UInt64(Some(0))))
388741
}

0 commit comments

Comments
 (0)