From de92c7af173ec011cdb5674e824d1f73f4deb0ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 24 Apr 2026 00:04:51 +0200 Subject: [PATCH 1/2] =?UTF-8?q?feat:=20precomputed-hash=20column=20for=20P?= =?UTF-8?q?artial=E2=86=92Hash-Repartition=20shuffle?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds an opt-in `datafusion.execution.emit_aggregate_group_hash` config option. When enabled, a Partial `AggregateExec` whose immediate consumer is `RepartitionExec` with `Partitioning::Hash` over the same group columns appends a trailing `__datafusion_precomputed_hash: UInt64` field to its output. The hash is computed once with `REPARTITION_RANDOM_STATE` and `RepartitionExec` consumes it directly via a new fast path, eliminating a full rehashing pass on the shuffle — biggest wins on string/binary group keys (e.g. ClickBench regexp_replace keys). Pieces: - `AggregateExec::with_emit_group_hash(bool)` rebuilds schema + cache. - `create_schema` appends the tagged `UInt64` field with source-column indices in field metadata so multi-column groups work. - `row_hash::emit` / `transform_to_states` hash the group arrays with the repartition seed and append as the last column. - `RepartitionExec::Hash` scans its input schema for the precomputed marker; matches both "partitioning expr IS the hash column" and "partitioning exprs are the recorded source columns in order". - New optimizer rule `EmitPartialAggregateHash` flips the flag when the Partial→Hash-Repartition pattern is present; config-gated so default behavior is unchanged. - Field sits at the end of the Partial's output; Final's indexing into group/state columns is unaffected and its own output schema is clean. Out of scope for this PR: - FinalPartitioned `GroupValues` reuse (requires GroupValues trait extension and a second agg-seed hash column — noted as follow-up). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- datafusion/common/src/config.rs | 7 + .../src/emit_partial_aggregate_hash.rs | 259 ++++++++++++++++++ datafusion/physical-optimizer/src/lib.rs | 1 + .../physical-optimizer/src/optimizer.rs | 7 + .../physical-plan/src/aggregates/mod.rs | 213 +++++++++++++- .../physical-plan/src/aggregates/row_hash.rs | 50 ++++ .../physical-plan/src/repartition/mod.rs | 236 +++++++++++++++- docs/source/user-guide/configs.md | 1 + 8 files changed, 758 insertions(+), 16 deletions(-) create mode 100644 datafusion/physical-optimizer/src/emit_partial_aggregate_hash.rs diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 85361ef5e17e1..fceae7e313f03 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -648,6 +648,13 @@ config_namespace! { /// aggregation ratio check and trying to switch to skipping aggregation mode pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000 + /// When `true`, a Partial `AggregateExec` feeding a Hash `RepartitionExec` + /// over the same group columns emits a trailing `UInt64` column of + /// precomputed row hashes. The `RepartitionExec` consumes it directly + /// instead of rehashing the group values, saving the per-row hash on + /// the shuffle path. Largest wins are for string/binary group keys. + pub emit_aggregate_group_hash: bool, default = false + /// Should DataFusion use row number estimates at the input to decide /// whether increasing parallelism is beneficial or not. By default, /// only exact row numbers (not estimates) are used for this decision. 
diff --git a/datafusion/physical-optimizer/src/emit_partial_aggregate_hash.rs b/datafusion/physical-optimizer/src/emit_partial_aggregate_hash.rs new file mode 100644 index 0000000000000..172b45d4f1992 --- /dev/null +++ b/datafusion/physical-optimizer/src/emit_partial_aggregate_hash.rs @@ -0,0 +1,259 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Physical optimizer rule that enables the precomputed-hash column on a +//! Partial `AggregateExec` whose output is immediately consumed by a +//! `RepartitionExec` with `Partitioning::Hash` over the same group columns. 
+ +use std::any::Any; +use std::sync::Arc; + +use datafusion_common::Result; +use datafusion_common::config::ConfigOptions; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; +use datafusion_physical_expr::expressions::Column; +use datafusion_physical_expr::{Partitioning, PhysicalExpr}; +use datafusion_physical_plan::ExecutionPlan; +use datafusion_physical_plan::aggregates::{AggregateExec, AggregateMode}; +use datafusion_physical_plan::repartition::RepartitionExec; + +use crate::PhysicalOptimizerRule; + +/// Enables [`AggregateExec::with_emit_group_hash`] on Partial aggregates whose +/// immediate downstream consumer is a `RepartitionExec` with +/// `Partitioning::Hash(exprs, _)` where `exprs` matches the Partial's group +/// columns exactly and in order. +#[derive(Default, Debug)] +pub struct EmitPartialAggregateHash {} + +impl EmitPartialAggregateHash { + #[expect(missing_docs)] + pub fn new() -> Self { + Self {} + } +} + +impl PhysicalOptimizerRule for EmitPartialAggregateHash { + fn optimize( + &self, + plan: Arc, + config: &ConfigOptions, + ) -> Result> { + if !config.execution.emit_aggregate_group_hash { + return Ok(plan); + } + plan.transform_up(|plan| { + let Some(repartition) = plan.downcast_ref::() else { + return Ok(Transformed::no(plan)); + }; + + let Partitioning::Hash(hash_exprs, _) = repartition.partitioning() else { + return Ok(Transformed::no(plan)); + }; + + let input = repartition.input(); + let Some(partial) = input.downcast_ref::() else { + return Ok(Transformed::no(plan)); + }; + + if *partial.mode() != AggregateMode::Partial + || partial.emit_group_hash() + || partial.group_expr().has_grouping_set() + { + return Ok(Transformed::no(plan)); + } + + if !hash_keys_match_group_columns(hash_exprs, partial) { + return Ok(Transformed::no(plan)); + } + + let new_partial = Arc::new(partial.clone().with_emit_group_hash(true)?) 
+ as Arc; + let new_repartition = Arc::new(RepartitionExec::try_new( + new_partial, + repartition.partitioning().clone(), + )?) as Arc; + + Ok(Transformed::yes(new_repartition)) + }) + .data() + } + + fn name(&self) -> &str { + "EmitPartialAggregateHash" + } + + fn schema_check(&self) -> bool { + // We append a trailing column to the Partial's output schema; the + // RepartitionExec passes it through. The Final's own output schema is + // unaffected, but intermediate schemas do change — opt out of the + // global schema equality check. + false + } +} + +/// `true` when `hash_exprs` is exactly `[Column(i_0), Column(i_1), ...]` +/// matching, in order, the output-column indices that the Partial's group-by +/// emits (i.e. `0..num_group_cols`). Anything else disqualifies the match — +/// the precomputed hash covers only the group-value *arrays* emitted by the +/// Partial, so the partitioning keys must be the same projected columns. +fn hash_keys_match_group_columns( + hash_exprs: &[Arc], + partial: &AggregateExec, +) -> bool { + let group_exprs = partial.group_expr().expr(); + if hash_exprs.len() != group_exprs.len() { + return false; + } + hash_exprs.iter().enumerate().all(|(i, expr)| { + let any = expr.as_ref() as &dyn Any; + any.downcast_ref::().is_some_and(|c| c.index() == i) + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_physical_expr::expressions::col; + use datafusion_physical_plan::aggregates::{ + PRECOMPUTED_GROUP_HASH_COLUMN, PhysicalGroupBy, + }; + use datafusion_physical_plan::empty::EmptyExec; + + fn config_enabled() -> ConfigOptions { + let mut cfg = ConfigOptions::default(); + cfg.execution.emit_aggregate_group_hash = true; + cfg + } + + #[test] + fn rule_is_disabled_by_default_config() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let input = Arc::new(EmptyExec::new(Arc::clone(&schema))); + let group_by = + 
PhysicalGroupBy::new_single(vec![(col("a", &schema)?, "a".to_string())]); + let partial = Arc::new(AggregateExec::try_new( + AggregateMode::Partial, + group_by, + vec![], + vec![], + input, + Arc::clone(&schema), + )?); + let repartition: Arc = Arc::new(RepartitionExec::try_new( + partial as Arc, + Partitioning::Hash(vec![col("a", &schema)?], 4), + )?); + let optimized = EmitPartialAggregateHash::new() + .optimize(Arc::clone(&repartition), &ConfigOptions::default())?; + let rep = optimized.downcast_ref::().unwrap(); + let partial = rep.input().downcast_ref::().unwrap(); + assert!( + !partial.emit_group_hash(), + "default config leaves the rule disabled" + ); + Ok(()) + } + + #[test] + fn enables_emit_group_hash_when_repartition_matches_groups() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Int32, false), + ])); + let input = Arc::new(EmptyExec::new(Arc::clone(&schema))); + let group_by = PhysicalGroupBy::new_single(vec![ + (col("a", &schema)?, "a".to_string()), + (col("b", &schema)?, "b".to_string()), + ]); + let partial = Arc::new(AggregateExec::try_new( + AggregateMode::Partial, + group_by, + vec![], + vec![], + input, + Arc::clone(&schema), + )?); + + let partial_schema = partial.schema(); + let hash_exprs: Vec> = + vec![col("a", &partial_schema)?, col("b", &partial_schema)?]; + let repartition: Arc = Arc::new(RepartitionExec::try_new( + partial as Arc, + Partitioning::Hash(hash_exprs, 4), + )?); + + let optimized = + EmitPartialAggregateHash::new().optimize(repartition, &config_enabled())?; + + let repartition = optimized + .downcast_ref::() + .expect("plan root should still be RepartitionExec"); + let new_partial = repartition + .input() + .downcast_ref::() + .expect("repartition input should be AggregateExec"); + assert!(new_partial.emit_group_hash()); + let schema = new_partial.schema(); + let last = schema.field(schema.fields().len() - 1); + assert_eq!(last.name(), 
PRECOMPUTED_GROUP_HASH_COLUMN); + Ok(()) + } + + #[test] + fn skips_when_partitioning_expr_is_not_plain_column_ref() -> Result<()> { + use datafusion_expr_common::operator::Operator; + use datafusion_physical_expr::expressions::binary; + + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let input = Arc::new(EmptyExec::new(Arc::clone(&schema))); + let group_by = + PhysicalGroupBy::new_single(vec![(col("a", &schema)?, "a".to_string())]); + let partial = Arc::new(AggregateExec::try_new( + AggregateMode::Partial, + group_by, + vec![], + vec![], + input, + Arc::clone(&schema), + )?); + + let partial_schema = partial.schema(); + let plus = binary( + col("a", &partial_schema)?, + Operator::Plus, + col("a", &partial_schema)?, + &partial_schema, + )?; + let repartition: Arc = Arc::new(RepartitionExec::try_new( + partial as Arc, + Partitioning::Hash(vec![plus], 4), + )?); + + let optimized = EmitPartialAggregateHash::new() + .optimize(Arc::clone(&repartition), &config_enabled())?; + + let new_repartition = optimized.downcast_ref::().unwrap(); + let new_partial = new_repartition + .input() + .downcast_ref::() + .unwrap(); + assert!(!new_partial.emit_group_hash()); + Ok(()) + } +} diff --git a/datafusion/physical-optimizer/src/lib.rs b/datafusion/physical-optimizer/src/lib.rs index 5fac8948b7f04..39063a657c2ac 100644 --- a/datafusion/physical-optimizer/src/lib.rs +++ b/datafusion/physical-optimizer/src/lib.rs @@ -27,6 +27,7 @@ pub mod aggregate_statistics; pub mod combine_partial_final_agg; +pub mod emit_partial_aggregate_hash; pub mod enforce_distribution; pub mod enforce_sorting; pub mod ensure_coop; diff --git a/datafusion/physical-optimizer/src/optimizer.rs b/datafusion/physical-optimizer/src/optimizer.rs index 05df642f8446b..60262c2af1068 100644 --- a/datafusion/physical-optimizer/src/optimizer.rs +++ b/datafusion/physical-optimizer/src/optimizer.rs @@ -22,6 +22,7 @@ use std::sync::Arc; use 
crate::aggregate_statistics::AggregateStatistics; use crate::combine_partial_final_agg::CombinePartialFinalAggregate; +use crate::emit_partial_aggregate_hash::EmitPartialAggregateHash; use crate::enforce_distribution::EnforceDistribution; use crate::enforce_sorting::EnforceSorting; use crate::ensure_coop::EnsureCooperative; @@ -225,6 +226,12 @@ impl PhysicalOptimizer { Arc::new(ProjectionPushdown::new()), // PushdownSort: Detect sorts that can be pushed down to data sources. Arc::new(PushdownSort::new()), + // Enable the precomputed-hash output on any Partial AggregateExec + // whose consumer is a Hash RepartitionExec over the same group + // columns. Runs after CombinePartialFinalAggregate (which may fuse + // Partial+Final into Single) and ProjectionPushdown (which settles + // the group-column positions). + Arc::new(EmitPartialAggregateHash::new()), Arc::new(EnsureCooperative::new()), // This FilterPushdown handles dynamic filters that may have references to the source ExecutionPlan. // Therefore, it should be run at the end of the optimization process since any changes to the plan may break the dynamic filter's references. diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 76ecb3f1485a4..2d97cbf46d894 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -90,6 +90,23 @@ const AGGREGATION_HASH_SEED: datafusion_common::hash_utils::RandomState = // This seed is chosen to be a large 64-bit number datafusion_common::hash_utils::RandomState::with_seed(15395726432021054657); +/// Name of the trailing `UInt64` column that a Partial [`AggregateExec`] may +/// emit when [`AggregateExec::with_emit_group_hash`] is set. Holds per-group +/// hashes computed with [`crate::repartition::REPARTITION_RANDOM_STATE`] so a +/// downstream [`crate::repartition::RepartitionExec`] can skip rehashing. 
+pub const PRECOMPUTED_GROUP_HASH_COLUMN: &str = "__datafusion_precomputed_hash"; + +/// Field-metadata key used to mark a column as a precomputed hash. +pub const PRECOMPUTED_HASH_METADATA_KEY: &str = "datafusion.precomputed_hash"; +/// Field-metadata value indicating the hash was computed with +/// `REPARTITION_RANDOM_STATE` (seed `0`) and is safe for the repartition fast path. +pub const PRECOMPUTED_HASH_REPARTITION_VALUE: &str = "repartition_seed_0"; +/// Field-metadata key whose value is a comma-separated list of column indices +/// (relative to the emitting operator's output schema) over which the hash was +/// computed, in the order passed to `create_hashes`. Consumers must match the +/// partitioning-key columns against this list exactly and in order. +pub const PRECOMPUTED_HASH_COLS_METADATA_KEY: &str = "datafusion.precomputed_hash_cols"; + /// Whether an aggregate stage consumes raw input data or intermediate /// accumulator state from a previous aggregation stage. /// @@ -670,6 +687,13 @@ pub struct AggregateExec { /// it remains `Some(..)` to enable dynamic filtering during aggregate execution; /// otherwise, it is cleared to `None`. dynamic_filter: Option>, + /// When `true` and `mode` is [`AggregateMode::Partial`], the Partial aggregate + /// appends a trailing [`PRECOMPUTED_GROUP_HASH_COLUMN`] column to each emitted + /// batch. The hash is computed with [`crate::repartition::REPARTITION_RANDOM_STATE`] + /// so a downstream `RepartitionExec` with `Partitioning::Hash` can skip rehashing. + /// + /// Only relevant for Partial mode; setting it on other modes is a no-op. 
+ emit_group_hash: bool, } impl AggregateExec { @@ -695,6 +719,7 @@ impl AggregateExec { schema: Arc::clone(&self.schema), input_schema: Arc::clone(&self.input_schema), dynamic_filter: self.dynamic_filter.clone(), + emit_group_hash: self.emit_group_hash, } } @@ -715,6 +740,7 @@ impl AggregateExec { schema: Arc::clone(&self.schema), input_schema: Arc::clone(&self.input_schema), dynamic_filter: self.dynamic_filter.clone(), + emit_group_hash: self.emit_group_hash, } } @@ -732,7 +758,13 @@ impl AggregateExec { input_schema: SchemaRef, ) -> Result { let group_by = group_by.into(); - let schema = create_schema(&input.schema(), &group_by, &aggr_expr, mode)?; + let schema = create_schema( + &input.schema(), + &group_by, + &aggr_expr, + mode, + /* emit_group_hash */ false, + )?; let schema = Arc::new(schema); AggregateExec::try_new_with_schema( @@ -849,6 +881,7 @@ impl AggregateExec { input_order_mode, cache: Arc::new(cache), dynamic_filter: None, + emit_group_hash: false, }; exec.init_dynamic_filter(); @@ -856,6 +889,47 @@ impl AggregateExec { Ok(exec) } + /// Enable emission of a trailing precomputed-hash column on Partial batches. + /// + /// On `AggregateMode::Partial`, rebuilds `schema`/`cache` so the output + /// schema includes [`PRECOMPUTED_GROUP_HASH_COLUMN`] as its final field. + /// No-op for non-Partial modes. 
+ pub fn with_emit_group_hash(mut self, emit: bool) -> Result { + if self.mode != AggregateMode::Partial || self.emit_group_hash == emit { + self.emit_group_hash = emit && self.mode == AggregateMode::Partial; + return Ok(self); + } + self.emit_group_hash = emit; + + let new_schema = Arc::new(create_schema( + &self.input.schema(), + &self.group_by, + &self.aggr_expr, + self.mode, + emit, + )?); + + let group_expr_mapping = + ProjectionMapping::try_new(self.group_by.expr.clone(), &self.input.schema())?; + let cache = Self::compute_properties( + &self.input, + Arc::clone(&new_schema), + &group_expr_mapping, + &self.mode, + &self.input_order_mode, + self.aggr_expr.as_ref(), + )?; + self.schema = new_schema; + self.cache = Arc::new(cache); + Ok(self) + } + + /// Returns `true` when this Partial `AggregateExec` emits the trailing + /// precomputed-hash column for the downstream repartition fast path. + pub fn emit_group_hash(&self) -> bool { + self.emit_group_hash + } + /// Aggregation mode (full, partial) pub fn mode(&self) -> &AggregateMode { &self.mode @@ -1538,6 +1612,7 @@ impl ExecutionPlan for AggregateExec { )?; me.limit_options = self.limit_options; me.dynamic_filter.clone_from(&self.dynamic_filter); + me.emit_group_hash = self.emit_group_hash; Ok(Arc::new(me)) } @@ -1711,11 +1786,16 @@ impl ExecutionPlan for AggregateExec { /// Creates the output schema for an [`AggregateExec`] containing the group by columns followed /// by the aggregate columns. +/// +/// When `emit_group_hash` is `true` and `mode` is Partial, a trailing +/// [`PRECOMPUTED_GROUP_HASH_COLUMN`] field of type `UInt64` is appended and +/// tagged with the [`PRECOMPUTED_HASH_METADATA_KEY`] field metadata. 
fn create_schema( input_schema: &Schema, group_by: &PhysicalGroupBy, aggr_expr: &[Arc], mode: AggregateMode, + emit_group_hash: bool, ) -> Result { let mut fields = Vec::with_capacity(group_by.num_output_exprs() + aggr_expr.len()); fields.extend(group_by.output_fields(input_schema)?); @@ -1735,6 +1815,27 @@ fn create_schema( } } + if emit_group_hash && mode == AggregateMode::Partial { + // Group columns are always at output positions 0..num_output_exprs, + // and the hash is computed over that slice. Record the indices so + // downstream consumers can match the partitioning exprs against them. + let num_group_cols = group_by.num_output_exprs(); + let cols_csv = (0..num_group_cols) + .map(|i| i.to_string()) + .collect::>() + .join(","); + let mut metadata = HashMap::new(); + metadata.insert( + PRECOMPUTED_HASH_METADATA_KEY.to_string(), + PRECOMPUTED_HASH_REPARTITION_VALUE.to_string(), + ); + metadata.insert(PRECOMPUTED_HASH_COLS_METADATA_KEY.to_string(), cols_csv); + fields.push(Arc::new( + Field::new(PRECOMPUTED_GROUP_HASH_COLUMN, DataType::UInt64, false) + .with_metadata(metadata), + )); + } + Ok(Schema::new_with_metadata( fields, input_schema.metadata().clone(), @@ -3631,6 +3732,7 @@ mod tests { &grouping_set, &aggr_expr, AggregateMode::Final, + false, )?; let expected_schema = Schema::new(vec![ Field::new("a", DataType::Float32, false), @@ -4781,4 +4883,113 @@ mod tests { Ok(()) } + + /// Partial `AggregateExec` with `emit_group_hash = true` appends a + /// `UInt64` column at the end of each batch, carrying + /// `REPARTITION_RANDOM_STATE` hashes of the group values. Verify the + /// schema marker, value correctness, and that results are unchanged. 
+ #[tokio::test] + async fn partial_aggregate_emits_precomputed_hash() -> Result<()> { + use crate::repartition::REPARTITION_RANDOM_STATE; + use datafusion_common::hash_utils::create_hashes; + + let (input_schema, input_batches) = some_data_v2(); + let input: Arc = Arc::new(TestMemoryExec::try_new( + &[input_batches], + Arc::clone(&input_schema), + None, + )?); + + let group_by = PhysicalGroupBy::new_single(vec![( + col("a", &input_schema)?, + "a".to_string(), + )]); + let aggregates = vec![Arc::new( + AggregateExprBuilder::new(count_udaf(), vec![lit(1i8)]) + .schema(Arc::clone(&input_schema)) + .alias("COUNT(1)") + .build()?, + )]; + + let partial = AggregateExec::try_new( + AggregateMode::Partial, + group_by, + aggregates, + vec![None], + Arc::clone(&input), + Arc::clone(&input_schema), + )? + .with_emit_group_hash(true)?; + + // Schema: [a, COUNT(1)[count], __datafusion_precomputed_hash] + let schema = partial.schema(); + let last = schema.field(schema.fields().len() - 1); + assert_eq!(last.name(), PRECOMPUTED_GROUP_HASH_COLUMN); + assert_eq!(last.data_type(), &DataType::UInt64); + assert_eq!( + last.metadata() + .get(PRECOMPUTED_HASH_METADATA_KEY) + .map(String::as_str), + Some(PRECOMPUTED_HASH_REPARTITION_VALUE), + ); + + let task_ctx = Arc::new(TaskContext::default()); + let batches = crate::collect(Arc::new(partial), task_ctx).await?; + assert!(!batches.is_empty()); + + for batch in &batches { + let group_col = batch.column(0); + let hash_col = batch + .column(batch.num_columns() - 1) + .as_any() + .downcast_ref::() + .expect("precomputed hash column should be UInt64"); + + let mut expected = vec![0u64; batch.num_rows()]; + create_hashes( + &[Arc::clone(group_col)], + REPARTITION_RANDOM_STATE.random_state(), + &mut expected, + )?; + + let actual: Vec = hash_col.values().to_vec(); + assert_eq!( + actual, expected, + "precomputed hash values must match create_hashes with REPARTITION_RANDOM_STATE" + ); + } + + Ok(()) + } + + /// Setting `emit_group_hash` on a 
non-Partial `AggregateExec` is a no-op + /// and the output schema stays clean. + #[test] + fn emit_group_hash_is_noop_for_non_partial() -> Result<()> { + let schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::UInt32, false)])); + let input = Arc::new(TestMemoryExec::try_new( + &[vec![RecordBatch::new_empty(Arc::clone(&schema))]], + Arc::clone(&schema), + None, + )?); + let group_by = + PhysicalGroupBy::new_single(vec![(col("a", &schema)?, "a".to_string())]); + let final_agg = AggregateExec::try_new( + AggregateMode::Final, + group_by, + vec![], + vec![], + input, + Arc::clone(&schema), + )? + .with_emit_group_hash(true)?; + assert!(!final_agg.emit_group_hash()); + let out = final_agg.schema(); + assert!( + out.field_with_name(PRECOMPUTED_GROUP_HASH_COLUMN).is_err(), + "non-Partial modes must not expose the precomputed-hash column" + ); + Ok(()) + } } diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 056a7f171a516..418598c709d97 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -30,6 +30,7 @@ use crate::aggregates::{ create_schema, evaluate_group_by, evaluate_many, evaluate_optional, }; use crate::metrics::{BaselineMetrics, MetricBuilder, MetricCategory, RecordOutput}; +use crate::repartition::REPARTITION_RANDOM_STATE; use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder}; use crate::spill::spill_manager::{GetSlicedSize, SpillManager}; use crate::{PhysicalExpr, aggregates, metrics}; @@ -455,6 +456,15 @@ pub(crate) struct GroupedHashAggregateStream { /// Reduction factor metric, calculated as `output_rows/input_rows` (only for partial aggregation) reduction_factor: Option, + + /// Number of group-by columns in the output schema. Used to locate + /// group-value arrays when appending the precomputed-hash column. 
+ num_group_cols: usize, + + /// When `true`, `emit()` appends a trailing `UInt64` column with + /// `REPARTITION_RANDOM_STATE` hashes of the group-value arrays so a + /// downstream `RepartitionExec` can skip rehashing. Partial mode only. + emit_group_hash: bool, } impl GroupedHashAggregateStream { @@ -520,11 +530,15 @@ impl GroupedHashAggregateStream { // Therefore, when we spill these intermediate states or pass them to another // aggregation operator, we must use a schema that includes both the group // columns **and** the partial-state columns. + // Spill batches are read back by the merge path inside this same + // operator, not by a downstream `RepartitionExec`, so we never need + // the precomputed-hash column on disk. let spill_schema = Arc::new(create_schema( &agg.input().schema(), &agg_group_by, &aggregate_exprs, AggregateMode::Partial, + /* emit_group_hash */ false, )?); // Need to update the GROUP BY expressions to point to the correct column after schema change @@ -659,6 +673,9 @@ impl GroupedHashAggregateStream { None }; + let num_group_cols = agg.group_expr().num_output_exprs(); + let emit_group_hash = agg.emit_group_hash() && agg.mode == AggregateMode::Partial; + Ok(GroupedHashAggregateStream { schema: agg_schema, input, @@ -681,6 +698,8 @@ impl GroupedHashAggregateStream { group_values_soft_limit: agg.limit_options().map(|config| config.limit()), skip_aggregation_probe, reduction_factor, + num_group_cols, + emit_group_hash, }) } } @@ -1114,6 +1133,22 @@ impl GroupedHashAggregateStream { output.extend(acc.state(emit_to)?) } } + + // Append the precomputed-hash column for the downstream repartition + // fast path. Only on non-spill emits — spill batches stay inside this + // operator and never reach `RepartitionExec`. 
+ if self.emit_group_hash && !spilling { + let group_arrays = &output[..self.num_group_cols]; + let num_rows = group_arrays.first().map(|a| a.len()).unwrap_or(0); + let mut hashes = vec![0u64; num_rows]; + datafusion_common::hash_utils::create_hashes( + group_arrays, + REPARTITION_RANDOM_STATE.random_state(), + &mut hashes, + )?; + output.push(Arc::new(UInt64Array::from(hashes)) as ArrayRef); + } + drop(timer); // emit reduces the memory usage. Ignore Err from update_memory_reservation. Even if it is @@ -1347,6 +1382,21 @@ impl GroupedHashAggregateStream { output.extend(acc.convert_to_state(values, opt_filter)?); } + // Match the `emit()` path: append the precomputed-hash column when + // enabled so downstream `RepartitionExec` can reuse it even for + // pass-through batches produced in skip-aggregation mode. + if self.emit_group_hash { + let group_arrays = &output[..self.num_group_cols]; + let num_rows = group_arrays.first().map(|a| a.len()).unwrap_or(0); + let mut hashes = vec![0u64; num_rows]; + datafusion_common::hash_utils::create_hashes( + group_arrays, + REPARTITION_RANDOM_STATE.random_state(), + &mut hashes, + )?; + output.push(Arc::new(UInt64Array::from(hashes)) as ArrayRef); + } + let states_batch = RecordBatch::try_new(self.schema(), output)?; Ok(states_batch) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index d4406360504f9..16a0899829147 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -44,9 +44,9 @@ use crate::{ check_if_same_properties, }; -use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions}; +use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions, UInt64Array}; use arrow::compute::take_arrays; -use arrow::datatypes::{SchemaRef, UInt32Type}; +use arrow::datatypes::{DataType, SchemaRef, UInt32Type}; use datafusion_common::config::ConfigOptions; use datafusion_common::stats::Precision; use 
datafusion_common::tree_node::TreeNodeRecursion; @@ -438,6 +438,93 @@ enum BatchPartitionerState { /// executions and runs. pub const REPARTITION_RANDOM_STATE: SeededRandomState = SeededRandomState::with_seed(0); +/// If `schema` carries a `UInt64` field tagged as a precomputed hash +/// (see `PRECOMPUTED_HASH_METADATA_KEY` and +/// `PRECOMPUTED_HASH_COLS_METADATA_KEY` in [`crate::aggregates`]) and `exprs` +/// are the `Column` references over which that hash was computed — same +/// columns, same order — returns the hash column's index. +/// +/// Matches in two shapes: +/// +/// 1. `exprs == [Column(hash_col_idx)]`. The caller already references the +/// hash column directly; we just return its index. +/// 2. `exprs == [Column(c0), Column(c1), ...]` matching the +/// `precomputed_hash_cols` list (in order). This is the common case: the +/// partitioning keys are the group columns and we swap in the hash. +/// +/// When this matches, [`BatchPartitioner`] can skip calling [`create_hashes`] +/// and use the `UInt64` array directly. +fn detect_precomputed_hash_column( + exprs: &[Arc], + schema: &arrow::datatypes::Schema, +) -> Option { + use crate::aggregates::{ + PRECOMPUTED_HASH_COLS_METADATA_KEY, PRECOMPUTED_HASH_METADATA_KEY, + PRECOMPUTED_HASH_REPARTITION_VALUE, + }; + use datafusion_physical_expr::expressions::Column; + + if exprs.is_empty() { + return None; + } + + // All partitioning exprs must be plain `Column`s for the fast path. + let expr_cols: Option> = exprs + .iter() + .map(|e| (e.as_ref() as &dyn std::any::Any).downcast_ref::()) + .collect(); + let expr_cols = expr_cols?; + + // Scan fields for the precomputed-hash marker. + for (idx, field) in schema.fields().iter().enumerate() { + if field.data_type() != &DataType::UInt64 { + continue; + } + let md = field.metadata(); + if md.get(PRECOMPUTED_HASH_METADATA_KEY).map(String::as_str) + != Some(PRECOMPUTED_HASH_REPARTITION_VALUE) + { + continue; + } + + // Shape 1: partitioning expr IS the hash column. 
+ if expr_cols.len() == 1 && expr_cols[0].index() == idx { + return Some(idx); + } + + // Shape 2: partitioning exprs cover the recorded source columns + // in the same order. + let Some(cols_csv) = md.get(PRECOMPUTED_HASH_COLS_METADATA_KEY) else { + continue; + }; + let Some(source_cols) = parse_hash_source_cols(cols_csv) else { + continue; + }; + if source_cols.len() != expr_cols.len() { + continue; + } + let matches = expr_cols + .iter() + .zip(source_cols.iter()) + .all(|(c, &src)| c.index() == src); + if matches { + return Some(idx); + } + } + + None +} + +/// Parse a comma-separated list of column indices (as emitted into the +/// `PRECOMPUTED_HASH_COLS_METADATA_KEY` field metadata). Returns `None` on +/// malformed input rather than erroring — a bad marker just disables the +/// fast path. +fn parse_hash_source_cols(csv: &str) -> Option> { + csv.split(',') + .map(|s| s.trim().parse::().ok()) + .collect() +} + impl BatchPartitioner { /// Create a new [`BatchPartitioner`] for hash-based repartitioning. /// @@ -582,22 +669,53 @@ impl BatchPartitioner { // Tracking time required for distributing indexes across output partitions let timer = self.timer.timer(); - let arrays = - evaluate_expressions_to_arrays(exprs.as_slice(), &batch)?; - - hash_buffer.clear(); - hash_buffer.resize(batch.num_rows(), 0); - - create_hashes( - &arrays, - REPARTITION_RANDOM_STATE.random_state(), - hash_buffer, - )?; + // Fast path: a single column expression that points at a + // UInt64 field tagged as a precomputed hash (produced by a + // Partial `AggregateExec` with `emit_group_hash = true`). + // The values were hashed with `REPARTITION_RANDOM_STATE` + // upstream, so we can consume them directly. 
+                let precomputed_hash_col = detect_precomputed_hash_column(
+                    exprs.as_slice(),
+                    batch.schema().as_ref(),
+                );
 
                 indices.iter_mut().for_each(|v| v.clear());
 
-                for (index, hash) in hash_buffer.iter().enumerate() {
-                    indices[(*hash % *partitions as u64) as usize].push(index as u32);
+                if let Some(col_idx) = precomputed_hash_col {
+                    let column = batch.column(col_idx);
+                    let hashes = column
+                        .as_any()
+                        .downcast_ref::<UInt64Array>()
+                        .ok_or_else(|| {
+                            internal_datafusion_err!(
+                                "precomputed hash column is not UInt64Array"
+                            )
+                        })?;
+                    for row in 0..hashes.len() {
+                        // Precomputed hashes are produced by `create_hashes`
+                        // over a non-nullable UInt64 field, so the values
+                        // are always valid.
+                        let hash = hashes.value(row);
+                        indices[(hash % *partitions as u64) as usize]
+                            .push(row as u32);
+                    }
+                } else {
+                    let arrays =
+                        evaluate_expressions_to_arrays(exprs.as_slice(), &batch)?;
+
+                    hash_buffer.clear();
+                    hash_buffer.resize(batch.num_rows(), 0);
+
+                    create_hashes(
+                        &arrays,
+                        REPARTITION_RANDOM_STATE.random_state(),
+                        hash_buffer,
+                    )?;
+
+                    for (index, hash) in hash_buffer.iter().enumerate() {
+                        indices[(*hash % *partitions as u64) as usize]
+                            .push(index as u32);
+                    }
                 }
 
                 // Finished building index-arrays for output partitions
@@ -1901,6 +2019,94 @@ mod tests {
         Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]))
     }
 
+    /// `detect_precomputed_hash_column` recognizes the precomputed-hash marker
+    /// in both shapes: (a) partitioning expr IS the hash column, and
+    /// (b) partitioning exprs are the recorded source columns in order.
+    #[test]
+    fn detect_precomputed_hash_column_matches_marker() {
+        use crate::aggregates::{
+            PRECOMPUTED_HASH_COLS_METADATA_KEY, PRECOMPUTED_HASH_METADATA_KEY,
+            PRECOMPUTED_HASH_REPARTITION_VALUE,
+        };
+        use datafusion_physical_expr::expressions::Column;
+        use std::collections::HashMap;
+
+        // Single-column group: hash computed over column 0.
+        let mut md = HashMap::new();
+        md.insert(
+            PRECOMPUTED_HASH_METADATA_KEY.to_string(),
+            PRECOMPUTED_HASH_REPARTITION_VALUE.to_string(),
+        );
+        md.insert(
+            PRECOMPUTED_HASH_COLS_METADATA_KEY.to_string(),
+            "0".to_string(),
+        );
+        let schema = Schema::new(vec![
+            Field::new("group", DataType::Utf8, true),
+            Field::new("hash", DataType::UInt64, false).with_metadata(md),
+        ]);
+
+        // Shape (a): partitioning expr IS the hash column.
+        let hash_col: Arc<dyn PhysicalExpr> = Arc::new(Column::new("hash", 1));
+        assert_eq!(
+            detect_precomputed_hash_column(&[Arc::clone(&hash_col)], &schema),
+            Some(1)
+        );
+
+        // Shape (b): partitioning expr is the source group column; detection
+        // finds the tagged hash field by scanning schema.
+        let group_col: Arc<dyn PhysicalExpr> = Arc::new(Column::new("group", 0));
+        assert_eq!(
+            detect_precomputed_hash_column(&[Arc::clone(&group_col)], &schema),
+            Some(1)
+        );
+
+        // Multi-column group: hash covers columns 0, 1, 2.
+        let mut md3 = HashMap::new();
+        md3.insert(
+            PRECOMPUTED_HASH_METADATA_KEY.to_string(),
+            PRECOMPUTED_HASH_REPARTITION_VALUE.to_string(),
+        );
+        md3.insert(
+            PRECOMPUTED_HASH_COLS_METADATA_KEY.to_string(),
+            "0,1,2".to_string(),
+        );
+        let schema3 = Schema::new(vec![
+            Field::new("a", DataType::Utf8, true),
+            Field::new("b", DataType::Int32, true),
+            Field::new("c", DataType::Int64, true),
+            Field::new("hash", DataType::UInt64, false).with_metadata(md3),
+        ]);
+        let a: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0));
+        let b: Arc<dyn PhysicalExpr> = Arc::new(Column::new("b", 1));
+        let c: Arc<dyn PhysicalExpr> = Arc::new(Column::new("c", 2));
+        assert_eq!(
+            detect_precomputed_hash_column(
+                &[Arc::clone(&a), Arc::clone(&b), Arc::clone(&c)],
+                &schema3,
+            ),
+            Some(3),
+            "multi-column partitioning should match the recorded source columns"
+        );
+
+        // Wrong order — refuse to match (would give incorrect routing).
+ assert_eq!( + detect_precomputed_hash_column( + &[Arc::clone(&b), Arc::clone(&a), Arc::clone(&c)], + &schema3, + ), + None, + "order mismatch must disable the fast path" + ); + + // Subset — refuse to match. + assert_eq!( + detect_precomputed_hash_column(&[a, b], &schema3), + None, + "subset of source columns must disable the fast path" + ); + } + async fn repartition( schema: &SchemaRef, input_partitions: Vec>, diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 46039f3c99c27..929cd811202f7 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -131,6 +131,7 @@ The following configuration settings are available: | datafusion.execution.keep_partition_by_columns | false | Should DataFusion keep the columns used for partition_by in the output RecordBatches | | datafusion.execution.skip_partial_aggregation_probe_ratio_threshold | 0.8 | Aggregation ratio (number of distinct groups / number of input rows) threshold for skipping partial aggregation. If the value is greater then partial aggregation will skip aggregation for further input | | datafusion.execution.skip_partial_aggregation_probe_rows_threshold | 100000 | Number of input rows partial aggregation partition should process, before aggregation ratio check and trying to switch to skipping aggregation mode | +| datafusion.execution.emit_aggregate_group_hash | false | When `true`, a Partial `AggregateExec` feeding a Hash `RepartitionExec` over the same group columns emits a trailing `UInt64` column of precomputed row hashes. The `RepartitionExec` consumes it directly instead of rehashing the group values, saving the per-row hash on the shuffle path. Largest wins are for string/binary group keys. | | datafusion.execution.use_row_number_estimates_to_optimize_partitioning | false | Should DataFusion use row number estimates at the input to decide whether increasing parallelism is beneficial or not. 
By default, only exact row numbers (not estimates) are used for this decision. Setting this flag to `true` will likely produce better plans. if the source of statistics is accurate. We plan to make this the default in the future. | | datafusion.execution.enforce_batch_size_in_joins | false | Should DataFusion enforce batch size in joins or not. By default, DataFusion will not enforce batch size in joins. Enforcing batch size in joins can reduce memory usage when joining large tables with a highly-selective join filter, but is also slightly slower. | | datafusion.execution.objectstore_writer_buffer_size | 10485760 | Size (bytes) of data buffer DataFusion uses when writing output files. This affects the size of the data chunks that are uploaded to remote object stores (e.g. AWS S3). If very large (>= 100 GiB) output files are being written, it may be necessary to increase this size to avoid errors from the remote end point. | From ac295b22877db36ffc0601c5e38c61f89ec3a78c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 24 Apr 2026 00:14:50 +0200 Subject: [PATCH 2/2] feat: reuse precomputed hash in FinalPartitioned + default emit=true MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extends the precomputed-hash pipeline through the Final side: - Unify `AGGREGATION_HASH_SEED` with `REPARTITION_RANDOM_STATE` (seed 0) so a single hash value is valid for the Partial's hash table, RepartitionExec routing, and the Final's hash-table probe. Hashbrown selects buckets from high bits while routing uses low bits, so reuse is safe in practice. - Add `GroupValues::intern_with_hashes(cols, hashes, groups)` with a default implementation that falls back to `intern`. Override on `GroupValuesRows` (the row-based general path) to skip the `create_hashes` call; factored the probe/insert loop into `GroupValuesRows::intern_rows`. 
- Wire `GroupedHashAggregateStream`: when the Final-side input schema carries a `UInt64` field tagged with the precomputed-hash metadata, extract the array and feed its values into `intern_with_hashes`. - Flip `datafusion.execution.emit_aggregate_group_hash` default to `true`. The rule is still gated by the config so it's trivially disabled per session if needed. Other `GroupValues` specializations (single-column bytes/primitive/ boolean, multi-column composite) pick up the fallback behavior — they still work correctly, just without the Final-side saving until each implementation opts in. Co-Authored-By: Claude Opus 4.7 (1M context) --- datafusion/common/src/config.rs | 8 +- .../src/emit_partial_aggregate_hash.rs | 10 +- .../src/aggregates/group_values/mod.rs | 16 +++ .../src/aggregates/group_values/row.rs | 126 ++++++++++++------ .../physical-plan/src/aggregates/mod.rs | 9 +- .../physical-plan/src/aggregates/row_hash.rs | 69 +++++++++- docs/source/user-guide/configs.md | 2 +- 7 files changed, 184 insertions(+), 56 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index fceae7e313f03..8484f6faf4947 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -651,9 +651,11 @@ config_namespace! { /// When `true`, a Partial `AggregateExec` feeding a Hash `RepartitionExec` /// over the same group columns emits a trailing `UInt64` column of /// precomputed row hashes. The `RepartitionExec` consumes it directly - /// instead of rehashing the group values, saving the per-row hash on - /// the shuffle path. Largest wins are for string/binary group keys. - pub emit_aggregate_group_hash: bool, default = false + /// instead of rehashing the group values, and the downstream + /// `FinalPartitioned` aggregate reuses it in its hash-table probing, + /// saving two full rehashing passes. Largest wins are for + /// string/binary group keys. 
+ pub emit_aggregate_group_hash: bool, default = true /// Should DataFusion use row number estimates at the input to decide /// whether increasing parallelism is beneficial or not. By default, diff --git a/datafusion/physical-optimizer/src/emit_partial_aggregate_hash.rs b/datafusion/physical-optimizer/src/emit_partial_aggregate_hash.rs index 172b45d4f1992..466c3eaadfee2 100644 --- a/datafusion/physical-optimizer/src/emit_partial_aggregate_hash.rs +++ b/datafusion/physical-optimizer/src/emit_partial_aggregate_hash.rs @@ -142,7 +142,7 @@ mod tests { } #[test] - fn rule_is_disabled_by_default_config() -> Result<()> { + fn rule_respects_disabled_config() -> Result<()> { let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); let input = Arc::new(EmptyExec::new(Arc::clone(&schema))); let group_by = @@ -159,13 +159,15 @@ mod tests { partial as Arc, Partitioning::Hash(vec![col("a", &schema)?], 4), )?); - let optimized = EmitPartialAggregateHash::new() - .optimize(Arc::clone(&repartition), &ConfigOptions::default())?; + let mut cfg = ConfigOptions::default(); + cfg.execution.emit_aggregate_group_hash = false; + let optimized = + EmitPartialAggregateHash::new().optimize(Arc::clone(&repartition), &cfg)?; let rep = optimized.downcast_ref::().unwrap(); let partial = rep.input().downcast_ref::().unwrap(); assert!( !partial.emit_group_hash(), - "default config leaves the rule disabled" + "explicit disable leaves the rule off" ); Ok(()) } diff --git a/datafusion/physical-plan/src/aggregates/group_values/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/mod.rs index 2f3b1a19e7d73..feb11c1de9819 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/mod.rs @@ -99,6 +99,22 @@ pub trait GroupValues: Send { /// assigned. 
     fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()>;
 
+    /// Same as [`Self::intern`] but the caller has already computed per-row
+    /// hashes of `cols` using [`AGGREGATION_HASH_SEED`]. Implementations that
+    /// can reuse externally computed hashes should override this to skip
+    /// recomputation. The default implementation ignores `hashes` and calls
+    /// [`Self::intern`].
+    ///
+    /// [`AGGREGATION_HASH_SEED`]: super::AGGREGATION_HASH_SEED
+    fn intern_with_hashes(
+        &mut self,
+        cols: &[ArrayRef],
+        _hashes: &[u64],
+        groups: &mut Vec<usize>,
+    ) -> Result<()> {
+        self.intern(cols, groups)
+    }
+
     /// Returns the number of bytes of memory used by this [`GroupValues`]
     fn size(&self) -> usize;
 
diff --git a/datafusion/physical-plan/src/aggregates/group_values/row.rs b/datafusion/physical-plan/src/aggregates/group_values/row.rs
index a3bd31f76c233..d6ddead2e9714 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/row.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/row.rs
@@ -116,63 +116,52 @@ impl GroupValuesRows {
 
 impl GroupValues for GroupValuesRows {
     fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> {
-        // Convert the group keys into the row format
         let group_rows = &mut self.rows_buffer;
         group_rows.clear();
         self.row_converter.append(group_rows, cols)?;
         let n_rows = group_rows.num_rows();
 
-        let mut group_values = match self.group_values.take() {
-            Some(group_values) => group_values,
-            None => self.row_converter.empty_rows(0, 0),
-        };
-
-        // tracks to which group each of the input rows belongs
-        groups.clear();
-
-        // 1.1 Calculate the group keys for the group values
         let batch_hashes = &mut self.hashes_buffer;
         batch_hashes.clear();
         batch_hashes.resize(n_rows, 0);
         create_hashes(cols, &self.random_state, batch_hashes)?;
 
-        for (row, &target_hash) in batch_hashes.iter().enumerate() {
-            let entry = self.map.find_mut(target_hash, |(exist_hash, group_idx)| {
-                // Somewhat surprisingly, this closure can be called
even if the
-                // hash doesn't match, so check the hash first with an integer
-                // comparison first avoid the more expensive comparison with
-                // group value. https://github.com/apache/datafusion/pull/11718
-                target_hash == *exist_hash
-                // verify that the group that we are inserting with hash is
-                // actually the same key value as the group in
-                // existing_idx (aka group_values @ row)
-                && group_rows.row(row) == group_values.row(*group_idx)
-            });
-
-            let group_idx = match entry {
-                // Existing group_index for this group value
-                Some((_hash, group_idx)) => *group_idx,
-                // 1.2 Need to create new entry for the group
-                None => {
-                    // Add new entry to aggr_state and save newly created index
-                    let group_idx = group_values.num_rows();
-                    group_values.push(group_rows.row(row));
+        Self::intern_rows(
+            &mut self.map,
+            &mut self.map_size,
+            &mut self.group_values,
+            &self.row_converter,
+            group_rows,
+            batch_hashes,
+            groups,
+        )
+    }
 
-                    // for hasher function, use precomputed hash value
-                    self.map.insert_accounted(
-                        (target_hash, group_idx),
-                        |(hash, _group_index)| *hash,
-                        &mut self.map_size,
-                    );
-                    group_idx
-                }
-            };
-            groups.push(group_idx);
+    fn intern_with_hashes(
+        &mut self,
+        cols: &[ArrayRef],
+        hashes: &[u64],
+        groups: &mut Vec<usize>,
+    ) -> Result<()> {
+        // Fallback: if hashes don't match the input rows, drop to recomputing.
+        let num_rows = cols.first().map(|a| a.len()).unwrap_or(0);
+        if hashes.len() != num_rows {
+            return self.intern(cols, groups);
         }
 
-        self.group_values = Some(group_values);
+        let group_rows = &mut self.rows_buffer;
+        group_rows.clear();
+        self.row_converter.append(group_rows, cols)?;
 
-        Ok(())
+        Self::intern_rows(
+            &mut self.map,
+            &mut self.map_size,
+            &mut self.group_values,
+            &self.row_converter,
+            group_rows,
+            hashes,
+            groups,
+        )
     }
 
     fn size(&self) -> usize {
@@ -259,6 +248,57 @@ impl GroupValues for GroupValuesRows {
     }
 }
 
+impl GroupValuesRows {
+    /// Hash-table probe/insert loop shared by [`GroupValues::intern`] and
+    /// [`GroupValues::intern_with_hashes`]. `batch_hashes` must have the same
+    /// length as `group_rows` and must be keyed with [`AGGREGATION_HASH_SEED`].
+    ///
+    /// [`AGGREGATION_HASH_SEED`]: crate::aggregates::AGGREGATION_HASH_SEED
+    fn intern_rows(
+        map: &mut HashTable<(u64, usize)>,
+        map_size: &mut usize,
+        group_values_slot: &mut Option<Rows>,
+        row_converter: &RowConverter,
+        group_rows: &Rows,
+        batch_hashes: &[u64],
+        groups: &mut Vec<usize>,
+    ) -> Result<()> {
+        let mut group_values = match group_values_slot.take() {
+            Some(group_values) => group_values,
+            None => row_converter.empty_rows(0, 0),
+        };
+
+        groups.clear();
+
+        for (row, &target_hash) in batch_hashes.iter().enumerate() {
+            let entry = map.find_mut(target_hash, |(exist_hash, group_idx)| {
+                // Integer compare first, row-equality only on hash match.
+                // https://github.com/apache/datafusion/pull/11718
+                target_hash == *exist_hash
+                    && group_rows.row(row) == group_values.row(*group_idx)
+            });
+
+            let group_idx = match entry {
+                Some((_hash, group_idx)) => *group_idx,
+                None => {
+                    let group_idx = group_values.num_rows();
+                    group_values.push(group_rows.row(row));
+                    map.insert_accounted(
+                        (target_hash, group_idx),
+                        |(hash, _group_index)| *hash,
+                        map_size,
+                    );
+                    group_idx
+                }
+            };
+            groups.push(group_idx);
+        }
+
+        *group_values_slot = Some(group_values);
+        Ok(())
+    }
+}
+
 fn dictionary_encode_if_necessary(
     array: &ArrayRef,
     expected: &DataType,
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index 2d97cbf46d894..7abb1bd533055 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -85,10 +85,13 @@ pub fn topk_types_supported(key_type: &DataType, value_type: &DataType) -> bool
     is_supported_hash_key_type(key_type) && is_supported_heap_type(value_type)
 }
 
-/// Hard-coded seed for aggregations to ensure hash values differ from `RepartitionExec`, avoiding collisions.
+/// Hash seed shared with [`crate::repartition::REPARTITION_RANDOM_STATE`]. Using
+/// the same seed across the Partial's internal hash table, the emitted
+/// precomputed-hash column, `RepartitionExec` routing, and the Final's hash
+/// table lets a single hash value flow end-to-end. NOTE(review): hashbrown buckets
+/// on LOW hash bits, as `hash % partitions` routing also does — verify clustering.
 const AGGREGATION_HASH_SEED: datafusion_common::hash_utils::RandomState =
-    // This seed is chosen to be a large 64-bit number
-    datafusion_common::hash_utils::RandomState::with_seed(15395726432021054657);
+    datafusion_common::hash_utils::RandomState::with_seed(0);
 
 /// Name of the trailing `UInt64` column that a Partial [`AggregateExec`] may
 /// emit when [`AggregateExec::with_emit_group_hash`] is set. 
Holds per-group
diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs
index 418598c709d97..a7130be897a6f 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -465,6 +465,13 @@ pub(crate) struct GroupedHashAggregateStream {
     /// `REPARTITION_RANDOM_STATE` hashes of the group-value arrays so a
     /// downstream `RepartitionExec` can skip rehashing. Partial mode only.
     emit_group_hash: bool,
+
+    /// If `Some(idx)`, incoming batches carry a precomputed-hash column at
+    /// `idx` (produced by a Partial `AggregateExec` with `emit_group_hash`
+    /// enabled). We extract it in `group_aggregate_batch` and feed it to
+    /// [`super::group_values::GroupValues::intern_with_hashes`] to skip the
+    /// Final-side rehashing pass.
+    precomputed_hash_input_col: Option<usize>,
 }
 
 impl GroupedHashAggregateStream {
@@ -676,6 +683,15 @@ impl GroupedHashAggregateStream {
         let num_group_cols = agg.group_expr().num_output_exprs();
         let emit_group_hash = agg.emit_group_hash() && agg.mode == AggregateMode::Partial;
 
+        // Detect a precomputed-hash column in the input schema (emitted by a
+        // Partial upstream). We only consume it when we ourselves are in a
+        // non-Partial mode — a Partial fed precomputed hashes is unusual.
+        let precomputed_hash_input_col = if agg.mode != AggregateMode::Partial {
+            detect_precomputed_hash_column_in_input(&agg.input.schema())
+        } else {
+            None
+        };
+
         Ok(GroupedHashAggregateStream {
             schema: agg_schema,
             input,
@@ -700,10 +716,35 @@ impl GroupedHashAggregateStream {
             reduction_factor,
             num_group_cols,
             emit_group_hash,
+            precomputed_hash_input_col,
         })
     }
 }
 
+/// Scans `schema` for a `UInt64` field tagged with the precomputed-hash
+/// metadata and returns its column index. Used by Final-side stream
+/// construction to know whether it can skip rehashing in `GroupValues::intern`.
+fn detect_precomputed_hash_column_in_input(
+    schema: &arrow::datatypes::Schema,
+) -> Option<usize> {
+    use crate::aggregates::{
+        PRECOMPUTED_HASH_METADATA_KEY, PRECOMPUTED_HASH_REPARTITION_VALUE,
+    };
+    use arrow::datatypes::DataType;
+    for (idx, field) in schema.fields().iter().enumerate() {
+        if field.data_type() == &DataType::UInt64
+            && field
+                .metadata()
+                .get(PRECOMPUTED_HASH_METADATA_KEY)
+                .map(String::as_str)
+                == Some(PRECOMPUTED_HASH_REPARTITION_VALUE)
+        {
+            return Some(idx);
+        }
+    }
+    None
+}
+
 /// Create an accumulator for `agg_expr` -- a [`GroupsAccumulator`] if
 /// that is supported by the aggregate, or a
 /// [`GroupsAccumulatorAdapter`] if not.
@@ -963,13 +1004,37 @@ impl GroupedHashAggregateStream {
             evaluate_optional(&self.filter_expressions, batch)?
         };
 
+        // Extract precomputed hashes once per batch, if the upstream Partial
+        // emitted them. Spill-merge input won't carry the column (spilled
+        // batches use a hash-free schema), so keep the check batch-local.
+        let precomputed_hashes: Option<UInt64Array> =
+            if !self.spill_state.is_stream_merging {
+                self.precomputed_hash_input_col.and_then(|idx| {
+                    batch
+                        .column(idx)
+                        .as_any()
+                        .downcast_ref::<UInt64Array>()
+                        .cloned()
+                })
+            } else {
+                None
+            };
+
         for group_values in &group_by_values {
             let groups_start_time = Instant::now();
 
             // calculate the group indices for each input row
             let starting_num_groups = self.group_values.len();
-            self.group_values
-                .intern(group_values, &mut self.current_group_indices)?;
+            if let Some(hashes) = precomputed_hashes.as_ref() {
+                self.group_values.intern_with_hashes(
+                    group_values,
+                    hashes.values(),
+                    &mut self.current_group_indices,
+                )?;
+            } else {
+                self.group_values
+                    .intern(group_values, &mut self.current_group_indices)?;
+            }
             let group_indices = &self.current_group_indices;
 
             // Update ordering information if necessary
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 929cd811202f7..3df6b6a779041 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md @@ -131,7 +131,7 @@ The following configuration settings are available: | datafusion.execution.keep_partition_by_columns | false | Should DataFusion keep the columns used for partition_by in the output RecordBatches | | datafusion.execution.skip_partial_aggregation_probe_ratio_threshold | 0.8 | Aggregation ratio (number of distinct groups / number of input rows) threshold for skipping partial aggregation. If the value is greater then partial aggregation will skip aggregation for further input | | datafusion.execution.skip_partial_aggregation_probe_rows_threshold | 100000 | Number of input rows partial aggregation partition should process, before aggregation ratio check and trying to switch to skipping aggregation mode | -| datafusion.execution.emit_aggregate_group_hash | false | When `true`, a Partial `AggregateExec` feeding a Hash `RepartitionExec` over the same group columns emits a trailing `UInt64` column of precomputed row hashes. The `RepartitionExec` consumes it directly instead of rehashing the group values, saving the per-row hash on the shuffle path. Largest wins are for string/binary group keys. | +| datafusion.execution.emit_aggregate_group_hash | true | When `true`, a Partial `AggregateExec` feeding a Hash `RepartitionExec` over the same group columns emits a trailing `UInt64` column of precomputed row hashes. The `RepartitionExec` consumes it directly instead of rehashing the group values, and the downstream `FinalPartitioned` aggregate reuses it in its hash-table probing, saving two full rehashing passes. Largest wins are for string/binary group keys. | | datafusion.execution.use_row_number_estimates_to_optimize_partitioning | false | Should DataFusion use row number estimates at the input to decide whether increasing parallelism is beneficial or not. By default, only exact row numbers (not estimates) are used for this decision. Setting this flag to `true` will likely produce better plans. 
if the source of statistics is accurate. We plan to make this the default in the future. | | datafusion.execution.enforce_batch_size_in_joins | false | Should DataFusion enforce batch size in joins or not. By default, DataFusion will not enforce batch size in joins. Enforcing batch size in joins can reduce memory usage when joining large tables with a highly-selective join filter, but is also slightly slower. | | datafusion.execution.objectstore_writer_buffer_size | 10485760 | Size (bytes) of data buffer DataFusion uses when writing output files. This affects the size of the data chunks that are uploaded to remote object stores (e.g. AWS S3). If very large (>= 100 GiB) output files are being written, it may be necessary to increase this size to avoid errors from the remote end point. |