Skip to content

Commit e5de29b

Browse files
committed
bitmap_instead_smaller_datatypes
1 parent eb0632f commit e5de29b

6 files changed

Lines changed: 377 additions & 29 deletions

File tree

datafusion/functions-aggregate-common/src/aggregate/count_distinct.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,9 @@ mod native;
2222
pub use bytes::BytesDistinctCountAccumulator;
2323
pub use bytes::BytesViewDistinctCountAccumulator;
2424
pub use dict::DictionaryCountAccumulator;
25+
pub use native::Bitmap65536DistinctCountAccumulator;
26+
pub use native::Bitmap65536DistinctCountAccumulatorI16;
27+
pub use native::BoolArray256DistinctCountAccumulator;
28+
pub use native::BoolArray256DistinctCountAccumulatorI8;
2529
pub use native::FloatDistinctCountAccumulator;
2630
pub use native::PrimitiveDistinctCountAccumulator;

datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs

Lines changed: 353 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,3 +165,356 @@ impl<T: ArrowPrimitiveType + Debug> Accumulator for FloatDistinctCountAccumulato
165165
size_of_val(self) + self.values.size()
166166
}
167167
}
168+
169+
/// COUNT DISTINCT accumulator specialised for `u8` inputs.
///
/// Holds one `bool` flag per possible `u8` value (256 flags in total) in a
/// boxed array; a flag is `true` once the matching value has been observed.
#[derive(Debug)]
pub struct BoolArray256DistinctCountAccumulator {
    seen: Box<[bool; 256]>,
}

impl BoolArray256DistinctCountAccumulator {
    /// Creates an accumulator in which every flag is still `false`.
    pub fn new() -> Self {
        let seen = Box::new([false; 256]);
        Self { seen }
    }

    /// Returns the number of flags currently set, i.e. the distinct count.
    #[inline]
    fn count(&self) -> i64 {
        let mut distinct = 0i64;
        for &flag in self.seen.iter() {
            if flag {
                distinct += 1;
            }
        }
        distinct
    }
}

impl Default for BoolArray256DistinctCountAccumulator {
    /// Same as [`BoolArray256DistinctCountAccumulator::new`].
    fn default() -> Self {
        Self::new()
    }
}
194+
195+
impl Accumulator for BoolArray256DistinctCountAccumulator {
196+
fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
197+
if values.is_empty() {
198+
return Ok(());
199+
}
200+
201+
let arr = as_primitive_array::<arrow::datatypes::UInt8Type>(&values[0])?;
202+
for value in arr.iter().flatten() {
203+
self.seen[value as usize] = true;
204+
}
205+
Ok(())
206+
}
207+
208+
fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
209+
if states.is_empty() {
210+
return Ok(());
211+
}
212+
213+
let arr = as_list_array(&states[0])?;
214+
arr.iter().try_for_each(|maybe_list| {
215+
if let Some(list) = maybe_list {
216+
let list = as_primitive_array::<arrow::datatypes::UInt8Type>(&list)?;
217+
for value in list.values().iter() {
218+
self.seen[*value as usize] = true;
219+
}
220+
};
221+
Ok(())
222+
})
223+
}
224+
225+
fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
226+
let values: Vec<u8> = self
227+
.seen
228+
.iter()
229+
.enumerate()
230+
.filter_map(|(idx, &seen)| if seen { Some(idx as u8) } else { None })
231+
.collect();
232+
233+
let arr = Arc::new(PrimitiveArray::<arrow::datatypes::UInt8Type>::from_iter_values(
234+
values,
235+
));
236+
Ok(vec![
237+
SingleRowListArrayBuilder::new(arr).build_list_scalar(),
238+
])
239+
}
240+
241+
fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
242+
Ok(ScalarValue::Int64(Some(self.count())))
243+
}
244+
245+
fn size(&self) -> usize {
246+
size_of_val(self) + 256
247+
}
248+
}
249+
250+
/// COUNT DISTINCT accumulator specialised for `i8` inputs.
///
/// Holds 256 `bool` flags in a boxed array, one per possible `i8` value
/// (values are mapped to slots `0..=255`); a flag is `true` once the
/// matching value has been observed.
#[derive(Debug)]
pub struct BoolArray256DistinctCountAccumulatorI8 {
    seen: Box<[bool; 256]>,
}

impl BoolArray256DistinctCountAccumulatorI8 {
    /// Creates an accumulator in which every flag is still `false`.
    pub fn new() -> Self {
        let seen = Box::new([false; 256]);
        Self { seen }
    }

