Skip to content

Commit 1f0faf9

Browse files
coderfenderalamb
andauthored
perf : Optimize count distinct using bitmaps instead of hashsets for smaller datatypes (#21456)
## Which issue does this PR close? Remove hashset based accumulators for smaller int data types and use bitmaps. Follow up of : #21453 <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #21488 ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. --> --------- Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent 1068686 commit 1f0faf9

3 files changed

Lines changed: 382 additions & 17 deletions

File tree

datafusion/functions-aggregate-common/src/aggregate/count_distinct.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,9 @@ mod native;
2222
pub use bytes::BytesDistinctCountAccumulator;
2323
pub use bytes::BytesViewDistinctCountAccumulator;
2424
pub use dict::DictionaryCountAccumulator;
25+
pub use native::Bitmap65536DistinctCountAccumulator;
26+
pub use native::Bitmap65536DistinctCountAccumulatorI16;
27+
pub use native::BoolArray256DistinctCountAccumulator;
28+
pub use native::BoolArray256DistinctCountAccumulatorI8;
2529
pub use native::FloatDistinctCountAccumulator;
2630
pub use native::PrimitiveDistinctCountAccumulator;

datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs

Lines changed: 353 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ where
7878
])
7979
}
8080

81+
#[inline(never)]
8182
fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
8283
if values.is_empty() {
8384
return Ok(());
@@ -149,6 +150,7 @@ impl<T: ArrowPrimitiveType + Debug> Accumulator for FloatDistinctCountAccumulato
149150
self.values.state()
150151
}
151152

