Skip to content

Commit 888f556

Browse files
committed
single_state_vec
1 parent 066504d commit 888f556

1 file changed

Lines changed: 17 additions & 10 deletions

File tree

  • datafusion/functions-aggregate-common/src/aggregate/count_distinct

datafusion/functions-aggregate-common/src/aggregate/count_distinct/groups.rs

Lines changed: 17 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -108,18 +108,32 @@ where
108108
EmitTo::First(n) => n,
109109
};
110110

111-
let mut group_values: Vec<Vec<T::Native>> = vec![Vec::new(); num_emitted];
111+
// Prefix-sum counts[..num_emitted] into offsets
112+
let mut offsets = Vec::with_capacity(num_emitted + 1);
113+
offsets.push(0i32);
114+
let mut total = 0i32;
115+
for &c in &self.counts[..num_emitted] {
116+
total += c as i32;
117+
offsets.push(total);
118+
}
119+
120+
let mut all_values = vec![T::Native::default(); total as usize];
121+
let mut cursors: Vec<i32> = offsets[..num_emitted].to_vec();
112122

113123
if matches!(emit_to, EmitTo::All) {
114124
for (group_idx, value) in self.seen.drain() {
115-
group_values[group_idx].push(value);
125+
let pos = cursors[group_idx] as usize;
126+
all_values[pos] = value;
127+
cursors[group_idx] += 1;
116128
}
117129
self.counts.clear();
118130
} else {
119131
let mut remaining = HashSet::default();
120132
for (group_idx, value) in self.seen.drain() {
121133
if group_idx < num_emitted {
122-
group_values[group_idx].push(value);
134+
let pos = cursors[group_idx] as usize;
135+
all_values[pos] = value;
136+
cursors[group_idx] += 1;
123137
} else {
124138
remaining.insert((group_idx - num_emitted, value));
125139
}
@@ -128,13 +142,6 @@ where
128142
let _ = emit_to.take_needed(&mut self.counts);
129143
}
130144

131-
let mut offsets = vec![0i32];
132-
let mut all_values = Vec::new();
133-
for values in &group_values {
134-
all_values.extend(values.iter().copied());
135-
offsets.push(all_values.len() as i32);
136-
}
137-
138145
let values_array = Arc::new(PrimitiveArray::<T>::from_iter_values(all_values));
139146
let list_array = ListArray::new(
140147
Arc::new(Field::new_list_field(T::DATA_TYPE, true)),

0 commit comments

Comments
 (0)