Skip to content

Commit dbd05b5

Browse files
committed
Optimize ClickBench q17 aggregate limit
1 parent 948cd09 commit dbd05b5

7 files changed

Lines changed: 223 additions & 45 deletions

File tree

benchmarks/src/bin/dfbench.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,7 @@ use datafusion::error::Result;
2020

2121
use clap::{Parser, Subcommand};
2222

23-
#[cfg(all(feature = "snmalloc", feature = "mimalloc"))]
24-
compile_error!(
25-
"feature \"snmalloc\" and feature \"mimalloc\" cannot be enabled at the same time"
26-
);
27-
28-
#[cfg(feature = "snmalloc")]
23+
// Register snmalloc as the global allocator only when the `snmalloc` feature is
// enabled and the `mimalloc` feature is not; with both features enabled this
// item is compiled out.
#[cfg(all(feature = "snmalloc", not(feature = "mimalloc")))]
2924
#[global_allocator]
3025
static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
3126

benchmarks/src/bin/imdb.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,7 @@ use clap::{Parser, Subcommand};
2121
use datafusion::error::Result;
2222
use datafusion_benchmarks::imdb;
2323

24-
#[cfg(all(feature = "snmalloc", feature = "mimalloc"))]
25-
compile_error!(
26-
"feature \"snmalloc\" and feature \"mimalloc\" cannot be enabled at the same time"
27-
);
28-
29-
#[cfg(feature = "snmalloc")]
24+
// Register snmalloc as the global allocator only when the `snmalloc` feature is
// enabled and the `mimalloc` feature is not; with both features enabled this
// item is compiled out.
#[cfg(all(feature = "snmalloc", not(feature = "mimalloc")))]
3025
#[global_allocator]
3126
static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
3227

datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs

