Skip to content

Commit 38736c2

Browse files
Dandandan and claude committed
Resolve MIN/MAX aggregates from Parquet metadata for Single-mode and CAST projections
Two changes enable the AggregateStatistics optimizer to resolve MIN/MAX (and COUNT) from file metadata instead of scanning data:

1. `take_optimizable` now handles `Single`/`SinglePartitioned` aggregate modes, not just `Final` wrapping `Partial`. This matters for single-partition scans where the planner skips the Partial→Final split.

2. `project_statistics` now propagates min/max statistics through CAST expressions (recursively), so that projections like `CAST(CAST(col AS Int32) AS Date32)` preserve the underlying column statistics instead of returning unknown.

Together these turn ClickBench Q6 (`SELECT MIN("EventDate"), MAX("EventDate") FROM hits`) from a full column scan into a metadata-only lookup (PlaceholderRowExec).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 5c653be commit 38736c2

3 files changed

Lines changed: 78 additions & 19 deletions

File tree

datafusion/physical-expr/src/projection.rs

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use std::ops::Deref;
2121
use std::sync::Arc;
2222

2323
use crate::PhysicalExpr;
24-
use crate::expressions::{Column, Literal};
24+
use crate::expressions::{CastExpr, Column, Literal};
2525
use crate::scalar_function::ScalarFunctionExpr;
2626
use crate::utils::collect_columns;
2727

@@ -715,9 +715,11 @@ impl ProjectionExprs {
715715
}
716716
}
717717
} else {
718-
// TODO stats: estimate more statistics from expressions
719-
// (expressions should compute their statistics themselves)
720-
ColumnStatistics::new_unknown()
718+
// Try to propagate statistics through expressions like CAST
719+
project_column_statistics_through_expr(
720+
expr.as_ref(),
721+
&mut stats.column_statistics,
722+
)
721723
};
722724
column_statistics.push(col_stats);
723725
}
@@ -727,6 +729,43 @@ impl ProjectionExprs {
727729
}
728730
}
729731

