Skip to content

Commit 2595e56

Browse files
committed
implement_group_accumulators_count_distinct_use_hashtable
1 parent f2524a7 commit 2595e56

File tree

1 file changed

+50
-28
lines changed
  • datafusion/functions-aggregate-common/src/aggregate/count_distinct

1 file changed

+50
-28
lines changed

datafusion/functions-aggregate-common/src/aggregate/count_distinct/groups.rs

Lines changed: 50 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -48,31 +48,6 @@ where
48 48
}
49 49
}
50 50

51-
fn emit_to_values(&mut self, emit_to: EmitTo) -> Vec<Vec<T::Native>> {
52-
let num_emitted = match emit_to {
53-
EmitTo::All => self.num_groups,
54-
EmitTo::First(n) => n,
55-
};
56-
57-
let mut group_values: Vec<Vec<T::Native>> = vec![Vec::new(); num_emitted];
58-
let mut remaining = HashSet::default();
59-
60-
for (group_idx, value) in self.seen.drain() {
61-
if group_idx < num_emitted {
62-
group_values[group_idx].push(value);
63-
} else {
64-
remaining.insert((group_idx - num_emitted, value));
65-
}
66-
}
67-
68-
self.seen = remaining;
69-
match emit_to {
70-
EmitTo::All => self.num_groups = 0,
71-
EmitTo::First(n) => self.num_groups = self.num_groups.saturating_sub(n),
72-
}
73-
74-
group_values
75-
}
76 51
}
77 52

78 53
impl<T: ArrowPrimitiveType> Default for PrimitiveDistinctCountGroupsAccumulator<T>
@@ -106,13 +81,60 @@ where
106 81
}
107 82

108 83
fn evaluate(&mut self, emit_to: EmitTo) -> datafusion_common::Result<ArrayRef> {
109-
let group_values = self.emit_to_values(emit_to);
110-
let counts: Vec<i64> = group_values.iter().map(|v| v.len() as i64).collect();
84+
let num_emitted = match emit_to {
85+
EmitTo::All => self.num_groups,
86+
EmitTo::First(n) => n,
87+
};
88+
89+
let mut counts = vec![0i64; num_emitted];
90+
91+
if matches!(emit_to, EmitTo::All) {
92+
for &(group_idx, _) in self.seen.iter() {
93+
counts[group_idx] += 1;
94+
}
95+
self.seen.clear();
96+
self.num_groups = 0;
97+
} else {
98+
let mut remaining = HashSet::default();
99+
for (group_idx, value) in self.seen.drain() {
100+
if group_idx < num_emitted {
101+
counts[group_idx] += 1;
102+
} else {
103+
remaining.insert((group_idx - num_emitted, value));
104+
}
105+
}
106+
self.seen = remaining;
107+
self.num_groups = self.num_groups.saturating_sub(num_emitted);
108+
}
109+
111 110
Ok(Arc::new(Int64Array::from(counts)))
112 111
}
113 112

114 113
fn state(&mut self, emit_to: EmitTo) -> datafusion_common::Result<Vec<ArrayRef>> {
115-
let group_values = self.emit_to_values(emit_to);
114+
let num_emitted = match emit_to {
115+
EmitTo::All => self.num_groups,
116+
EmitTo::First(n) => n,
117+
};
118+
119+
let mut group_values: Vec<Vec<T::Native>> = vec![Vec::new(); num_emitted];
120+
121+
if matches!(emit_to, EmitTo::All) {
122+
for (group_idx, value) in self.seen.drain() {
123+
group_values[group_idx].push(value);
124+
}
125+
self.num_groups = 0;
126+
} else {
127+
let mut remaining = HashSet::default();
128+
for (group_idx, value) in self.seen.drain() {
129+
if group_idx < num_emitted {
130+
group_values[group_idx].push(value);
131+
} else {
132+
remaining.insert((group_idx - num_emitted, value));
133+
}
134+
}
135+
self.seen = remaining;
136+
self.num_groups = self.num_groups.saturating_sub(num_emitted);
137+
}
116 138

117 139
let mut offsets = vec![0i32];
118 140
let mut all_values = Vec::new();

0 commit comments

Comments
 (0)