Skip to content

Commit 17dd86b

Browse files
committed
Merge branch 'main' into add_group_benchmarks_count_distinct
2 parents 19dd479 + cc4717a commit 17dd86b

2 files changed

Lines changed: 21 additions & 12 deletions

File tree

datafusion/functions-aggregate/benches/count_distinct.rs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -239,19 +239,28 @@ fn count_distinct_groups_benchmark(c: &mut Criterion) {
239239
})
240240
});
241241
} else {
242+
let arr = values.as_any().downcast_ref::<Int64Array>().unwrap();
243+
let mut group_rows: Vec<Vec<i64>> = vec![Vec::new(); num_groups];
244+
for (idx, &group_idx) in group_indices.iter().enumerate() {
245+
if arr.is_valid(idx) {
246+
group_rows[group_idx].push(arr.value(idx));
247+
}
248+
}
249+
let group_arrays: Vec<ArrayRef> = group_rows
250+
.iter()
251+
.map(|rows| Arc::new(Int64Array::from(rows.clone())) as ArrayRef)
252+
.collect();
253+
242254
c.bench_function(&format!("count_distinct_groups {name}"), |b| {
243255
b.iter(|| {
244256
let mut accumulators: Vec<_> = (0..num_groups)
245257
.map(|_| prepare_accumulator(DataType::Int64))
246258
.collect();
247259

248-
let arr = values.as_any().downcast_ref::<Int64Array>().unwrap();
249-
for (idx, group_idx) in group_indices.iter().enumerate() {
250-
if let Some(val) = arr.value(idx).into() {
251-
let single_val =
252-
Arc::new(Int64Array::from(vec![Some(val)])) as ArrayRef;
253-
accumulators[*group_idx]
254-
.update_batch(std::slice::from_ref(&single_val))
260+
for (group_idx, batch) in group_arrays.iter().enumerate() {
261+
if batch.len() > 0 {
262+
accumulators[group_idx]
263+
.update_batch(std::slice::from_ref(batch))
255264
.unwrap();
256265
}
257266
}

datafusion/physical-optimizer/src/window_topn.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -238,15 +238,15 @@ impl PhysicalOptimizerRule for WindowTopN {
238238
fn extract_window_limit(
239239
predicate: &Arc<dyn datafusion_physical_expr::PhysicalExpr>,
240240
) -> Option<(usize, usize)> {
241-
let binary = predicate.as_any().downcast_ref::<BinaryExpr>()?;
241+
let binary = predicate.downcast_ref::<BinaryExpr>()?;
242242
let op = binary.op();
243243
let left = binary.left();
244244
let right = binary.right();
245245

246246
// Try Column op Literal
247247
if let (Some(col), Some(lit_val)) = (
248-
left.as_any().downcast_ref::<Column>(),
249-
right.as_any().downcast_ref::<Literal>(),
248+
left.downcast_ref::<Column>(),
249+
right.downcast_ref::<Literal>(),
250250
) {
251251
let n = scalar_to_usize(lit_val.value())?;
252252
return match *op {
@@ -258,8 +258,8 @@ fn extract_window_limit(
258258

259259
// Try Literal op Column (flipped)
260260
if let (Some(lit_val), Some(col)) = (
261-
left.as_any().downcast_ref::<Literal>(),
262-
right.as_any().downcast_ref::<Column>(),
261+
left.downcast_ref::<Literal>(),
262+
right.downcast_ref::<Column>(),
263263
) {
264264
let n = scalar_to_usize(lit_val.value())?;
265265
return match *op {

0 commit comments

Comments
 (0)