    /// Returns the number of flags currently set, i.e. the distinct count.
    #[inline]
    fn count(&self) -> i64 {
        let mut distinct = 0i64;
        for &flag in self.seen.iter() {
            if flag {
                distinct += 1;
            }
        }
        distinct
    }
}

impl Default for BoolArray256DistinctCountAccumulatorI8 {
    /// Same as [`BoolArray256DistinctCountAccumulatorI8::new`].
    fn default() -> Self {
        Self::new()
    }
}
275+
276+
impl Accumulator for BoolArray256DistinctCountAccumulatorI8 {
277+
fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
278+
if values.is_empty() {
279+
return Ok(());
280+
}
281+
282+
let arr = as_primitive_array::<arrow::datatypes::Int8Type>(&values[0])?;
283+
for value in arr.iter().flatten() {
284+
self.seen[value as u8 as usize] = true;
285+
}
286+
Ok(())
287+
}
288+
289+
fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
290+
if states.is_empty() {
291+
return Ok(());
292+
}
293+
294+
let arr = as_list_array(&states[0])?;
295+
arr.iter().try_for_each(|maybe_list| {
296+
if let Some(list) = maybe_list {
297+
let list = as_primitive_array::<arrow::datatypes::Int8Type>(&list)?;
298+
for value in list.values().iter() {
299+
self.seen[*value as u8 as usize] = true;
300+
}
301+
};
302+
Ok(())
303+
})
304+
}
305+
306+
fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
307+
let values: Vec<i8> = self
308+
.seen
309+
.iter()
310+
.enumerate()
311+
.filter_map(|(idx, &seen)| {
312+
if seen {
313+
Some(idx as u8 as i8)
314+
} else {
315+
None
316+
}
317+
})
318+
.collect();
319+
320+
let arr = Arc::new(PrimitiveArray::<arrow::datatypes::Int8Type>::from_iter_values(
321+
values,
322+
));
323+
Ok(vec![
324+
SingleRowListArrayBuilder::new(arr).build_list_scalar(),
325+
])
326+
}
327+
328+
fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
329+
Ok(ScalarValue::Int64(Some(self.count())))
330+
}
331+
332+
fn size(&self) -> usize {
333+
size_of_val(self) + 256
334+
}
335+
}
336+
337+
/// COUNT DISTINCT accumulator specialised for `u16` inputs.
///
/// Tracks every possible `u16` value with one bit each: 1024 `u64` words
/// (65536 bits, 8 KiB) held in a boxed array.
#[derive(Debug)]
pub struct Bitmap65536DistinctCountAccumulator {
    bitmap: Box<[u64; 1024]>,
}

impl Bitmap65536DistinctCountAccumulator {
    /// Creates an accumulator with all bits cleared.
    pub fn new() -> Self {
        let bitmap = Box::new([0; 1024]);
        Self { bitmap }
    }

    /// Marks `value` as seen by setting its bit.
    #[inline]
    fn set_bit(&mut self, value: u16) {
        let index = usize::from(value);
        // The upper bits select the 64-bit word, the low six bits the bit in it.
        self.bitmap[index >> 6] |= 1u64 << (index & 63);
    }

    /// Returns the number of set bits, i.e. the distinct count.
    #[inline]
    fn count(&self) -> i64 {
        let mut total = 0i64;
        for word in self.bitmap.iter() {
            total += i64::from(word.count_ones());
        }
        total
    }
}