Lines changed: 66 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,12 @@ use crate::physical_optimizer::test_utils::{
2828
use arrow::datatypes::DataType;
2929
use arrow::{compute::SortOptions, util::pretty::pretty_format_batches};
3030
use datafusion::prelude::SessionContext;
31-
use datafusion_common::Result;
31+
use datafusion_common::{Result, config::ConfigOptions};
3232
use datafusion_execution::config::SessionConfig;
3333
use datafusion_expr::Operator;
3434
use datafusion_physical_expr::expressions::{self, cast, col};
3535
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
36+
use datafusion_physical_optimizer::PhysicalOptimizerRule;
3637
use datafusion_physical_plan::{
3738
ExecutionPlan,
3839
aggregates::{AggregateExec, AggregateMode},
@@ -332,7 +333,7 @@ fn test_has_aggregate_expression() -> Result<()> {
332333
let schema = source.schema();
333334
let agg = TestAggregate::new_count_star();
334335

335-
// `SELECT <aggregate with no expressions> FROM DataSourceExec LIMIT 10;`, Single AggregateExec
336+
// `SELECT a, COUNT(*) FROM DataSourceExec GROUP BY a LIMIT 10;`, Single AggregateExec
336337
let single_agg = AggregateExec::try_new(
337338
AggregateMode::Single,
338339
build_group_by(&schema, vec!["a".to_string()]),
@@ -345,21 +346,82 @@ fn test_has_aggregate_expression() -> Result<()> {
345346
Arc::new(single_agg),
346347
10, // fetch
347348
);
348-
// expected not to push the limit to the AggregateExec
349+
// expected to push the limit to the AggregateExec
349350
let plan: Arc<dyn ExecutionPlan> = Arc::new(limit_exec);
350351
let formatted = get_optimized_plan(&plan)?;
351352
let actual = formatted.trim();
352353
assert_snapshot!(
353354
actual,
354355
@r"
355356
LocalLimitExec: fetch=10
356-
AggregateExec: mode=Single, gby=[a@0 as a], aggr=[COUNT(*)]
357+
AggregateExec: mode=Single, gby=[a@0 as a], aggr=[COUNT(*)], lim=[10]
357358
DataSourceExec: partitions=1, partition_sizes=[1]
358359
"
359360
);
360361
Ok(())
361362
}
362363

364+
// Checks that a limit placed above a Partial/Final `COUNT(*)` aggregation pair
// is pushed into both aggregation stages (`lim=[4]` in the plan snapshot), and
// then snapshots the rows produced by executing the rewritten plan.
#[tokio::test]
365+
async fn test_partial_final_with_aggregate_expression() -> Result<()> {
366+
let source = mock_data()?;
367+
let schema = source.schema();
368+
let agg = TestAggregate::new_count_star();
369+
370+
// `SELECT a, COUNT(*) FROM DataSourceExec GROUP BY a LIMIT 4;`,
371+
// Partial/Final AggregateExec. Both stages can keep the same deterministic
372+
// top-k group keys.
373+
let partial_agg = AggregateExec::try_new(
374+
AggregateMode::Partial,
375+
build_group_by(&schema.clone(), vec!["a".to_string()]),
376+
vec![Arc::new(agg.count_expr(&schema))], /* aggr_expr */
377+
vec![None], /* filter_expr */
378+
source, /* input */
379+
schema.clone(), /* input_schema */
380+
)?;
381+
let final_agg = AggregateExec::try_new(
382+
AggregateMode::Final,
383+
build_group_by(&schema.clone(), vec!["a".to_string()]),
384+
vec![Arc::new(agg.count_expr(&schema))], /* aggr_expr */
385+
vec![None], /* filter_expr */
386+
Arc::new(partial_agg), /* input */
387+
schema.clone(), /* input_schema */
388+
)?;
389+
let limit_exec = LocalLimitExec::new(
390+
Arc::new(final_agg),
391+
4, // fetch
392+
);
393+
let plan: Arc<dyn ExecutionPlan> = Arc::new(limit_exec);
394+
let formatted = get_optimized_plan(&plan)?;
395+
let actual = formatted.trim();
396+
assert_snapshot!(
397+
actual,
398+
@r"
399+
LocalLimitExec: fetch=4
400+
AggregateExec: mode=Final, gby=[a@0 as a], aggr=[COUNT(*)], lim=[4]
401+
AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[COUNT(*)], lim=[4]
402+
DataSourceExec: partitions=1, partition_sizes=[1]
403+
"
404+
);
// Apply the LimitedDistinctAggregation rule directly to the original plan,
// execute the rewritten plan, and snapshot the formatted result rows.
// NOTE(review): the variable below is named `expected` but holds the output
// actually produced by the run; the inline snapshot is the expectation.
405+
let optimized =
406+
datafusion_physical_optimizer::limited_distinct_aggregation::LimitedDistinctAggregation::new()
407+
.optimize(Arc::clone(&plan), &ConfigOptions::new())?;
408+
let expected = run_plan_and_format(optimized).await?;
409+
assert_snapshot!(
410+
expected,
411+
@r"
412+
+---+----------+
413+
| a | COUNT(*) |
414+
+---+----------+
415+
| | 1 |
416+
| 1 | 2 |
417+
| 2 | 1 |
418+
| 4 | 1 |
419+
+---+----------+
420+
"
421+
);
422+
Ok(())
423+
}
424+
363425
#[test]
364426
fn test_has_filter() -> Result<()> {
365427
let source = mock_data()?;

datafusion/physical-optimizer/src/limited_distinct_aggregation.rs

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
//! A special-case optimizer rule that pushes limit into a grouped aggregation
19-
//! which has no aggregate expressions or sorting requirements
18+
//! A special-case optimizer rule that pushes a limit into an unordered grouped
19+
//! aggregation when the query only needs an arbitrary subset of groups.
2020
2121
use std::sync::Arc;
2222

23-
use datafusion_physical_plan::aggregates::{AggregateExec, LimitOptions};
23+
use datafusion_physical_plan::aggregates::{AggregateExec, AggregateMode, LimitOptions};
2424
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
2525
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
2626

@@ -32,9 +32,10 @@ use crate::PhysicalOptimizerRule;
3232
use itertools::Itertools;
3333

3434
/// An optimizer rule that passes a `limit` hint into grouped aggregations which don't require all
35-
/// rows in the group to be processed for correctness. Example queries fitting this description are:
35+
/// groups to be produced for correctness. Example queries fitting this description are:
3636
/// - `SELECT distinct l_orderkey FROM lineitem LIMIT 10;`
3737
/// - `SELECT l_orderkey FROM lineitem GROUP BY l_orderkey LIMIT 10;`
38+
/// - `SELECT l_orderkey, COUNT(*) FROM lineitem GROUP BY l_orderkey LIMIT 10;`
3839
#[derive(Debug)]
3940
pub struct LimitedDistinctAggregation {}
4041

@@ -48,21 +49,43 @@ impl LimitedDistinctAggregation {
4849
aggr: &AggregateExec,
4950
limit: usize,
5051
) -> Option<Arc<dyn ExecutionPlan>> {
51-
// rules for transforming this Aggregate are held in this method
52-
if !aggr.is_unordered_unfiltered_group_by_distinct() {
52+
if aggr.is_unordered_unfiltered_group_by_distinct() {
53+
let new_aggr = aggr.with_new_limit_options(Some(LimitOptions::new(limit)));
54+
return Some(Arc::new(new_aggr));
55+
}
56+
57+
if !Self::can_limit_aggregate(aggr) {
5358
return None;
5459
}
5560

56-
// We found what we want: clone, copy the limit down, and return modified node
5761
let new_aggr = aggr.with_new_limit_options(Some(LimitOptions::new(limit)));
5862

5963
Some(Arc::new(new_aggr))
6064
}
6165

66+
/// Returns `true` when `aggr` is an aggregation *with* aggregate expressions
/// that this rule is willing to push a limit into.
///
/// All of the following must hold:
/// - `aggr.is_unordered_unfiltered_group_by()` returns `true`;
/// - there is at least one aggregate expression (an empty list yields `false`
///   here);
/// - the group-by expression reports `is_single()`;
/// - the aggregate mode is one of the modes listed in the `matches!` below.
///
/// NOTE(review): the mode list looks like it may enumerate every
/// `AggregateMode` variant, in which case the final check never rejects
/// anything; confirm whether a mode is meant to be excluded.
///
/// NOTE(review): whether stopping after `limit` groups still produces complete
/// aggregate values (e.g. full counts) depends on how the execution stream
/// honours the limit, which is not visible from this function; confirm.
fn can_limit_aggregate(aggr: &AggregateExec) -> bool {
67+
if !aggr.is_unordered_unfiltered_group_by() {
68+
return false;
69+
}
70+
if aggr.aggr_expr().is_empty() {
71+
return false;
72+
}
73+
if !aggr.group_expr().is_single() {
74+
return false;
75+
}
76+
matches!(
77+
aggr.mode(),
78+
AggregateMode::Partial
79+
| AggregateMode::Final
80+
| AggregateMode::FinalPartitioned
81+
| AggregateMode::Single
82+
| AggregateMode::SinglePartitioned
83+
)
84+
}
85+
6286
/// transform_limit matches an `AggregateExec` as the child of a `LocalLimitExec`
6387
/// or `GlobalLimitExec` and pushes the limit into the aggregation as a soft limit when
64-
/// there is a group by, but no sorting, no aggregate expressions, and no filters in the
65-
/// aggregation
88+
/// there is a group by but no sorting and no filters in the aggregation.
6689
fn transform_limit(plan: Arc<dyn ExecutionPlan>) -> Option<Arc<dyn ExecutionPlan>> {
6790
let limit: usize;
6891
let mut global_fetch: Option<usize> = None;

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -915,6 +915,7 @@ impl AggregateExec {
915915

916916
// grouping by an expression that has a sort/limit upstream
917917
if let Some(config) = self.limit_options
918+
&& config.descending().is_some()
918919
&& !self.is_unordered_unfiltered_group_by_distinct()
919920
{
920921
return Ok(StreamType::GroupedPriorityQueue(
@@ -934,11 +935,9 @@ impl AggregateExec {
934935
agg_expr.get_minmax_desc()
935936
}
936937

937-
/// true, if this Aggregate has a group-by with no required or explicit ordering,
938-
/// no filtering and no aggregate expressions
939-
/// This method qualifies the use of the LimitedDistinctAggregation rewrite rule
940-
/// on an AggregateExec.
941-
pub fn is_unordered_unfiltered_group_by_distinct(&self) -> bool {
938+
/// true if this Aggregate has a group-by with no required or explicit
939+
/// ordering and no aggregate filters.
940+
pub fn is_unordered_unfiltered_group_by(&self) -> bool {
942941
if self
943942
.limit_options()
944943
.and_then(|config| config.descending)
@@ -950,12 +949,7 @@ impl AggregateExec {
950949
if self.group_expr().is_empty() && !self.group_expr().has_grouping_set() {
951950
return false;
952951
}
953-
// ensure there are no aggregate expressions
954-
if !self.aggr_expr().is_empty() {
955-
return false;
956-
}
957-
// ensure there are no filters on aggregate expressions; the above check
958-
// may preclude this case
952+
// ensure there are no filters on aggregate expressions
959953
if self.filter_expr().iter().any(|e| e.is_some()) {
960954
return false;
961955
}
@@ -974,6 +968,15 @@ impl AggregateExec {
974968
true
975969
}
976970

971+
/// true, if this Aggregate has a group-by with no required or explicit ordering,
972+
/// no filtering and no aggregate expressions.
973+
///
974+
/// This method qualifies the distinct-only use of the
975+
/// LimitedDistinctAggregation rewrite rule on an AggregateExec.
976+
pub fn is_unordered_unfiltered_group_by_distinct(&self) -> bool {
977+
// The distinct-only case requires an empty aggregate-expression list; the
// remaining checks are delegated to `is_unordered_unfiltered_group_by`,
// which is only evaluated when that list is empty (short-circuit `&&`).
self.aggr_expr().is_empty() && self.is_unordered_unfiltered_group_by()
978+
}
979+
977980
/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
978981
pub fn compute_properties(
979982
input: &Arc<dyn ExecutionPlan>,

0 commit comments

Comments
 (0)