732+
/// Propagate column statistics through expressions that preserve order.
733+
///
734+
/// Currently handles:
735+
/// - `Column` references (direct passthrough)
736+
/// - `CAST` expressions (casts min/max values to the target type)
737+
///
738+
/// For other expressions, returns unknown statistics.
739+
fn project_column_statistics_through_expr(
740+
expr: &dyn PhysicalExpr,
741+
column_stats: &mut [ColumnStatistics],
742+
) -> ColumnStatistics {
743+
if let Some(col) = expr.as_any().downcast_ref::<Column>() {
744+
std::mem::take(&mut column_stats[col.index()])
745+
} else if let Some(cast_expr) = expr.as_any().downcast_ref::<CastExpr>() {
746+
let inner_stats =
747+
project_column_statistics_through_expr(cast_expr.expr.as_ref(), column_stats);
748+
let target_type = cast_expr.cast_type();
749+
ColumnStatistics {
750+
min_value: inner_stats
751+
.min_value
752+
.cast_to(target_type)
753+
.unwrap_or(Precision::Absent),
754+
max_value: inner_stats
755+
.max_value
756+
.cast_to(target_type)
757+
.unwrap_or(Precision::Absent),
758+
null_count: inner_stats.null_count,
759+
distinct_count: inner_stats.distinct_count,
760+
// Sum and byte size change under CAST, don't propagate
761+
sum_value: Precision::Absent,
762+
byte_size: Precision::Absent,
763+
}
764+
} else {
765+
ColumnStatistics::new_unknown()
766+
}
767+
}
768+
730769
impl<'a> IntoIterator for &'a ProjectionExprs {
731770
type Item = &'a ProjectionExpr;
732771
type IntoIter = std::slice::Iter<'a, ProjectionExpr>;

datafusion/physical-optimizer/src/aggregate_statistics.rs

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ use datafusion_common::Result;
2020
use datafusion_common::config::ConfigOptions;
2121
use datafusion_common::scalar::ScalarValue;
2222
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
23-
use datafusion_physical_plan::aggregates::{AggregateExec, AggregateInputMode};
23+
use datafusion_physical_plan::aggregates::{
24+
AggregateExec, AggregateInputMode, AggregateMode,
25+
};
2426
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
2527
use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
2628
use datafusion_physical_plan::udaf::{AggregateFunctionExpr, StatisticsArgs};
@@ -49,7 +51,7 @@ impl PhysicalOptimizerRule for AggregateStatistics {
4951
plan: Arc<dyn ExecutionPlan>,
5052
config: &ConfigOptions,
5153
) -> Result<Arc<dyn ExecutionPlan>> {
52-
if let Some(partial_agg_exec) = take_optimizable(&*plan) {
54+
if let Some(partial_agg_exec) = take_optimizable(&plan) {
5355
let partial_agg_exec = partial_agg_exec
5456
.downcast_ref::<AggregateExec>()
5557
.expect("take_optimizable() ensures that this is a AggregateExec");
@@ -106,19 +108,37 @@ impl PhysicalOptimizerRule for AggregateStatistics {
106108
}
107109
}
108110

109-
/// assert if the node passed as argument is a final `AggregateExec` node that can be optimized:
110-
/// - its child (with possible intermediate layers) is a partial `AggregateExec` node
111-
/// - they both have no grouping expression
111+
/// Returns an `AggregateExec` whose statistics can be used to replace the
112+
/// entire aggregate with literal values, if the plan is eligible.
112113
///
113-
/// If this is the case, return a ref to the partial `AggregateExec`, else `None`.
114-
/// We would have preferred to return a casted ref to AggregateExec but the recursion requires
115-
/// the `ExecutionPlan.children()` method that returns an owned reference.
116-
fn take_optimizable(node: &dyn ExecutionPlan) -> Option<Arc<dyn ExecutionPlan>> {
117-
if let Some(final_agg_exec) = node.downcast_ref::<AggregateExec>()
118-
&& final_agg_exec.mode().input_mode() == AggregateInputMode::Partial
119-
&& final_agg_exec.group_expr().is_empty()
114+
/// Two patterns are recognized:
115+
///
116+
/// 1. **Final wrapping Partial** (multi-partition): A final `AggregateExec`
117+
/// (input mode = `Partial`) with no GROUP BY whose descendant is a partial
118+
/// `AggregateExec` (input mode = `Raw`) with no GROUP BY and no filters.
119+
/// Returns the inner partial aggregate.
120+
///
121+
/// 2. **Single / SinglePartitioned** (single-partition): A `Single` or
122+
/// `SinglePartitioned` `AggregateExec` with no GROUP BY and no filters.
123+
/// Returns the aggregate itself.
124+
fn take_optimizable(plan: &Arc<dyn ExecutionPlan>) -> Option<Arc<dyn ExecutionPlan>> {
125+
let agg_exec = plan.downcast_ref::<AggregateExec>()?;
126+
127+
// Pattern 2 (Single / SinglePartitioned): processes raw input, produces final output
128+
if matches!(
129+
agg_exec.mode(),
130+
AggregateMode::Single | AggregateMode::SinglePartitioned
131+
) && agg_exec.group_expr().is_empty()
132+
&& agg_exec.filter_expr().iter().all(|e| e.is_none())
133+
{
134+
return Some(Arc::clone(plan));
135+
}
136+
137+
// Pattern 1 (Final wrapping Partial): final aggregate over a partial aggregate
138+
if agg_exec.mode().input_mode() == AggregateInputMode::Partial
139+
&& agg_exec.group_expr().is_empty()
120140
{
121-
let mut child = Arc::clone(final_agg_exec.input());
141+
let mut child = Arc::clone(agg_exec.input());
122142
loop {
123143
if let Some(partial_agg_exec) = child.downcast_ref::<AggregateExec>()
124144
&& partial_agg_exec.mode().input_mode() == AggregateInputMode::Raw

datafusion/sqllogictest/test_files/clickbench.slt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,8 +192,8 @@ logical_plan
192192
03)----Projection: CAST(CAST(hits_raw.EventDate AS Int32) AS Date32) AS EventDate
193193
04)------TableScan: hits_raw projection=[EventDate]
194194
physical_plan
195-
01)AggregateExec: mode=Single, gby=[], aggr=[min(hits.EventDate), max(hits.EventDate)]
196-
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/clickbench_hits_10.parquet]]}, projection=[CAST(CAST(EventDate@5 AS Int32) AS Date32) as EventDate], file_type=parquet
195+
01)ProjectionExec: expr=[2013-07-15 as min(hits.EventDate), 2013-07-15 as max(hits.EventDate)]
196+
02)--PlaceholderRowExec
197197

198198
query DD
199199
SELECT MIN("EventDate"), MAX("EventDate") FROM hits;

0 commit comments

Comments
 (0)