153+
#[inline(never)]
152154
fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
153155
self.values.update_batch(values)
154156
}
@@ -165,3 +167,354 @@ impl<T: ArrowPrimitiveType + Debug> Accumulator for FloatDistinctCountAccumulato
165167
size_of_val(self) + self.values.size()
166168
}
167169
}
170+
171+
/// Optimized COUNT DISTINCT accumulator for u8 using a bool array.
172+
/// Uses 256 bytes to track all possible u8 values.
173+
#[derive(Debug)]
174+
pub struct BoolArray256DistinctCountAccumulator {
175+
seen: [bool; 256],
176+
}
177+
178+
impl BoolArray256DistinctCountAccumulator {
179+
pub fn new() -> Self {
180+
Self { seen: [false; 256] }
181+
}
182+
183+
#[inline]
184+
fn count(&self) -> i64 {
185+
self.seen.iter().filter(|&&b| b).count() as i64
186+
}
187+
}
188+
189+
impl Default for BoolArray256DistinctCountAccumulator {
190+
fn default() -> Self {
191+
Self::new()
192+
}
193+
}
194+
195+
impl Accumulator for BoolArray256DistinctCountAccumulator {
196+
#[inline(never)]
197+
fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
198+
if values.is_empty() {
199+
return Ok(());
200+
}
201+
202+
let arr = as_primitive_array::<arrow::datatypes::UInt8Type>(&values[0])?;
203+
for value in arr.iter().flatten() {
204+
self.seen[value as usize] = true;
205+
}
206+
Ok(())
207+
}
208+
209+
fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
210+
if states.is_empty() {
211+
return Ok(());
212+
}
213+
214+
let arr = as_list_array(&states[0])?;
215+
arr.iter().try_for_each(|maybe_list| {
216+
if let Some(list) = maybe_list {
217+
let list = as_primitive_array::<arrow::datatypes::UInt8Type>(&list)?;
218+
for value in list.values().iter() {
219+
self.seen[*value as usize] = true;
220+
}
221+
};
222+
Ok(())
223+
})
224+
}
225+
226+
fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
227+
let values: Vec<u8> = self
228+
.seen
229+
.iter()
230+
.enumerate()
231+
.filter_map(|(idx, &seen)| if seen { Some(idx as u8) } else { None })
232+
.collect();
233+
234+
let arr = Arc::new(
235+
PrimitiveArray::<arrow::datatypes::UInt8Type>::from_iter_values(values),
236+
);
237+
Ok(vec![
238+
SingleRowListArrayBuilder::new(arr).build_list_scalar(),
239+
])
240+
}
241+
242+
fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
243+
Ok(ScalarValue::Int64(Some(self.count())))
244+
}
245+
246+
fn size(&self) -> usize {
247+
size_of_val(self) + 256
248+
}
249+
}
250+
251+
/// Optimized COUNT DISTINCT accumulator for i8 using a bool array.
252+
/// Uses 256 bytes to track all possible i8 values (mapped to 0..255).
253+
#[derive(Debug)]
254+
pub struct BoolArray256DistinctCountAccumulatorI8 {
255+
seen: [bool; 256],
256+
}
257+
258+
impl BoolArray256DistinctCountAccumulatorI8 {
259+
pub fn new() -> Self {
260+
Self { seen: [false; 256] }
261+
}
262+
263+
#[inline]
264+
fn count(&self) -> i64 {
265+
self.seen.iter().filter(|&&b| b).count() as i64
266+
}
267+
}
268+
269+
impl Default for BoolArray256DistinctCountAccumulatorI8 {
270+
fn default() -> Self {
271+
Self::new()
272+
}
273+
}
274+
275+
impl Accumulator for BoolArray256DistinctCountAccumulatorI8 {
276+
#[inline(never)]
277+
fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
278+
if values.is_empty() {
279+
return Ok(());
280+
}
281+
282+
let arr = as_primitive_array::<arrow::datatypes::Int8Type>(&values[0])?;
283+
for value in arr.iter().flatten() {
284+
self.seen[value as u8 as usize] = true;
285+
}
286+
Ok(())
287+
}
288+
289+
fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
290+
if states.is_empty() {
291+
return Ok(());
292+
}
293+
294+
let arr = as_list_array(&states[0])?;
295+
arr.iter().try_for_each(|maybe_list| {
296+
if let Some(list) = maybe_list {
297+
let list = as_primitive_array::<arrow::datatypes::Int8Type>(&list)?;
298+
for value in list.values().iter() {
299+
self.seen[*value as u8 as usize] = true;
300+
}
301+
};
302+
Ok(())
303+
})
304+
}
305+
306+
fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
307+
let values: Vec<i8> = self
308+
.seen
309+
.iter()
310+
.enumerate()
311+
.filter_map(
312+
|(idx, &seen)| {
313+
if seen { Some(idx as u8 as i8) } else { None }
314+
},
315+
)
316+
.collect();
317+
318+
let arr = Arc::new(
319+
PrimitiveArray::<arrow::datatypes::Int8Type>::from_iter_values(values),
320+
);
321+
Ok(vec![
322+
SingleRowListArrayBuilder::new(arr).build_list_scalar(),
323+
])
324+
}
325+
326+
fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
327+
Ok(ScalarValue::Int64(Some(self.count())))
328+
}
329+
330+
fn size(&self) -> usize {
331+
size_of_val(self) + 256
332+
}
333+
}
334+
335+
/// Optimized COUNT DISTINCT accumulator for u16 using a 65536-bit bitmap.
336+
/// Uses 8KB (1024 x u64) to track all possible u16 values.
337+
#[derive(Debug)]
338+
pub struct Bitmap65536DistinctCountAccumulator {
339+
bitmap: Box<[u64; 1024]>,
340+
}
341+
342+
impl Bitmap65536DistinctCountAccumulator {
343+
pub fn new() -> Self {
344+
Self {
345+
bitmap: Box::new([0; 1024]),
346+
}
347+
}
348+
349+
#[inline]
350+
fn set_bit(&mut self, value: u16) {
351+
let word = (value / 64) as usize;
352+
let bit = value % 64;
353+
self.bitmap[word] |= 1u64 << bit;
354+
}
355+
356+
#[inline]
357+
fn count(&self) -> i64 {
358+
self.bitmap.iter().map(|w| w.count_ones() as i64).sum()
359+
}
360+
}
361+
362+
impl Default for Bitmap65536DistinctCountAccumulator {
363+
fn default() -> Self {
364+
Self::new()
365+
}
366+
}
367+
368+
impl Accumulator for Bitmap65536DistinctCountAccumulator {
369+
#[inline(never)]
370+
fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
371+
if values.is_empty() {
372+
return Ok(());
373+
}
374+
375+
let arr = as_primitive_array::<arrow::datatypes::UInt16Type>(&values[0])?;
376+
for value in arr.iter().flatten() {
377+
self.set_bit(value);
378+
}
379+
Ok(())
380+
}
381+
382+
fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
383+
if states.is_empty() {
384+
return Ok(());
385+
}
386+
387+
let arr = as_list_array(&states[0])?;
388+
arr.iter().try_for_each(|maybe_list| {
389+
if let Some(list) = maybe_list {
390+
let list = as_primitive_array::<arrow::datatypes::UInt16Type>(&list)?;
391+
for value in list.values().iter() {
392+
self.set_bit(*value);
393+
}
394+
};
395+
Ok(())
396+
})
397+
}
398+
399+
fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
400+
let mut values = Vec::new();
401+
for (word_idx, &word) in self.bitmap.iter().enumerate() {
402+
if word != 0 {
403+
for bit in 0..64 {
404+
if (word & (1u64 << bit)) != 0 {
405+
values.push((word_idx as u16) * 64 + bit);
406+
}
407+
}
408+
}
409+
}
410+
411+
let arr = Arc::new(
412+
PrimitiveArray::<arrow::datatypes::UInt16Type>::from_iter_values(values),
413+
);
414+
Ok(vec![
415+
SingleRowListArrayBuilder::new(arr).build_list_scalar(),
416+
])
417+
}
418+
419+
fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
420+
Ok(ScalarValue::Int64(Some(self.count())))
421+
}
422+
423+
fn size(&self) -> usize {
424+
size_of_val(self) + 8192
425+
}
426+
}
427+
428+
/// Optimized COUNT DISTINCT accumulator for i16 using a 65536-bit bitmap.
429+
/// Uses 8KB (1024 x u64) to track all possible i16 values (mapped to 0..65535).
430+
#[derive(Debug)]
431+
pub struct Bitmap65536DistinctCountAccumulatorI16 {
432+
bitmap: Box<[u64; 1024]>,
433+
}
434+
435+
impl Bitmap65536DistinctCountAccumulatorI16 {
436+
pub fn new() -> Self {
437+
Self {
438+
bitmap: Box::new([0; 1024]),
439+
}
440+
}
441+
442+
#[inline]
443+
fn set_bit(&mut self, value: i16) {
444+
let idx = value as u16;
445+
let word = (idx / 64) as usize;
446+
let bit = idx % 64;
447+
self.bitmap[word] |= 1u64 << bit;
448+
}
449+
450+
#[inline]
451+
fn count(&self) -> i64 {
452+
self.bitmap.iter().map(|w| w.count_ones() as i64).sum()
453+
}
454+
}
455+
456+
impl Default for Bitmap65536DistinctCountAccumulatorI16 {
457+
fn default() -> Self {
458+
Self::new()
459+
}
460+
}
461+
462+
impl Accumulator for Bitmap65536DistinctCountAccumulatorI16 {
463+
#[inline(never)]
464+
fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
465+
if values.is_empty() {
466+
return Ok(());
467+
}
468+
469+
let arr = as_primitive_array::<arrow::datatypes::Int16Type>(&values[0])?;
470+
for value in arr.iter().flatten() {
471+
self.set_bit(value);
472+
}
473+
Ok(())
474+
}
475+
476+
fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
477+
if states.is_empty() {
478+
return Ok(());
479+
}
480+
481+
let arr = as_list_array(&states[0])?;
482+
arr.iter().try_for_each(|maybe_list| {
483+
if let Some(list) = maybe_list {
484+
let list = as_primitive_array::<arrow::datatypes::Int16Type>(&list)?;
485+
for value in list.values().iter() {
486+
self.set_bit(*value);
487+
}
488+
};
489+
Ok(())
490+
})
491+
}
492+
493+
fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
494+
let mut values = Vec::new();
495+
for (word_idx, &word) in self.bitmap.iter().enumerate() {
496+
if word != 0 {
497+
for bit in 0..64 {
498+
if (word & (1u64 << bit)) != 0 {
499+
values.push(((word_idx as u16) * 64 + bit) as i16);
500+
}
501+
}
502+
}
503+
}
504+
505+
let arr = Arc::new(
506+
PrimitiveArray::<arrow::datatypes::Int16Type>::from_iter_values(values),
507+
);
508+
Ok(vec![
509+
SingleRowListArrayBuilder::new(arr).build_list_scalar(),
510+
])
511+
}
512+
513+
fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
514+
Ok(ScalarValue::Int64(Some(self.count())))
515+
}
516+
517+
fn size(&self) -> usize {
518+
size_of_val(self) + 8192
519+
}
520+
}

0 commit comments

Comments
 (0)