impl Default for Bitmap65536DistinctCountAccumulator {
    /// Same as [`Bitmap65536DistinctCountAccumulator::new`].
    fn default() -> Self {
        Self::new()
    }
}
369+
370+
impl Accumulator for Bitmap65536DistinctCountAccumulator {
371+
fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
372+
if values.is_empty() {
373+
return Ok(());
374+
}
375+
376+
let arr = as_primitive_array::<arrow::datatypes::UInt16Type>(&values[0])?;
377+
for value in arr.iter().flatten() {
378+
self.set_bit(value);
379+
}
380+
Ok(())
381+
}
382+
383+
fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
384+
if states.is_empty() {
385+
return Ok(());
386+
}
387+
388+
let arr = as_list_array(&states[0])?;
389+
arr.iter().try_for_each(|maybe_list| {
390+
if let Some(list) = maybe_list {
391+
let list = as_primitive_array::<arrow::datatypes::UInt16Type>(&list)?;
392+
for value in list.values().iter() {
393+
self.set_bit(*value);
394+
}
395+
};
396+
Ok(())
397+
})
398+
}
399+
400+
fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
401+
let mut values = Vec::new();
402+
for (word_idx, &word) in self.bitmap.iter().enumerate() {
403+
if word != 0 {
404+
for bit in 0..64 {
405+
if (word & (1u64 << bit)) != 0 {
406+
values.push((word_idx as u16) * 64 + bit);
407+
}
408+
}
409+
}
410+
}
411+
412+
let arr = Arc::new(PrimitiveArray::<arrow::datatypes::UInt16Type>::from_iter_values(
413+
values,
414+
));
415+
Ok(vec![
416+
SingleRowListArrayBuilder::new(arr).build_list_scalar(),
417+
])
418+
}
419+
420+
fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
421+
Ok(ScalarValue::Int64(Some(self.count())))
422+
}
423+
424+
fn size(&self) -> usize {
425+
size_of_val(self) + 8192
426+
}
427+
}
428+
429+
/// COUNT DISTINCT accumulator specialised for `i16` inputs.
///
/// Tracks every possible `i16` value with one bit each: 1024 `u64` words
/// (65536 bits, 8 KiB) held in a boxed array. Values are reinterpreted as
/// `u16` to obtain a bit position in `0..=65535`.
#[derive(Debug)]
pub struct Bitmap65536DistinctCountAccumulatorI16 {
    bitmap: Box<[u64; 1024]>,
}

impl Bitmap65536DistinctCountAccumulatorI16 {
    /// Creates an accumulator with all bits cleared.
    pub fn new() -> Self {
        let bitmap = Box::new([0; 1024]);
        Self { bitmap }
    }

    /// Marks `value` as seen by setting its bit.
    #[inline]
    fn set_bit(&mut self, value: i16) {
        // Same-width reinterpretation: negative values land in 32768..=65535.
        let index = usize::from(value as u16);
        // The upper bits select the 64-bit word, the low six bits the bit in it.
        self.bitmap[index >> 6] |= 1u64 << (index & 63);
    }

    /// Returns the number of set bits, i.e. the distinct count.
    #[inline]
    fn count(&self) -> i64 {
        let mut total = 0i64;
        for word in self.bitmap.iter() {
            total += i64::from(word.count_ones());
        }
        total
    }
}

impl Default for Bitmap65536DistinctCountAccumulatorI16 {
    /// Same as [`Bitmap65536DistinctCountAccumulatorI16::new`].
    fn default() -> Self {
        Self::new()
    }
}
462+
463+
impl Accumulator for Bitmap65536DistinctCountAccumulatorI16 {
464+
fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
465+
if values.is_empty() {
466+
return Ok(());
467+
}
468+
469+
let arr = as_primitive_array::<arrow::datatypes::Int16Type>(&values[0])?;
470+
for value in arr.iter().flatten() {
471+
self.set_bit(value);
472+
}
473+
Ok(())
474+
}
475+
476+
fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
477+
if states.is_empty() {
478+
return Ok(());
479+
}
480+
481+
let arr = as_list_array(&states[0])?;
482+
arr.iter().try_for_each(|maybe_list| {
483+
if let Some(list) = maybe_list {
484+
let list = as_primitive_array::<arrow::datatypes::Int16Type>(&list)?;
485+
for value in list.values().iter() {
486+
self.set_bit(*value);
487+
}
488+
};
489+
Ok(())
490+
})
491+
}
492+
493+
fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
494+
let mut values = Vec::new();
495+
for (word_idx, &word) in self.bitmap.iter().enumerate() {
496+
if word != 0 {
497+
for bit in 0..64 {
498+
if (word & (1u64 << bit)) != 0 {
499+
values.push(((word_idx as u16) * 64 + bit) as i16);
500+
}
501+
}
502+
}
503+
}
504+
505+
let arr = Arc::new(PrimitiveArray::<arrow::datatypes::Int16Type>::from_iter_values(
506+
values,
507+
));
508+
Ok(vec![
509+
SingleRowListArrayBuilder::new(arr).build_list_scalar(),
510+
])
511+
}
512+
513+
fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
514+
Ok(ScalarValue::Int64(Some(self.count())))
515+
}
516+
517+
fn size(&self) -> usize {
518+
size_of_val(self) + 8192
519+
}
520+
}

datafusion/functions-aggregate/Cargo.toml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,6 @@ name = "min_max_bytes"
8080
name = "approx_distinct"
8181
harness = false
8282

83-
[[bench]]
84-
name = "first_last"
85-
harness = false
86-
8783
[[bench]]
8884
name = "count_distinct"
8985
harness = false

0 commit comments

Comments
 (0)