Skip to content

Commit 7c3ea05

Browse files
authored
feat: add AggregateMode::PartialReduce for tree-reduce aggregation (#20019)
DataFusion's current `AggregateMode` enum has four variants covering three of the four cells in the input/output matrix: | | Input: raw data | Input: partial state | | - | - | - | | Output: final values | `Single` / `SinglePartitioned` | `Final` / `FinalPartitioned` | | Output: partial state | `Partial` | ??? | This PR adds `AggregateMode::PartialReduce` to fill in the missing cell: it takes partially-reduced values as input, and reduces them further, but without finalizing. This is useful because it's the key component needed to implement distributed tree-reduction (as seen in e.g. the Scuba or Honeycomb papers): a set of worker nodes each perform multithreaded `Partial` aggregations, feed those into a `PartialReduce` to reduce all of this node's values into a single row, and then a head node collects the outputs from all nodes' `PartialReduce` to feed into a `Final` reduction. PR can be reviewed commit by commit: first commit is pure refactor/simplification; most places we were matching on `AggregateMode` we were actually just trying to either check which row of the above table we were in, or else which column. So now we have `is_first_stage` (tells you which column) and `is_last_stage` (tells you which row) and we use them everywhere. Second commit adds `PartialReduce`, and is pretty small because `is_first_stage`/`is_last_stage` do most of the heavy lifting. It also adds a test demonstrating a minimal Partial -> PartialReduce -> Final tree-reduction.
1 parent f997169 commit 7c3ea05

9 files changed

Lines changed: 304 additions & 108 deletions

File tree

datafusion/physical-optimizer/src/aggregate_statistics.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use datafusion_common::Result;
2020
use datafusion_common::config::ConfigOptions;
2121
use datafusion_common::scalar::ScalarValue;
2222
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
23-
use datafusion_physical_plan::aggregates::AggregateExec;
23+
use datafusion_physical_plan::aggregates::{AggregateExec, AggregateInputMode};
2424
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
2525
use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
2626
use datafusion_physical_plan::udaf::{AggregateFunctionExpr, StatisticsArgs};
@@ -116,13 +116,13 @@ impl PhysicalOptimizerRule for AggregateStatistics {
116116
/// the `ExecutionPlan.children()` method that returns an owned reference.
117117
fn take_optimizable(node: &dyn ExecutionPlan) -> Option<Arc<dyn ExecutionPlan>> {
118118
if let Some(final_agg_exec) = node.as_any().downcast_ref::<AggregateExec>()
119-
&& !final_agg_exec.mode().is_first_stage()
119+
&& final_agg_exec.mode().input_mode() == AggregateInputMode::Partial
120120
&& final_agg_exec.group_expr().is_empty()
121121
{
122122
let mut child = Arc::clone(final_agg_exec.input());
123123
loop {
124124
if let Some(partial_agg_exec) = child.as_any().downcast_ref::<AggregateExec>()
125-
&& partial_agg_exec.mode().is_first_stage()
125+
&& partial_agg_exec.mode().input_mode() == AggregateInputMode::Raw
126126
&& partial_agg_exec.group_expr().is_empty()
127127
&& partial_agg_exec.filter_expr().iter().all(|e| e.is_none())
128128
{

datafusion/physical-optimizer/src/update_aggr_exprs.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
2525
use datafusion_common::{Result, plan_datafusion_err};
2626
use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
2727
use datafusion_physical_expr::{EquivalenceProperties, PhysicalSortRequirement};
28-
use datafusion_physical_plan::aggregates::{AggregateExec, concat_slices};
28+
use datafusion_physical_plan::aggregates::{
29+
AggregateExec, AggregateInputMode, concat_slices,
30+
};
2931
use datafusion_physical_plan::windows::get_ordered_partition_by_indices;
3032
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
3133

@@ -81,7 +83,7 @@ impl PhysicalOptimizerRule for OptimizeAggregateOrder {
8183
// ordering fields may be pruned out by first stage aggregates.
8284
// Hence, necessary information for proper merge is added during
8385
// the first stage to the state field, which the final stage uses.
84-
if !aggr_exec.mode().is_first_stage() {
86+
if aggr_exec.mode().input_mode() == AggregateInputMode::Partial {
8587
return Ok(Transformed::no(plan));
8688
}
8789
let input = aggr_exec.input();

0 commit comments

Comments
 (0)