diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 0f38988c69405..cc8e2e4874b20 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -2482,6 +2482,7 @@ impl DataFrame { relation: None, name: field.name().to_string(), metadata: None, + is_internal: false, }), Err(_) => col(field.name()), } diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index a025446aa37e8..b26e88bdb4abe 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -2383,10 +2383,10 @@ pub fn create_window_expr( ) -> Result> { // unpack aliased logical expressions, e.g. "sum(col) over () as total" let (name, e) = match e { - Expr::Alias(Alias { expr, name, .. }) => (name.clone(), expr.as_ref()), - _ => (e.schema_name().to_string(), e), + Expr::Alias(alias) => (alias.name.clone(), e.clone().unalias_nested().data), + _ => (e.schema_name().to_string(), e.clone()), }; - create_window_expr_with_name(e, name, logical_schema, execution_props) + create_window_expr_with_name(&e, name, logical_schema, execution_props) } type AggregateExprWithOptionalArgs = ( @@ -2401,7 +2401,7 @@ type AggregateExprWithOptionalArgs = ( pub fn create_aggregate_expr_with_name_and_maybe_filter( e: &Expr, name: Option, - human_displan: String, + human_display: Option, logical_input_schema: &DFSchema, physical_input_schema: &Schema, execution_props: &ExecutionProps, @@ -2445,16 +2445,17 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter( execution_props, )?; - let agg_expr = + let mut builder = AggregateExprBuilder::new(func.to_owned(), physical_args.to_vec()) .order_by(order_bys.clone()) .schema(Arc::new(physical_input_schema.to_owned())) .alias(name) - .human_display(human_displan) .with_ignore_nulls(ignore_nulls) - .with_distinct(*distinct) - .build() - .map(Arc::new)?; + .with_distinct(*distinct); + if let Some(human_display) = human_display { + builder = builder.human_display(human_display); + } + let agg_expr = builder.build().map(Arc::new)?; (agg_expr, filter, order_bys) }; @@ -2472,20 +2473,28 @@ pub fn create_aggregate_expr_and_maybe_filter( physical_input_schema: &Schema, execution_props: &ExecutionProps, ) -> Result { - // Unpack (potentially nested) aliased logical expressions, e.g. "sum(col) as total" - // Some functions like `count_all()` create internal aliases, - // Unwrap all alias layers to get to the underlying aggregate function + // Unpack (potentially nested) aliased logical expressions, e.g. "sum(col) as total". + // Physical explain prefers the lowered aggregate form, so unwrap all alias + // layers to recover the underlying aggregate function and then re-attach + // only the visible output alias. let (name, human_display, e) = match e { - Expr::Alias(Alias { name, .. }) => { + Expr::Alias(alias) => { let unaliased = e.clone().unalias_nested().data; - (Some(name.clone()), e.human_display().to_string(), unaliased) + let human_display = unaliased.human_display().to_string(); + let human_display = if human_display.is_empty() || human_display == alias.name + { + alias.name.clone() + } else { + format!("{human_display} as {}", alias.name) + }; + (Some(alias.name.clone()), Some(human_display), unaliased) } Expr::AggregateFunction(_) => ( Some(e.schema_name().to_string()), - e.human_display().to_string(), + Some(e.human_display().to_string()), e.clone(), ), - _ => (None, String::default(), e.clone()), + _ => (None, None, e.clone()), }; create_aggregate_expr_with_name_and_maybe_filter( @@ -3106,6 +3115,7 @@ impl<'n> TreeNodeVisitor<'n> for InvariantChecker { mod tests { use std::cmp::Ordering; use std::fmt::{self, Debug}; + use std::mem::size_of_val; use std::ops::{BitAnd, Not}; use super::*; @@ -3121,19 +3131,22 @@ mod tests { use crate::execution::session_state::SessionStateBuilder; use arrow::array::{ArrayRef, DictionaryArray, Int32Array}; use arrow::datatypes::{DataType, Field, Int32Type}; - use arrow_schema::SchemaRef; + use arrow_schema::{FieldRef, SchemaRef}; use datafusion_common::config::ConfigOptions; use datafusion_common::{ - DFSchemaRef, TableReference, ToDFSchema as _, assert_contains, + DFSchemaRef, ScalarValue, TableReference, ToDFSchema as _, assert_contains, }; use datafusion_execution::TaskContext; use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_expr::builder::subquery_alias; + use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; use datafusion_expr::{ - LogicalPlanBuilder, TableSource, UserDefinedLogicalNodeCore, col, lit, + Accumulator, AggregateUDF, AggregateUDFImpl, ExprFunctionExt, LogicalPlanBuilder, + Signature, TableSource, UserDefinedLogicalNodeCore, Volatility, col, lit, }; use datafusion_functions_aggregate::count::count_all; use datafusion_functions_aggregate::expr_fn::sum; + use datafusion_functions_aggregate::first_last::first_value_udaf; use datafusion_physical_expr::EquivalenceProperties; use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; @@ -3158,6 +3171,83 @@ mod tests { .await } + async fn aggregate_explain(logical_plan: &LogicalPlan) -> Result { + let physical_plan = plan(logical_plan).await?; + Ok(displayable(physical_plan.as_ref()).indent(true).to_string()) + } + + #[derive(Debug, Default)] + struct NullAccumulator; + + impl Accumulator for NullAccumulator { + fn state(&mut self) -> Result> { + Ok(vec![self.evaluate()?]) + } + + fn update_batch(&mut self, _values: &[ArrayRef]) -> Result<()> { + Ok(()) + } + + fn merge_batch(&mut self, _states: &[ArrayRef]) -> Result<()> { + Ok(()) + } + + fn evaluate(&mut self) -> Result { + Ok(ScalarValue::Int64(None)) + } + + fn size(&self) -> usize { + size_of_val(self) + } + } + + #[derive(Debug, Clone, PartialEq, Eq, Hash)] + struct CustomHumanDisplayUdaf { + signature: Signature, + } + + impl CustomHumanDisplayUdaf { + fn new() -> Self { + Self { + signature: Signature::exact(vec![DataType::Int64], Volatility::Immutable), + } + } + } + + impl AggregateUDFImpl for CustomHumanDisplayUdaf { + fn name(&self) -> &str { + "custom_human_display_udaf" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Int64) + } + + fn accumulator( + &self, + _acc_args: AccumulatorArgs, + ) -> Result> { + Ok(Box::new(NullAccumulator)) + } + + fn state_fields(&self, _args: StateFieldsArgs) -> Result> { + Ok(vec![ + Field::new("custom_state", DataType::Int64, true).into(), + ]) + } + + fn human_display(&self, params: &AggregateFunctionParams) -> Result { + Ok(format!( + "custom_display({})", + params.args[0].human_display() + )) + } + } + #[tokio::test] async fn test_all_operators() -> Result<()> { let logical_plan = test_csv_scan() @@ -3769,6 +3859,142 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_aggregate_explain_shows_quoted_user_alias() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new( + "column1", + DataType::Int64, + false, + )])); + + let logical_plan = scan_empty(None, schema.as_ref(), None)? + .aggregate( + Vec::::new(), + vec![sum(col("column1")).alias("total rows")], + )? + .build()?; + + assert_contains!( + aggregate_explain(&logical_plan).await?, + "AggregateExec: mode=Single, gby=[], aggr=[sum(?table?.column1) as total rows]" + ); + + Ok(()) + } + + #[tokio::test] + async fn test_aggregate_explain_shows_aliased_filter_expression() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("column1", DataType::Int64, false), + Field::new("column2", DataType::Int64, false), + ])); + + let logical_plan = scan_empty(None, schema.as_ref(), None)? + .aggregate( + Vec::::new(), + vec![ + sum(col("column1")) + .filter(col("column2").lt_eq(lit(0_i64))) + .build()? + .alias("agg"), + ], + )? + .build()?; + + assert_contains!( + aggregate_explain(&logical_plan).await?, + "AggregateExec: mode=Single, gby=[], aggr=[sum(?table?.column1) FILTER (WHERE ?table?.column2 <= Int64(0)) as agg]" + ); + + Ok(()) + } + + #[tokio::test] + async fn test_aggregate_explain_shows_aliased_respect_nulls() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("column1", DataType::Int64, true), + Field::new("column2", DataType::Int64, false), + ])); + + let logical_plan = scan_empty(None, schema.as_ref(), None)? + .aggregate( + Vec::::new(), + vec![ + first_value_udaf() + .call(vec![col("column1")]) + .order_by(vec![col("column2").sort(true, true)]) + .null_treatment(NullTreatment::RespectNulls) + .build()? + .alias("agg"), + ], + )? + .build()?; + + assert_contains!( + aggregate_explain(&logical_plan).await?, + "AggregateExec: mode=Single, gby=[], aggr=[first_value(?table?.column1) RESPECT NULLS ORDER BY [?table?.column2 ASC NULLS FIRST] as agg]" + ); + + Ok(()) + } + + #[tokio::test] + async fn test_aggregate_explain_shows_count_all() -> Result<()> { + let logical_plan = test_csv_scan() + .await? + .aggregate(Vec::::new(), vec![count_all()])? + .build()?; + + assert_contains!( + aggregate_explain(&logical_plan).await?, + "aggr=[count(1) as count(*)]" + ); + + Ok(()) + } + + #[tokio::test] + async fn test_aggregate_explain_shows_count_all_with_user_alias() -> Result<()> { + let logical_plan = test_csv_scan() + .await? + .aggregate(Vec::::new(), vec![count_all().alias("total_rows")])? + .build()?; + + assert_contains!( + aggregate_explain(&logical_plan).await?, + "aggr=[count(1) as total_rows]" + ); + + Ok(()) + } + + #[tokio::test] + async fn test_aggregate_explain_shows_aliased_custom_human_display() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new( + "column1", + DataType::Int64, + false, + )])); + + let logical_plan = scan_empty(None, schema.as_ref(), None)? + .aggregate( + Vec::::new(), + vec![ + AggregateUDF::from(CustomHumanDisplayUdaf::new()) + .call(vec![col("column1")]) + .alias("agg"), + ], + )? + .build()?; + + assert_contains!( + aggregate_explain(&logical_plan).await?, + "AggregateExec: mode=Single, gby=[], aggr=[custom_display(?table?.column1) as agg]" + ); + + Ok(()) + } + #[tokio::test] async fn test_explain() { let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]); diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index e0830754399db..fa4fec08af47c 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -3022,20 +3022,20 @@ async fn test_count_wildcard_on_sort() -> Result<()> { assert_snapshot!( pretty_format_batches(&df_results).unwrap(), @r" - +---------------+----------------------------------------------------------------------------+ - | plan_type | plan | - +---------------+----------------------------------------------------------------------------+ - | logical_plan | Sort: count(*) AS count(*) ASC NULLS LAST | - | | Aggregate: groupBy=[[t1.b]], aggr=[[count(Int64(1)) AS count(*)]] | - | | TableScan: t1 projection=[b] | - | physical_plan | SortPreservingMergeExec: [count(*)@1 ASC NULLS LAST] | - | | SortExec: expr=[count(*)@1 ASC NULLS LAST], preserve_partitioning=[true] | - | | AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[count(*)] | - | | RepartitionExec: partitioning=Hash([b@0], 4), input_partitions=1 | - | | AggregateExec: mode=Partial, gby=[b@0 as b], aggr=[count(*)] | - | | DataSourceExec: partitions=1, partition_sizes=[1] | - | | | - +---------------+----------------------------------------------------------------------------+ + +---------------+---------------------------------------------------------------------------------------+ + | plan_type | plan | + +---------------+---------------------------------------------------------------------------------------+ + | logical_plan | Sort: count(*) AS count(*) ASC NULLS LAST | + | | Aggregate: groupBy=[[t1.b]], aggr=[[count(Int64(1)) AS count(*)]] | + | | TableScan: t1 projection=[b] | + | physical_plan | SortPreservingMergeExec: [count(*)@1 ASC NULLS LAST] | + | | SortExec: expr=[count(*)@1 ASC NULLS LAST], preserve_partitioning=[true] | + | | AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[count(1) as count(*)] | + | | RepartitionExec: partitioning=Hash([b@0], 4), input_partitions=1 | + | | AggregateExec: mode=Partial, gby=[b@0 as b], aggr=[count(1) as count(*)] | + | | DataSourceExec: partitions=1, partition_sizes=[1] | + | | | + +---------------+---------------------------------------------------------------------------------------+ " ); Ok(()) @@ -3500,9 +3500,9 @@ async fn test_count_wildcard_on_where_scalar_subquery() -> Result<()> { | | HashJoinExec: mode=CollectLeft, join_type=Right, on=[(a@1, a@0)], projection=[a@3, b@4, count(*)@0, __always_true@2] | | | CoalescePartitionsExec | | | ProjectionExec: expr=[count(*)@1 as count(*), a@0 as a, true as __always_true] | - | | AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(*)] | + | | AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(1) as count(*)] | | | RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1 | - | | AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(*)] | + | | AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(1) as count(*)] | | | DataSourceExec: partitions=1, partition_sizes=[1] | | | DataSourceExec: partitions=1, partition_sizes=[1] | | | | diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index 7dba4f0579b97..ce635a85768a4 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -561,6 +561,7 @@ pub struct Alias { pub relation: Option, pub name: String, pub metadata: Option, + pub is_internal: bool, } impl Hash for Alias { @@ -568,6 +569,8 @@ impl Hash for Alias { self.expr.hash(state); self.relation.hash(state); self.name.hash(state); + self.metadata.hash(state); + self.is_internal.hash(state); } } @@ -600,6 +603,22 @@ impl Alias { relation: relation.map(|r| r.into()), name: name.into(), metadata: None, + is_internal: false, + } + } + + #[doc(hidden)] + pub fn new_internal( + expr: Expr, + relation: Option>, + name: impl Into, + ) -> Self { + Self { + expr: Box::new(expr), + relation: relation.map(|r| r.into()), + name: name.into(), + metadata: None, + is_internal: true, } } @@ -607,6 +626,30 @@ impl Alias { self.metadata = metadata; self } + + #[doc(hidden)] + pub fn with_expr(mut self, expr: Expr) -> Self { + self.expr = Box::new(expr); + self + } + + #[doc(hidden)] + pub fn try_map_expr(self, f: impl FnOnce(Expr) -> Result) -> Result { + let Alias { + expr, + relation, + name, + metadata, + is_internal, + } = self; + Ok(Expr::Alias(Alias { + expr: Box::new(f(*expr)?), + relation, + name, + metadata, + is_internal, + })) + } } /// Binary expression for [`Expr::BinaryExpr`] @@ -1722,6 +1765,11 @@ impl Expr { Expr::Alias(Alias::new(self, None::<&str>, name.into())) } + #[doc(hidden)] + pub fn alias_internal(self, name: impl Into) -> Expr { + Expr::Alias(Alias::new_internal(self, None::<&str>, name.into())) + } + /// Return `self AS name` alias expression with metadata /// /// The metadata will be attached to the Arrow Schema field when the expression @@ -1753,6 +1801,15 @@ impl Expr { Expr::Alias(Alias::new(self, relation, name.into())) } + #[doc(hidden)] + pub fn alias_qualified_internal( + self, + relation: Option>, + name: impl Into, + ) -> Expr { + Expr::Alias(Alias::new_internal(self, relation, name.into())) + } + /// Return `self AS name` alias expression with a specific qualifier and metadata /// /// The metadata will be attached to the Arrow Schema field when the expression @@ -1777,6 +1834,11 @@ impl Expr { Expr::Alias(Alias::new(self, relation, name.into()).with_metadata(metadata)) } + #[doc(hidden)] + pub fn alias_with_existing(self, alias: &Alias) -> Expr { + Expr::Alias(alias.clone().with_expr(self)) + } + /// Remove an alias from an expression if one exists. /// /// If the expression is not an alias, the expression is returned unchanged. @@ -1846,7 +1908,7 @@ impl Expr { match alias .metadata .as_ref() - .map(|h| h.is_empty()) + .map(|metadata| metadata.is_empty()) .unwrap_or(true) { true => Ok(Transformed::yes(*alias.expr)), @@ -2261,17 +2323,21 @@ impl NormalizeEq for Expr { expr: self_expr, relation: self_relation, name: self_name, - .. + metadata: self_metadata, + is_internal: self_is_internal, }), Expr::Alias(Alias { expr: other_expr, relation: other_relation, name: other_name, - .. + metadata: other_metadata, + is_internal: other_is_internal, }), ) => { self_name == other_name && self_relation == other_relation + && self_metadata == other_metadata + && self_is_internal == other_is_internal && self_expr.normalize_eq(other_expr) } ( @@ -2617,10 +2683,13 @@ impl HashNode for Expr { expr: _expr, relation, name, - .. + metadata, + is_internal, }) => { relation.hash(state); name.hash(state); + metadata.hash(state); + is_internal.hash(state); } Expr::Column(column) => { column.hash(state); @@ -4048,6 +4117,65 @@ mod test { ); } + #[test] + fn test_internal_alias_changes_equality() { + let user_alias = col("id").alias("count(*)"); + let internal_alias = col("id").alias_internal("count(*)"); + + assert_ne!(user_alias, internal_alias); + assert_ne!(internal_alias, user_alias); + } + + #[test] + fn test_internal_alias_changes_normalized_identity() { + let user_alias = col("id").alias("count(*)"); + let internal_alias = col("id").alias_internal("count(*)"); + + assert!(!user_alias.normalize_eq(&internal_alias)); + assert!(!internal_alias.normalize_eq(&user_alias)); + } + + #[test] + fn test_unalias_nested_respects_user_metadata() { + use std::collections::HashMap; + + let base_expr = col("id"); + + let no_metadata = base_expr.clone().alias("alias"); + assert_eq!(no_metadata.unalias_nested().data, base_expr); + + let Expr::Alias(empty_metadata_alias) = base_expr.clone().alias("alias") else { + unreachable!(); + }; + let empty_metadata_alias = Expr::Alias( + empty_metadata_alias.with_metadata(Some(FieldMetadata::default())), + ); + assert_eq!(empty_metadata_alias.unalias_nested().data, base_expr); + + let internal_alias = base_expr.clone().alias_internal("alias"); + assert_eq!(internal_alias.unalias_nested().data, base_expr); + + let user_metadata = FieldMetadata::from(HashMap::from([( + "some_key".to_string(), + "some_value".to_string(), + )])); + + let Expr::Alias(user_alias) = base_expr.clone().alias("alias") else { + unreachable!(); + }; + let user_alias = + Expr::Alias(user_alias.with_metadata(Some(user_metadata.clone()))); + assert_eq!(user_alias.clone().unalias_nested().data, user_alias); + + let Expr::Alias(internal_alias) = base_expr.clone().alias_internal("alias") + else { + unreachable!(); + }; + let internal_alias = + Expr::Alias(internal_alias.with_metadata(Some(user_metadata.clone()))); + assert_eq!(internal_alias.clone().unalias_nested().data, internal_alias); + } + fn wildcard_options( opt_ilike: Option, opt_exclude: Option, diff --git a/datafusion/expr/src/expr_rewriter/mod.rs b/datafusion/expr/src/expr_rewriter/mod.rs index 32a88ab8cf310..14a9b284d133e 100644 --- a/datafusion/expr/src/expr_rewriter/mod.rs +++ b/datafusion/expr/src/expr_rewriter/mod.rs @@ -255,8 +255,8 @@ fn coerce_exprs_for_schema( let new_type = dst_schema.field(idx).data_type(); if new_type != &expr.get_type(src_schema)? { match expr { - Expr::Alias(Alias { expr, name, .. }) => { - Ok(expr.cast_to(new_type, src_schema)?.alias(name)) + Expr::Alias(alias) => { + alias.try_map_expr(|expr| expr.cast_to(new_type, src_schema)) } #[expect(deprecated)] Expr::Wildcard { .. } => Ok(expr), @@ -309,6 +309,7 @@ pub enum SavedName { Saved { relation: Option, name: String, + is_internal_generated: bool, }, /// Name is not preserved None, @@ -340,8 +341,23 @@ impl NamePreserver { pub fn save(&self, expr: &Expr) -> SavedName { if self.use_alias { - let (relation, name) = expr.qualified_name(); - SavedName::Saved { relation, name } + match expr { + Expr::Alias(alias) => SavedName::Saved { + relation: alias.relation.clone(), + name: alias.name.clone(), + is_internal_generated: alias.is_internal, + }, + _ => { + // Rewrites preserve this output name for schema stability, but the + // alias itself is planner-generated rather than user-written. + let (relation, name) = expr.qualified_name(); + SavedName::Saved { + relation, + name, + is_internal_generated: true, + } + } + } } else { SavedName::None } @@ -352,10 +368,18 @@ impl SavedName { /// Ensures the qualified name of the rewritten expression is preserved pub fn restore(self, expr: Expr) -> Expr { match self { - SavedName::Saved { relation, name } => { + SavedName::Saved { + relation, + name, + is_internal_generated, + } => { let (new_relation, new_name) = expr.qualified_name(); if new_relation != relation || new_name != name { - expr.alias_qualified(relation, name) + if is_internal_generated { + expr.alias_qualified_internal(relation, name) + } else { + expr.alias_qualified(relation, name) + } } else { expr } @@ -543,6 +567,20 @@ mod test { ); } + #[test] + fn test_name_preserver_marks_non_alias_restore_as_internal() { + let saved_name = NamePreserver::new_for_projection().save(&col("a")); + let restored = saved_name.restore(col("b")); + + let Expr::Alias(alias) = restored else { + panic!("expected alias"); + }; + + assert!(alias.is_internal); + assert_eq!(alias.name, "a"); + assert_eq!(*alias.expr, col("b")); + } + /// rewrites `expr_from` to `rewrite_to` while preserving the original qualified name /// by using the `NamePreserver` fn test_rewrite(expr_from: Expr, rewrite_to: Expr) { diff --git a/datafusion/expr/src/expr_schema.rs b/datafusion/expr/src/expr_schema.rs index b613db4b6ad1d..43e683413cf5f 100644 --- a/datafusion/expr/src/expr_schema.rs +++ b/datafusion/expr/src/expr_schema.rs @@ -454,14 +454,10 @@ impl ExprSchemable for Expr { let (relation, schema_name) = self.qualified_name(); #[expect(deprecated)] let field = match self { - Expr::Alias(Alias { - expr, - name: _, - metadata, - .. - }) => { + Expr::Alias(alias) => { + let expr = &alias.expr; let mut combined_metadata = expr.metadata(schema)?; - if let Some(metadata) = metadata { + if let Some(metadata) = &alias.metadata { combined_metadata.extend(metadata.clone()); } @@ -1006,6 +1002,27 @@ mod tests { assert_eq!(meta, outer_ref.metadata(&schema).unwrap()); } + #[test] + fn test_internal_alias_metadata_is_preserved_in_field_metadata() { + let schema = MockExprSchema::new().with_data_type(DataType::Int32); + let alias_metadata = FieldMetadata::from(HashMap::from([( + "some_key".to_string(), + "some_value".to_string(), + )])); + + let Expr::Alias(alias) = col("foo").alias_internal("alias") else { + unreachable!(); + }; + let expr = Expr::Alias(alias.with_metadata(Some(alias_metadata.clone()))); + + let field = expr.to_field(&schema).unwrap().1; + assert_eq!( + field.metadata().get("some_key"), + Some(&"some_value".to_string()) + ); + assert_eq!(expr.metadata(&schema).unwrap(), alias_metadata); + } + #[test] fn test_expr_placeholder() { let schema = MockExprSchema::new(); diff --git a/datafusion/expr/src/tree_node.rs b/datafusion/expr/src/tree_node.rs index f43b138a284ea..fb2b04ff0b38e 100644 --- a/datafusion/expr/src/tree_node.rs +++ b/datafusion/expr/src/tree_node.rs @@ -150,12 +150,14 @@ impl TreeNode for Expr { relation, name, metadata, + is_internal, }) => expr.map_elements(f)?.update_data(|expr| { Expr::Alias(Alias { expr, relation, name, metadata, + is_internal, }) }), Expr::InSubquery(InSubquery { diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index eab36d4951a9c..343a0a486d9b2 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -102,7 +102,7 @@ pub fn count_distinct(expr: Expr) -> Expr { /// let expr = col(expr.schema_name().to_string()); /// ``` pub fn count_all() -> Expr { - count(Expr::Literal(COUNT_STAR_EXPANSION, None)).alias("count(*)") + count(Expr::Literal(COUNT_STAR_EXPANSION, None)).alias_internal("count(*)") } /// Creates window aggregation to count all rows. diff --git a/datafusion/optimizer/src/analyzer/resolve_grouping_function.rs b/datafusion/optimizer/src/analyzer/resolve_grouping_function.rs index 95649ab8286b7..eed88cf7f5e4d 100644 --- a/datafusion/optimizer/src/analyzer/resolve_grouping_function.rs +++ b/datafusion/optimizer/src/analyzer/resolve_grouping_function.rs @@ -111,7 +111,7 @@ fn replace_grouping_exprs( &group_expr_to_bitmap_index, grouping_id_type, )?; - projection_exprs.push(Expr::Alias(Alias::new( + projection_exprs.push(Expr::Alias(Alias::new_internal( grouping_expr, column.relation, column.name, diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index bb17d13f5c769..45ecdcbb44246 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -1241,19 +1241,21 @@ fn project_with_column_index( .enumerate() .map(|(i, e)| match e { Expr::Alias(Alias { ref name, .. }) if name != schema.field(i).name() => { - Ok(e.unalias().alias(schema.field(i).name())) + Ok(e.unalias().alias_internal(schema.field(i).name())) } Expr::Column(Column { relation: _, ref name, spans: _, - }) if name != schema.field(i).name() => Ok(e.alias(schema.field(i).name())), + }) if name != schema.field(i).name() => { + Ok(e.alias_internal(schema.field(i).name())) + } Expr::Alias { .. } | Expr::Column { .. } => Ok(e), #[expect(deprecated)] Expr::Wildcard { .. } => { plan_err!("Wildcard should be expanded before type coercion") } - _ => Ok(e.alias(schema.field(i).name())), + _ => Ok(e.alias_internal(schema.field(i).name())), }) .collect::>>()?; diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index 4213c23ccc897..a590a8c6b56e9 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -30,7 +30,7 @@ use datafusion_common::alias::AliasGenerator; use datafusion_common::cse::{CSE, CSEController, FoundCommonNodes}; use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::{Column, DFSchema, DFSchemaRef, Result, qualified_name}; -use datafusion_expr::expr::{Alias, ScalarFunction}; +use datafusion_expr::expr::ScalarFunction; use datafusion_expr::logical_plan::{ Aggregate, Filter, LogicalPlan, Projection, Sort, Window, }; @@ -330,7 +330,7 @@ impl CommonSubexprEliminate { let mut agg_exprs = common_exprs .into_iter() - .map(|(expr, expr_alias)| expr.alias(expr_alias)) + .map(|(expr, expr_alias)| expr.alias_internal(expr_alias)) .collect::>(); let mut proj_exprs = vec![]; @@ -350,10 +350,9 @@ impl CommonSubexprEliminate { } else { expr_rewritten }; - if let Expr::Alias(Alias { expr, name, .. }) = - expr_rewritten - { - agg_exprs.push(expr.alias(&name)); + if let Expr::Alias(alias) = expr_rewritten { + let name = alias.name.clone(); + agg_exprs.push(Expr::Alias(alias)); proj_exprs .push(Expr::Column(Column::from_name(name))); } else { @@ -364,10 +363,11 @@ impl CommonSubexprEliminate { let out_name = qualified_name(qualifier.as_ref(), &field_name); - agg_exprs.push(expr_rewritten.alias(&expr_alias)); + agg_exprs + .push(expr_rewritten.alias_internal(&expr_alias)); proj_exprs.push( Expr::Column(Column::from_name(expr_alias)) - .alias(out_name), + .alias_internal(out_name), ); } } else { @@ -742,7 +742,7 @@ impl CSEController for ExprCSEController<'_> { col(alias) } else { self.alias_counter += 1; - col(alias).alias(node.schema_name().to_string()) + col(alias).alias_internal(node.schema_name().to_string()) } } @@ -783,7 +783,7 @@ fn build_common_expr_project_plan( .into_iter() .map(|(expr, expr_alias)| { fields_set.insert(expr_alias.clone()); - Ok(expr.alias(expr_alias)) + Ok(expr.alias_internal(expr_alias)) }) .collect::>>()?; diff --git a/datafusion/optimizer/src/decorrelate_lateral_join.rs b/datafusion/optimizer/src/decorrelate_lateral_join.rs index ea25ab479f070..929d90f1a1ee4 100644 --- a/datafusion/optimizer/src/decorrelate_lateral_join.rs +++ b/datafusion/optimizer/src/decorrelate_lateral_join.rs @@ -260,7 +260,13 @@ fn rewrite_internal(join: Join) -> Result> { )], else_expr: Some(Box::new(col)), }); - proj_exprs.push(case_expr.alias_qualified(qualifier.cloned(), name)); + proj_exprs.push(Expr::Alias(expr::Alias { + expr: Box::new(case_expr), + relation: qualifier.cloned(), + name: name.to_string(), + metadata: None, + is_internal: false, + })); continue; } proj_exprs.push(col); diff --git a/datafusion/optimizer/src/extract_leaf_expressions.rs b/datafusion/optimizer/src/extract_leaf_expressions.rs index d3a84c0172e7b..249be55bb1a2e 100644 --- a/datafusion/optimizer/src/extract_leaf_expressions.rs +++ b/datafusion/optimizer/src/extract_leaf_expressions.rs @@ -614,7 +614,8 @@ fn build_extraction_projection_impl( // Add new extracted expressions, resolving column refs through the projection for (expr, alias) in extracted_exprs { - let resolved = replace_cols_by_name(expr.clone().alias(alias), &replace_map)?; + let resolved = + replace_cols_by_name(expr.clone().alias_internal(alias), &replace_map)?; let resolved_inner = if let Expr::Alias(a) = &resolved { a.expr.as_ref() } else { @@ -669,7 +670,7 @@ fn build_extraction_projection_impl( // Build new projection with extracted expressions + all input columns let mut proj_exprs = Vec::new(); for (expr, alias) in extracted_exprs { - proj_exprs.push(expr.clone().alias(alias)); + proj_exprs.push(expr.clone().alias_internal(alias)); } for (qualifier, field) in target_schema.iter() { proj_exprs.push(Expr::from((qualifier, field))); diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 14badcf1435d5..2ce0b8e4de91e 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -575,6 +575,7 @@ fn merge_consecutive_projections(proj: Projection) -> Result rewrite_expr(*expr, &prev_projection).map(|result| { result.update_data(|expr| { // After substitution, the inner expression may now have the @@ -583,12 +584,19 @@ fn merge_consecutive_projections(proj: Projection) -> Result>, alias: Option, /// A human readable name - human_display: String, + human_display: Option, /// Arrow Schema for the aggregate function schema: SchemaRef, /// The physical order by expressions @@ -85,7 +85,7 @@ impl AggregateExprBuilder { fun, args, alias: None, - human_display: String::default(), + human_display: None, schema: Arc::new(Schema::empty()), order_bys: vec![], ignore_nulls: false, @@ -255,8 +255,9 @@ impl AggregateExprBuilder { self } - pub fn human_display(mut self, name: String) -> Self { - self.human_display = name; + pub fn human_display(mut self, name: impl Into) -> Self { + let name = name.into(); + self.human_display = (!name.is_empty()).then_some(name); self } @@ -315,7 +316,7 @@ pub struct AggregateFunctionExpr { /// Output column name that this expression creates name: String, /// Simplified name for `tree` explain. - human_display: String, + human_display: Option, schema: Schema, // The physical order by expressions order_bys: Vec, @@ -347,8 +348,8 @@ impl AggregateFunctionExpr { } /// Simplified name for `tree` explain. - pub fn human_display(&self) -> &str { - &self.human_display + pub fn human_display(&self) -> Option<&str> { + self.human_display.as_deref() } /// Return if the aggregation is distinct @@ -456,15 +457,18 @@ impl AggregateFunctionExpr { return Ok(None); }; - AggregateExprBuilder::new(Arc::new(updated_fn), self.args.to_vec()) - .order_by(self.order_bys.clone()) - .schema(Arc::new(self.schema.clone())) - .alias(self.name().to_string()) - .with_ignore_nulls(self.ignore_nulls) - .with_distinct(self.is_distinct) - .with_reversed(self.is_reversed) - .build() - .map(Some) + let mut builder = + AggregateExprBuilder::new(Arc::new(updated_fn), self.args.to_vec()) + .order_by(self.order_bys.clone()) + .schema(Arc::new(self.schema.clone())) + .alias(self.name().to_string()) + .with_ignore_nulls(self.ignore_nulls) + .with_distinct(self.is_distinct) + .with_reversed(self.is_reversed); + if let Some(human_display) = self.human_display() { + builder = builder.human_display(human_display); + } + builder.build().map(Some) } /// Creates accumulator implementation that supports retract @@ -582,23 +586,59 @@ impl AggregateFunctionExpr { ReversedUDAF::NotSupported => None, ReversedUDAF::Identical => Some(self.clone()), ReversedUDAF::Reversed(reverse_udf) => { + let aliased_human_display = self + .human_display() + .and_then(|human_display| { + strip_alias_suffix(human_display, self.name()) + }) + .map(str::to_string); + let was_aliased = aliased_human_display.is_some(); let mut name = self.name().to_string(); + let mut human_display = self.human_display().map(str::to_string); // If the function is changed, we need to reverse order_by clause as well // i.e. First(a order by b asc null first) -> Last(a order by b desc null last) - if self.fun().name() != reverse_udf.name() { + if !was_aliased && self.fun().name() != reverse_udf.name() { replace_order_by_clause(&mut name); } - replace_fn_name_clause(&mut name, self.fun.name(), reverse_udf.name()); - - AggregateExprBuilder::new(reverse_udf, self.args.to_vec()) - .order_by(self.order_bys.iter().map(|e| e.reverse()).collect()) - .schema(Arc::new(self.schema.clone())) - .alias(name) - .with_ignore_nulls(self.ignore_nulls) - .with_distinct(self.is_distinct) - .with_reversed(!self.is_reversed) - .build() - .ok() + if !was_aliased { + replace_fn_name_clause( + &mut name, + self.fun.name(), + reverse_udf.name(), + ); + } + + if let Some(human_display) = human_display.as_mut() { + if let Some(expr_display) = aliased_human_display { + *human_display = expr_display; + } + + if self.fun().name() != reverse_udf.name() { + replace_order_by_clause(human_display); + } + replace_fn_name_clause( + human_display, + self.fun.name(), + reverse_udf.name(), + ); + + if was_aliased { + *human_display = format!("{human_display} as {}", self.name()); + } + } + + let mut builder = + AggregateExprBuilder::new(reverse_udf, self.args.to_vec()) + .order_by(self.order_bys.iter().map(|e| e.reverse()).collect()) + .schema(Arc::new(self.schema.clone())) + .alias(name) + .with_ignore_nulls(self.ignore_nulls) + .with_distinct(self.is_distinct) + .with_reversed(!self.is_reversed); + if let Some(human_display) = human_display { + builder = builder.human_display(human_display); + } + builder.build().ok() } } } @@ -753,5 +793,12 @@ fn replace_order_by_clause(order_by: &mut String) { } fn replace_fn_name_clause(aggr_name: &mut String, fn_name_old: &str, fn_name_new: &str) { - *aggr_name = aggr_name.replace(fn_name_old, fn_name_new); + if let Some(rest) = aggr_name.strip_prefix(fn_name_old) { + *aggr_name = format!("{fn_name_new}{rest}"); + } +} + +fn strip_alias_suffix<'a>(human_display: &'a str, alias: &str) -> Option<&'a str> { + let alias_suffix = format!(" as {alias}"); + human_display.strip_suffix(&alias_suffix) } diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 76ecb3f1485a4..51c6110bd6912 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -1374,7 +1374,7 @@ impl DisplayAs for AggregateExec { let a: Vec = self .aggr_expr .iter() - .map(|agg| agg.name().to_string()) + .map(|agg| format_aggregate_exec_expr(agg).to_string()) .collect(); write!(f, ", aggr=[{}]", a.join(", "))?; if let Some(config) = self.limit_options { @@ -1428,7 +1428,7 @@ impl DisplayAs for AggregateExec { let a: Vec = self .aggr_expr .iter() - .map(|agg| agg.human_display().to_string()) + .map(|agg| format_tree_aggregate_expr(agg).to_string()) .collect(); writeln!(f, "mode={:?}", self.mode)?; if !g.is_empty() { @@ -1446,6 +1446,24 @@ impl DisplayAs for AggregateExec { } } +fn format_aggregate_exec_expr(agg: &AggregateFunctionExpr) -> &str { + if has_aliased_human_display(agg) { + agg.human_display().unwrap_or_else(|| agg.name()) + } else { + agg.name() + } +} + +fn format_tree_aggregate_expr(agg: &AggregateFunctionExpr) -> &str { + agg.human_display().unwrap_or_else(|| agg.name()) +} + +fn has_aliased_human_display(agg: &AggregateFunctionExpr) -> bool { + agg.human_display() + .and_then(|human_display| human_display.strip_suffix(agg.name())) + .is_some_and(|prefix| prefix.ends_with(" as ")) +} + impl ExecutionPlan for AggregateExec { fn name(&self) -> &'static str { "AggregateExec" @@ -3022,6 +3040,87 @@ mod tests { .map(Arc::new) } + #[test] + fn test_reverse_expr_preserves_aliased_human_display() -> Result<()> { + let schema = create_test_schema()?; + let agg = AggregateExprBuilder::new(first_value_udaf(), vec![col("b", &schema)?]) + .order_by(vec![PhysicalSortExpr { + expr: col("b", &schema)?, + options: SortOptions::new(false, false), + }]) + .schema(Arc::clone(&schema)) + .alias("agg") + .human_display( + "first_value(b) ORDER BY [b ASC NULLS LAST] as agg".to_string(), + ) + .build()?; + + let reversed = agg.reverse_expr().expect("expected reverse expr"); + + assert_eq!(reversed.name(), "agg"); + assert_eq!( + reversed.human_display(), + Some("last_value(b) ORDER BY [b DESC NULLS FIRST] as agg") + ); + + Ok(()) + } + + #[test] + fn test_reverse_expr_does_not_rewrite_column_names_in_human_display() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new( + "first_value_col", + DataType::Int32, + true, + )])); + let agg = AggregateExprBuilder::new( + first_value_udaf(), + vec![col("first_value_col", &schema)?], + ) + .order_by(vec![PhysicalSortExpr { + expr: col("first_value_col", &schema)?, + options: SortOptions::new(false, false), + }]) + .schema(Arc::clone(&schema)) + .alias("agg") + .human_display( + "first_value(first_value_col) ORDER BY [first_value_col ASC NULLS LAST] as agg" + .to_string(), + ) + .build()?; + + let reversed = agg.reverse_expr().expect("expected reverse expr"); + + assert_eq!(reversed.name(), "agg"); + assert_eq!( + reversed.human_display(), + Some( + "last_value(first_value_col) ORDER BY [first_value_col DESC NULLS FIRST] as agg" + ) + ); + + Ok(()) + } + + #[test] + fn test_empty_human_display_is_treated_as_absent() -> Result<()> { + let schema = create_test_schema()?; + let agg = AggregateExprBuilder::new(first_value_udaf(), vec![col("b", &schema)?]) + .order_by(vec![PhysicalSortExpr { + expr: col("b", &schema)?, + options: SortOptions::new(false, false), + }]) + .schema(Arc::clone(&schema)) + .alias("agg") + .human_display("") + .build()?; + + assert_eq!(agg.human_display(), None); + assert_eq!(format_tree_aggregate_expr(&agg), "agg"); + + Ok(()) + } + // This function constructs the physical plan below, // // "AggregateExec: mode=Final, gby=[a@0 as a], aggr=[FIRST_VALUE(b)]", diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 056a7f171a516..d593c46fa83b8 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -561,7 +561,7 @@ impl GroupedHashAggregateStream { let agg_fn_names = aggregate_exprs .iter() - .map(|expr| expr.human_display()) + .map(|expr| expr.human_display().unwrap_or_else(|| expr.name())) .collect::>() .join(", "); let name = format!("GroupedHashAggregateStream[{partition}] ({agg_fn_names})"); diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index c61226fb526f6..b08f1b9749170 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -513,6 +513,7 @@ message AliasNode { string alias = 2; repeated TableReference relation = 3; map metadata = 4; + bool is_internal = 5; } message BinaryExprNode { @@ -1477,4 +1478,4 @@ message AsyncFuncExecNode { message BufferExecNode { PhysicalPlanNode input = 1; uint64 capacity = 2; -} \ No newline at end of file +} diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 82bcdac898204..bf8b38db7c9d4 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -831,6 +831,9 @@ impl serde::Serialize for AliasNode { if !self.metadata.is_empty() { len += 1; } + if self.is_internal { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion.AliasNode", len)?; if let Some(v) = self.expr.as_ref() { struct_ser.serialize_field("expr", v)?; @@ -844,6 +847,9 @@ impl serde::Serialize for AliasNode { if !self.metadata.is_empty() { struct_ser.serialize_field("metadata", &self.metadata)?; } + if self.is_internal { + struct_ser.serialize_field("isInternal", &self.is_internal)?; + } struct_ser.end() } } @@ -858,6 +864,8 @@ impl<'de> serde::Deserialize<'de> for AliasNode { "alias", "relation", "metadata", + "is_internal", + "isInternal", ]; #[allow(clippy::enum_variant_names)] @@ -866,6 +874,7 @@ impl<'de> serde::Deserialize<'de> for AliasNode { Alias, Relation, Metadata, + IsInternal, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -891,6 +900,7 @@ impl<'de> serde::Deserialize<'de> for AliasNode { "alias" => Ok(GeneratedField::Alias), "relation" => Ok(GeneratedField::Relation), "metadata" => Ok(GeneratedField::Metadata), + "isInternal" | "is_internal" => Ok(GeneratedField::IsInternal), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -914,6 +924,7 @@ impl<'de> serde::Deserialize<'de> for AliasNode { let mut alias__ = None; let mut relation__ = None; let mut metadata__ = None; + let mut is_internal__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::Expr => { @@ -942,6 +953,12 @@ impl<'de> serde::Deserialize<'de> for AliasNode { map_.next_value::>()? ); } + GeneratedField::IsInternal => { + if is_internal__.is_some() { + return Err(serde::de::Error::duplicate_field("isInternal")); + } + is_internal__ = Some(map_.next_value()?); + } } } Ok(AliasNode { @@ -949,6 +966,7 @@ impl<'de> serde::Deserialize<'de> for AliasNode { alias: alias__.unwrap_or_default(), relation: relation__.unwrap_or_default(), metadata: metadata__.unwrap_or_default(), + is_internal: is_internal__.unwrap_or_default(), }) } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index ab60c3058dbde..6e6015c74aeb9 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -776,6 +776,8 @@ pub struct AliasNode { ::prost::alloc::string::String, ::prost::alloc::string::String, >, + #[prost(bool, tag = "5")] + pub is_internal: bool, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct BinaryExprNode { diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index ed33d9fab1820..3636a673f27d3 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -365,15 +365,25 @@ pub fn parse_expr( builder.build().map_err(Error::DataFusionError) } - ExprType::Alias(alias) => Ok(Expr::Alias(Alias::new( - parse_required_expr(alias.expr.as_deref(), registry, "expr", codec)?, - alias - .relation - .first() - .map(|r| TableReference::try_from(r.clone())) - .transpose()?, - alias.alias.clone(), - ))), + ExprType::Alias(alias) => { + let metadata = + (!alias.metadata.is_empty()).then(|| alias.metadata.clone().into()); + let is_internal = alias.is_internal; + let expr_alias = Alias::new( + parse_required_expr(alias.expr.as_deref(), registry, "expr", codec)?, + alias + .relation + .first() + .map(|r| TableReference::try_from(r.clone())) + .transpose()?, + alias.alias.clone(), + ) + .with_metadata(metadata); + Ok(Expr::Alias(Alias { + is_internal, + ..expr_alias + })) + } ExprType::IsNullExpr(is_null) => Ok(Expr::IsNull(Box::new(parse_required_expr( is_null.expr.as_deref(), registry, diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index 6fcb7389922ad..3a3a12d44d706 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -25,8 +25,8 @@ use datafusion_common::{NullEquality, TableReference, UnnestOptions}; use datafusion_expr::WriteOp; use datafusion_expr::dml::InsertOp; use datafusion_expr::expr::{ - self, AggregateFunctionParams, Alias, Between, BinaryExpr, Cast, GroupingSet, InList, - Like, NullTreatment, Placeholder, ScalarFunction, Unnest, + self, AggregateFunctionParams, Between, BinaryExpr, Cast, GroupingSet, InList, Like, + NullTreatment, Placeholder, ScalarFunction, Unnest, }; use datafusion_expr::{ Expr, JoinConstraint, JoinType, SortExpr, TryCast, WindowFrame, WindowFrameBound, @@ -197,23 +197,21 @@ pub fn serialize_expr( Expr::Column(c) => protobuf::LogicalExprNode { expr_type: Some(ExprType::Column(c.into())), }, - Expr::Alias(Alias { - expr, - relation, - name, - metadata, - }) => { + Expr::Alias(alias) => { let alias = Box::new(protobuf::AliasNode { - expr: Some(Box::new(serialize_expr(expr.as_ref(), codec)?)), - relation: relation + expr: Some(Box::new(serialize_expr(alias.expr.as_ref(), codec)?)), + relation: alias + .relation .to_owned() .map(|r| vec![r.into()]) .unwrap_or(vec![]), - alias: name.to_owned(), - metadata: metadata + alias: alias.name.to_owned(), + metadata: alias + .metadata .as_ref() .map(|m| m.to_hashmap()) .unwrap_or(HashMap::new()), + is_internal: alias.is_internal, }); protobuf::LogicalExprNode { expr_type: Some(ExprType::Alias(alias)), diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 206f4378d3d3b..004c6ea4a9224 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -1255,15 +1255,17 @@ impl protobuf::PhysicalPlanNode { })?, }; - AggregateExprBuilder::new(agg_udf, input_phy_expr) - .schema(Arc::clone(&physical_schema)) - .alias(name) - .human_display(agg_node.human_display.clone()) - .with_ignore_nulls(agg_node.ignore_nulls) - .with_distinct(agg_node.distinct) - .order_by(order_bys) - .build() - .map(Arc::new) + let builder = AggregateExprBuilder::new( + agg_udf, + input_phy_expr, + ) + .schema(Arc::clone(&physical_schema)) + .alias(name) + .with_ignore_nulls(agg_node.ignore_nulls) + .with_distinct(agg_node.distinct) + .order_by(order_bys) + .human_display(agg_node.human_display.clone()); + builder.build().map(Arc::new) } }) .transpose()? diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index f0eb6d27aac30..4262cb90c0f43 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -80,7 +80,10 @@ pub fn serialize_physical_aggr_expr( distinct: aggr_expr.is_distinct(), ignore_nulls: aggr_expr.ignore_nulls(), fun_definition: (!buf.is_empty()).then_some(buf), - human_display: aggr_expr.human_display().to_string(), + human_display: aggr_expr + .human_display() + .unwrap_or_default() + .to_string(), }, )), }) diff --git a/datafusion/proto/tests/cases/serialize.rs b/datafusion/proto/tests/cases/serialize.rs index bb955a426ca78..19a0b45a619e1 100644 --- a/datafusion/proto/tests/cases/serialize.rs +++ b/datafusion/proto/tests/cases/serialize.rs @@ -22,6 +22,7 @@ use arrow::datatypes::{DataType, Field}; use datafusion::execution::FunctionRegistry; use datafusion::prelude::SessionContext; +use datafusion_common::metadata::FieldMetadata; use datafusion_expr::expr::Placeholder; use datafusion_expr::{ColumnarValue, col, create_udf, lit}; use datafusion_expr::{Expr, Volatility}; @@ -29,6 +30,7 @@ use datafusion_functions::string; use datafusion_proto::bytes::Serializeable; use datafusion_proto::logical_plan::DefaultLogicalExtensionCodec; use datafusion_proto::logical_plan::to_proto::serialize_expr; +use datafusion_proto::protobuf::logical_expr_node::ExprType; #[test] #[should_panic( @@ -137,6 +139,53 @@ fn roundtrip_qualified_alias() { assert_eq!(qual_alias, roundtrip_expr(&qual_alias)); } +#[test] +fn roundtrip_alias_with_metadata() { + let expr = col("c1").alias_with_metadata( + "my_column", + Some(FieldMetadata::from(std::collections::HashMap::from([( + "some_key".to_string(), + "some_value".to_string(), + )]))), + ); + assert_eq!(expr, roundtrip_expr(&expr)); +} + +#[test] +fn internal_alias_roundtrip_preserves_metadata_and_internal_flag() { + let Expr::Alias(alias) = col("c1").alias_internal("my_column") else { + unreachable!(); + }; + let expr = Expr::Alias(alias.with_metadata(Some(FieldMetadata::from( + std::collections::HashMap::from([( + "some_key".to_string(), + "some_value".to_string(), + )]), + )))); + + let extension_codec = DefaultLogicalExtensionCodec {}; + let proto = serialize_expr(&expr, &extension_codec).unwrap(); + let Some(ExprType::Alias(alias)) = proto.expr_type else { + unreachable!(); + }; + + assert_eq!( + alias.metadata.get("some_key"), + Some(&"some_value".to_string()) + ); + assert!(alias.is_internal); + + let decoded = roundtrip_expr(&expr); + let Expr::Alias(alias) = decoded else { + unreachable!(); + }; + assert!(alias.is_internal); + assert_eq!( + alias.metadata.unwrap().to_hashmap().get("some_key"), + Some(&"some_value".to_string()) + ); +} + #[test] fn roundtrip_placeholder_with_metadata() { let expr = Expr::Placeholder(Placeholder::new_with_field( diff --git a/datafusion/sql/src/unparser/rewrite.rs b/datafusion/sql/src/unparser/rewrite.rs index 723cdcd687f77..41f6775674461 100644 --- a/datafusion/sql/src/unparser/rewrite.rs +++ b/datafusion/sql/src/unparser/rewrite.rs @@ -440,6 +440,7 @@ pub(super) fn inject_column_aliases( relation, name: col_alias.value, metadata: None, + is_internal: false, }) }) .collect::>(); diff --git a/datafusion/sqllogictest/test_files/agg_func_substitute.slt b/datafusion/sqllogictest/test_files/agg_func_substitute.slt index e0199c82501e4..dd89e83492f6b 100644 --- a/datafusion/sqllogictest/test_files/agg_func_substitute.slt +++ b/datafusion/sqllogictest/test_files/agg_func_substitute.slt @@ -79,9 +79,9 @@ logical_plan 03)----TableScan: multiple_ordered_table projection=[a, c] physical_plan 01)ProjectionExec: expr=[a@0 as a, nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result] -02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted +02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c, 101) ORDER BY [multiple_ordered_table.c ASC NULLS LAST] as nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted 03)----RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST -04)------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted +04)------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c, 101) ORDER BY [multiple_ordered_table.c ASC NULLS LAST] as nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true 06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 00ca0482a31e1..60f68745c1bad 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -3903,9 +3903,9 @@ logical_plan 01)Aggregate: groupBy=[[]], aggr=[[min(CAST(aggregate_test_100.c2 AS Float64)) AS percentile_cont(Float64(0)) WITHIN GROUP [aggregate_test_100.c2 ASC NULLS LAST]]] 02)--TableScan: aggregate_test_100 projection=[c2] physical_plan -01)AggregateExec: mode=Final, gby=[], aggr=[percentile_cont(Float64(0)) WITHIN GROUP [aggregate_test_100.c2 ASC NULLS LAST]] +01)AggregateExec: mode=Final, gby=[], aggr=[min(aggregate_test_100.c2) as percentile_cont(Float64(0)) WITHIN GROUP [aggregate_test_100.c2 ASC NULLS LAST]] 02)--CoalescePartitionsExec -03)----AggregateExec: mode=Partial, gby=[], aggr=[percentile_cont(Float64(0)) WITHIN GROUP [aggregate_test_100.c2 ASC NULLS LAST]] +03)----AggregateExec: mode=Partial, gby=[], aggr=[min(aggregate_test_100.c2) as percentile_cont(Float64(0)) WITHIN GROUP [aggregate_test_100.c2 ASC NULLS LAST]] 04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c2], file_type=csv, has_header=true @@ -3916,9 +3916,9 @@ logical_plan 01)Aggregate: groupBy=[[]], aggr=[[max(CAST(aggregate_test_100.c2 AS Float64)) AS percentile_cont(Float64(0)) WITHIN GROUP [aggregate_test_100.c2 DESC NULLS FIRST]]] 02)--TableScan: aggregate_test_100 projection=[c2] physical_plan -01)AggregateExec: mode=Final, gby=[], aggr=[percentile_cont(Float64(0)) WITHIN GROUP [aggregate_test_100.c2 DESC NULLS FIRST]] +01)AggregateExec: mode=Final, gby=[], aggr=[max(aggregate_test_100.c2) as percentile_cont(Float64(0)) WITHIN GROUP [aggregate_test_100.c2 DESC NULLS FIRST]] 02)--CoalescePartitionsExec -03)----AggregateExec: mode=Partial, gby=[], aggr=[percentile_cont(Float64(0)) WITHIN GROUP [aggregate_test_100.c2 DESC NULLS FIRST]] +03)----AggregateExec: mode=Partial, gby=[], aggr=[max(aggregate_test_100.c2) as percentile_cont(Float64(0)) WITHIN GROUP [aggregate_test_100.c2 DESC NULLS FIRST]] 04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c2], file_type=csv, has_header=true @@ -3929,9 +3929,9 @@ logical_plan 01)Aggregate: groupBy=[[]], aggr=[[min(CAST(aggregate_test_100.c2 AS Float64)) AS percentile_cont(aggregate_test_100.c2,Float64(0))]] 02)--TableScan: aggregate_test_100 projection=[c2] physical_plan -01)AggregateExec: mode=Final, gby=[], aggr=[percentile_cont(aggregate_test_100.c2,Float64(0))] +01)AggregateExec: mode=Final, gby=[], aggr=[min(aggregate_test_100.c2) as percentile_cont(aggregate_test_100.c2,Float64(0))] 02)--CoalescePartitionsExec -03)----AggregateExec: mode=Partial, gby=[], aggr=[percentile_cont(aggregate_test_100.c2,Float64(0))] +03)----AggregateExec: mode=Partial, gby=[], aggr=[min(aggregate_test_100.c2) as percentile_cont(aggregate_test_100.c2,Float64(0))] 04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c2], file_type=csv, has_header=true @@ -3942,9 +3942,9 @@ logical_plan 01)Aggregate: groupBy=[[]], aggr=[[max(CAST(aggregate_test_100.c2 AS Float64)) AS percentile_cont(aggregate_test_100.c2,Float64(1))]] 02)--TableScan: aggregate_test_100 projection=[c2] physical_plan -01)AggregateExec: mode=Final, gby=[], aggr=[percentile_cont(aggregate_test_100.c2,Float64(1))] +01)AggregateExec: mode=Final, gby=[], aggr=[max(aggregate_test_100.c2) as percentile_cont(aggregate_test_100.c2,Float64(1))] 02)--CoalescePartitionsExec -03)----AggregateExec: mode=Partial, gby=[], aggr=[percentile_cont(aggregate_test_100.c2,Float64(1))] +03)----AggregateExec: mode=Partial, gby=[], aggr=[max(aggregate_test_100.c2) as percentile_cont(aggregate_test_100.c2,Float64(1))] 04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c2], file_type=csv, has_header=true @@ -7553,6 +7553,18 @@ SELECT FIRST_VALUE(column1 ORDER BY column2) IGNORE NULLS FROM t; ---- 4 +query TT +EXPLAIN SELECT FIRST_VALUE(column1 ORDER BY column2 ASC NULLS FIRST) RESPECT NULLS AS agg FROM t; +---- +logical_plan +01)Projection: first_value(t.column1) RESPECT NULLS ORDER BY [t.column2 ASC NULLS FIRST] AS agg +02)--Aggregate: groupBy=[[]], aggr=[[first_value(t.column1) RESPECT NULLS ORDER BY [t.column2 ASC NULLS FIRST]]] +03)----TableScan: t projection=[column1, column2] +physical_plan +01)ProjectionExec: expr=[first_value(t.column1) RESPECT NULLS ORDER BY [t.column2 ASC NULLS FIRST]@0 as agg] +02)--AggregateExec: mode=Single, gby=[], aggr=[first_value(t.column1) RESPECT NULLS ORDER BY [t.column2 ASC NULLS FIRST]] +03)----DataSourceExec: partitions=1, partition_sizes=[1] + statement ok DROP TABLE t; diff --git a/datafusion/sqllogictest/test_files/aggregates_simplify.slt b/datafusion/sqllogictest/test_files/aggregates_simplify.slt index 9aa3ecf7a29f8..c4055d17396c5 100644 --- a/datafusion/sqllogictest/test_files/aggregates_simplify.slt +++ b/datafusion/sqllogictest/test_files/aggregates_simplify.slt @@ -91,7 +91,7 @@ logical_plan 03)----TableScan: sum_simplify_t projection=[] physical_plan 01)ProjectionExec: expr=[__common_expr_1@0 as sum(Int64(2) + Int64(1)), __common_expr_1@0 as sum(Int64(3))] -02)--AggregateExec: mode=Single, gby=[], aggr=[__common_expr_1] +02)--AggregateExec: mode=Single, gby=[], aggr=[sum(3) as __common_expr_1] 03)----DataSourceExec: partitions=1, partition_sizes=[1] @@ -170,9 +170,9 @@ physical_plan 02)--AggregateExec: mode=Final, gby=[], aggr=[sum(alias1), sum(alias2)] 03)----CoalescePartitionsExec 04)------AggregateExec: mode=Partial, gby=[], aggr=[sum(alias1), sum(alias2)] -05)--------AggregateExec: mode=FinalPartitioned, gby=[alias1@0 as alias1], aggr=[alias2] +05)--------AggregateExec: mode=FinalPartitioned, gby=[alias1@0 as alias1], aggr=[sum(__common_expr_1) as alias2] 06)----------RepartitionExec: partitioning=Hash([alias1@0], 4), input_partitions=1 -07)------------AggregateExec: mode=Partial, gby=[__common_expr_1@0 as alias1], aggr=[alias2] +07)------------AggregateExec: mode=Partial, gby=[__common_expr_1@0 as alias1], aggr=[sum(__common_expr_1) as alias2] 08)--------------ProjectionExec: expr=[column1@0 + 1 as __common_expr_1] 09)----------------DataSourceExec: partitions=1, partition_sizes=[1] @@ -249,7 +249,7 @@ logical_plan 03)----TableScan: sum_simplify_t projection=[] physical_plan 01)ProjectionExec: expr=[sum(random() + Int64(2))@1 > sum(random() + Int64(1))@0 as sum(random() + Int64(1)) < sum(random() + Int64(2))] -02)--AggregateExec: mode=Single, gby=[], aggr=[sum(random() + Int64(1)), sum(random() + Int64(2))] +02)--AggregateExec: mode=Single, gby=[], aggr=[sum(random() + 1) as sum(random() + Int64(1)), sum(random() + 2) as sum(random() + Int64(2))] 03)----DataSourceExec: partitions=1, partition_sizes=[1] # Checks grouped aggregates with explicit ORDER BY return deterministic row order. diff --git a/datafusion/sqllogictest/test_files/explain_analyze.slt b/datafusion/sqllogictest/test_files/explain_analyze.slt index 561c1a9dcbc87..712fdcb3c70f6 100644 --- a/datafusion/sqllogictest/test_files/explain_analyze.slt +++ b/datafusion/sqllogictest/test_files/explain_analyze.slt @@ -94,7 +94,8 @@ STORED AS PARQUET; ---- 0 -# All partition's output_rows: [4, 0, 1, 0] +# Mixed files can be redistributed across scan partitions by shared work +# stealing, so the exact skew value is not stable across environments. statement ok CREATE EXTERNAL TABLE skew_parquet STORED AS PARQUET @@ -103,7 +104,7 @@ LOCATION 'test_files/scratch/explain_analyze/output_rows_skew'; query TT EXPLAIN ANALYZE SELECT * FROM skew_parquet; ---- -Plan with Metrics DataSourceExec: output_rows_skew=84.31% +Plan with Metrics DataSourceExec: output_rows_skew=% # All partition's output_rows: [0] statement ok diff --git a/datafusion/sqllogictest/test_files/expr.slt b/datafusion/sqllogictest/test_files/expr.slt index 163730baae9e8..51b7591b41199 100644 --- a/datafusion/sqllogictest/test_files/expr.slt +++ b/datafusion/sqllogictest/test_files/expr.slt @@ -2436,7 +2436,7 @@ logical_plan 03)----TableScan: t projection=[a] physical_plan 01)ProjectionExec: expr=[min(t.a) FILTER (WHERE t.a > Int64(1))@0 as x] -02)--AggregateExec: mode=Single, gby=[], aggr=[min(t.a) FILTER (WHERE t.a > Int64(1))] +02)--AggregateExec: mode=Single, gby=[], aggr=[min(t.a) FILTER (WHERE t.a > Float32(1)) as min(t.a) FILTER (WHERE t.a > Int64(1))] 03)----DataSourceExec: partitions=1, partition_sizes=[1] diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt index b313424951532..8c055c25caeb2 100644 --- a/datafusion/sqllogictest/test_files/group_by.slt +++ b/datafusion/sqllogictest/test_files/group_by.slt @@ -4443,9 +4443,9 @@ physical_plan 04)------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[count(alias1), min(alias1), sum(alias2), max(alias3)] 05)--------RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8 06)----------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[count(alias1), min(alias1), sum(alias2), max(alias3)] -07)------------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1, alias1@1 as alias1], aggr=[alias2, alias3] +07)------------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1, alias1@1 as alias1], aggr=[sum(aggregate_test_100.c3) as alias2, max(aggregate_test_100.c4) as alias3] 08)--------------RepartitionExec: partitioning=Hash([c1@0, alias1@1], 8), input_partitions=8 -09)----------------AggregateExec: mode=Partial, gby=[c1@0 as c1, c2@1 as alias1], aggr=[alias2, alias3] +09)----------------AggregateExec: mode=Partial, gby=[c1@0 as c1, c2@1 as alias1], aggr=[sum(aggregate_test_100.c3) as alias2, max(aggregate_test_100.c4) as alias3] 10)------------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 11)--------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3, c4], file_type=csv, has_header=true diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index 368d252e25006..fcfcd00b34fff 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -237,9 +237,9 @@ physical_plan 02)--HashJoinExec: mode=CollectLeft, join_type=Right, on=[(t2_id@1, t1_id@0)], projection=[t1_id@2, sum(t2.t2_int * Float64(1)) + Int64(1)@0] 03)----CoalescePartitionsExec 04)------ProjectionExec: expr=[sum(t2.t2_int * Float64(1))@1 + 1 as sum(t2.t2_int * Float64(1)) + Int64(1), t2_id@0 as t2_id] -05)--------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], aggr=[sum(t2.t2_int * Float64(1))] +05)--------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], aggr=[sum(t2.t2_int) as sum(t2.t2_int * Float64(1))] 06)----------RepartitionExec: partitioning=Hash([t2_id@0], 4), input_partitions=4 -07)------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], aggr=[sum(t2.t2_int * Float64(1))] +07)------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], aggr=[sum(t2.t2_int) as sum(t2.t2_int * Float64(1))] 08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 09)----------------DataSourceExec: partitions=1, partition_sizes=[2] 10)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q1.slt.part b/datafusion/sqllogictest/test_files/tpch/plans/q1.slt.part index 7e3617b1d597c..10c229546b93b 100644 --- a/datafusion/sqllogictest/test_files/tpch/plans/q1.slt.part +++ b/datafusion/sqllogictest/test_files/tpch/plans/q1.slt.part @@ -50,9 +50,9 @@ physical_plan 01)SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST, l_linestatus@1 ASC NULLS LAST] 02)--SortExec: expr=[l_returnflag@0 ASC NULLS LAST, l_linestatus@1 ASC NULLS LAST], preserve_partitioning=[true] 03)----ProjectionExec: expr=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus, sum(lineitem.l_quantity)@2 as sum_qty, sum(lineitem.l_extendedprice)@3 as sum_base_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@4 as sum_disc_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax)@5 as sum_charge, avg(lineitem.l_quantity)@6 as avg_qty, avg(lineitem.l_extendedprice)@7 as avg_price, avg(lineitem.l_discount)@8 as avg_disc, count(Int64(1))@9 as count_order] -04)------AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus], aggr=[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), avg(lineitem.l_quantity), avg(lineitem.l_extendedprice), avg(lineitem.l_discount), count(Int64(1))] +04)------AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus], aggr=[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), sum(__common_expr_1) as sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), sum(__common_expr_1 * Some(1),20,0 + lineitem.l_tax) as sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), avg(lineitem.l_quantity), avg(lineitem.l_extendedprice), avg(lineitem.l_discount), count(Int64(1))] 05)--------RepartitionExec: partitioning=Hash([l_returnflag@0, l_linestatus@1], 4), input_partitions=4 -06)----------AggregateExec: mode=Partial, gby=[l_returnflag@5 as l_returnflag, l_linestatus@6 as l_linestatus], aggr=[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), avg(lineitem.l_quantity), avg(lineitem.l_extendedprice), avg(lineitem.l_discount), count(Int64(1))] +06)----------AggregateExec: mode=Partial, gby=[l_returnflag@5 as l_returnflag, l_linestatus@6 as l_linestatus], aggr=[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), sum(__common_expr_1) as sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), sum(__common_expr_1 * Some(1),20,0 + lineitem.l_tax) as sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), avg(lineitem.l_quantity), avg(lineitem.l_extendedprice), avg(lineitem.l_discount), count(Int64(1))] 07)------------ProjectionExec: expr=[l_extendedprice@0 * (Some(1),20,0 - l_discount@1) as __common_expr_1, l_quantity@2 as l_quantity, l_extendedprice@0 as l_extendedprice, l_discount@1 as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5 as l_linestatus] 08)--------------FilterExec: l_shipdate@6 <= 1998-09-02, projection=[l_extendedprice@1, l_discount@2, l_quantity@0, l_tax@3, l_returnflag@4, l_linestatus@5] 09)----------------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]}, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], file_type=csv, has_header=false diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q10.slt.part b/datafusion/sqllogictest/test_files/tpch/plans/q10.slt.part index 62649148bf058..33d5e273a0d37 100644 --- a/datafusion/sqllogictest/test_files/tpch/plans/q10.slt.part +++ b/datafusion/sqllogictest/test_files/tpch/plans/q10.slt.part @@ -72,9 +72,9 @@ physical_plan 01)SortPreservingMergeExec: [revenue@2 DESC], fetch=10 02)--SortExec: TopK(fetch=10), expr=[revenue@2 DESC], preserve_partitioning=[true] 03)----ProjectionExec: expr=[c_custkey@0 as c_custkey, c_name@1 as c_name, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@7 as revenue, c_acctbal@2 as c_acctbal, n_name@4 as n_name, c_address@5 as c_address, c_phone@3 as c_phone, c_comment@6 as c_comment] -04)------AggregateExec: mode=FinalPartitioned, gby=[c_custkey@0 as c_custkey, c_name@1 as c_name, c_acctbal@2 as c_acctbal, c_phone@3 as c_phone, n_name@4 as n_name, c_address@5 as c_address, c_comment@6 as c_comment], aggr=[sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] +04)------AggregateExec: mode=FinalPartitioned, gby=[c_custkey@0 as c_custkey, c_name@1 as c_name, c_acctbal@2 as c_acctbal, c_phone@3 as c_phone, n_name@4 as n_name, c_address@5 as c_address, c_comment@6 as c_comment], aggr=[sum(lineitem.l_extendedprice * Some(1),20,0 - lineitem.l_discount) as sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] 05)--------RepartitionExec: partitioning=Hash([c_custkey@0, c_name@1, c_acctbal@2, c_phone@3, n_name@4, c_address@5, c_comment@6], 4), input_partitions=4 -06)----------AggregateExec: mode=Partial, gby=[c_custkey@0 as c_custkey, c_name@1 as c_name, c_acctbal@4 as c_acctbal, c_phone@3 as c_phone, n_name@8 as n_name, c_address@2 as c_address, c_comment@5 as c_comment], aggr=[sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] +06)----------AggregateExec: mode=Partial, gby=[c_custkey@0 as c_custkey, c_name@1 as c_name, c_acctbal@4 as c_acctbal, c_phone@3 as c_phone, n_name@8 as n_name, c_address@2 as c_address, c_comment@5 as c_comment], aggr=[sum(lineitem.l_extendedprice * Some(1),20,0 - lineitem.l_discount) as sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] 07)------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c_nationkey@3, n_nationkey@0)], projection=[c_custkey@0, c_name@1, c_address@2, c_phone@4, c_acctbal@5, c_comment@6, l_extendedprice@7, l_discount@8, n_name@10] 08)--------------RepartitionExec: partitioning=Hash([c_nationkey@3], 4), input_partitions=4 09)----------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(o_orderkey@7, l_orderkey@0)], projection=[c_custkey@0, c_name@1, c_address@2, c_nationkey@3, c_phone@4, c_acctbal@5, c_comment@6, l_extendedprice@9, l_discount@10] diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q14.slt.part b/datafusion/sqllogictest/test_files/tpch/plans/q14.slt.part index a9ac517f287d0..d391afefa20bb 100644 --- a/datafusion/sqllogictest/test_files/tpch/plans/q14.slt.part +++ b/datafusion/sqllogictest/test_files/tpch/plans/q14.slt.part @@ -42,9 +42,9 @@ logical_plan 08)--------TableScan: part projection=[p_partkey, p_type] physical_plan 01)ProjectionExec: expr=[100 * CAST(sum(CASE WHEN part.p_type LIKE Utf8("PROMO%") THEN lineitem.l_extendedprice * Int64(1) - lineitem.l_discount ELSE Int64(0) END)@0 AS Float64) / CAST(sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@1 AS Float64) as promo_revenue] -02)--AggregateExec: mode=Final, gby=[], aggr=[sum(CASE WHEN part.p_type LIKE Utf8("PROMO%") THEN lineitem.l_extendedprice * Int64(1) - lineitem.l_discount ELSE Int64(0) END), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] +02)--AggregateExec: mode=Final, gby=[], aggr=[sum(CASE WHEN part.p_type LIKE Utf8("PROMO%") THEN __common_expr_1 ELSE Some(0),38,4 END) as sum(CASE WHEN part.p_type LIKE Utf8("PROMO%") THEN lineitem.l_extendedprice * Int64(1) - lineitem.l_discount ELSE Int64(0) END), sum(__common_expr_1) as sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] 03)----CoalescePartitionsExec -04)------AggregateExec: mode=Partial, gby=[], aggr=[sum(CASE WHEN part.p_type LIKE Utf8("PROMO%") THEN lineitem.l_extendedprice * Int64(1) - lineitem.l_discount ELSE Int64(0) END), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] +04)------AggregateExec: mode=Partial, gby=[], aggr=[sum(CASE WHEN part.p_type LIKE Utf8("PROMO%") THEN __common_expr_1 ELSE Some(0),38,4 END) as sum(CASE WHEN part.p_type LIKE Utf8("PROMO%") THEN lineitem.l_extendedprice * Int64(1) - lineitem.l_discount ELSE Int64(0) END), sum(__common_expr_1) as sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] 05)--------ProjectionExec: expr=[l_extendedprice@0 * (Some(1),20,0 - l_discount@1) as __common_expr_1, p_type@2 as p_type] 06)----------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(l_partkey@0, p_partkey@0)], projection=[l_extendedprice@1, l_discount@2, p_type@4] 07)------------RepartitionExec: partitioning=Hash([l_partkey@0], 4), input_partitions=4 diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q15.slt.part b/datafusion/sqllogictest/test_files/tpch/plans/q15.slt.part index ae0c0a93a3552..b483bf1cf583e 100644 --- a/datafusion/sqllogictest/test_files/tpch/plans/q15.slt.part +++ b/datafusion/sqllogictest/test_files/tpch/plans/q15.slt.part @@ -78,17 +78,17 @@ physical_plan 05)--------CoalescePartitionsExec 06)----------AggregateExec: mode=Partial, gby=[], aggr=[max(revenue0.total_revenue)] 07)------------ProjectionExec: expr=[sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@1 as total_revenue] -08)--------------AggregateExec: mode=FinalPartitioned, gby=[l_suppkey@0 as l_suppkey], aggr=[sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] +08)--------------AggregateExec: mode=FinalPartitioned, gby=[l_suppkey@0 as l_suppkey], aggr=[sum(lineitem.l_extendedprice * Some(1),20,0 - lineitem.l_discount) as sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] 09)----------------RepartitionExec: partitioning=Hash([l_suppkey@0], 4), input_partitions=4 -10)------------------AggregateExec: mode=Partial, gby=[l_suppkey@0 as l_suppkey], aggr=[sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] +10)------------------AggregateExec: mode=Partial, gby=[l_suppkey@0 as l_suppkey], aggr=[sum(lineitem.l_extendedprice * Some(1),20,0 - lineitem.l_discount) as sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] 11)--------------------FilterExec: l_shipdate@3 >= 1996-01-01 AND l_shipdate@3 < 1996-04-01, projection=[l_suppkey@0, l_extendedprice@1, l_discount@2] 12)----------------------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]}, projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate], file_type=csv, has_header=false 13)------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(s_suppkey@0, supplier_no@0)], projection=[s_suppkey@0, s_name@1, s_address@2, s_phone@3, total_revenue@5] 14)--------RepartitionExec: partitioning=Hash([s_suppkey@0], 4), input_partitions=1 15)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/supplier.tbl]]}, projection=[s_suppkey, s_name, s_address, s_phone], file_type=csv, has_header=false 16)--------ProjectionExec: expr=[l_suppkey@0 as supplier_no, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@1 as total_revenue] -17)----------AggregateExec: mode=FinalPartitioned, gby=[l_suppkey@0 as l_suppkey], aggr=[sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] +17)----------AggregateExec: mode=FinalPartitioned, gby=[l_suppkey@0 as l_suppkey], aggr=[sum(lineitem.l_extendedprice * Some(1),20,0 - lineitem.l_discount) as sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] 18)------------RepartitionExec: partitioning=Hash([l_suppkey@0], 4), input_partitions=4 -19)--------------AggregateExec: mode=Partial, gby=[l_suppkey@0 as l_suppkey], aggr=[sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] +19)--------------AggregateExec: mode=Partial, gby=[l_suppkey@0 as l_suppkey], aggr=[sum(lineitem.l_extendedprice * Some(1),20,0 - lineitem.l_discount) as sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] 20)----------------FilterExec: l_shipdate@3 >= 1996-01-01 AND l_shipdate@3 < 1996-04-01, projection=[l_suppkey@0, l_extendedprice@1, l_discount@2] 21)------------------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]}, projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate], file_type=csv, has_header=false diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q19.slt.part b/datafusion/sqllogictest/test_files/tpch/plans/q19.slt.part index 72c21e060fa66..8e38ca5a56d73 100644 --- a/datafusion/sqllogictest/test_files/tpch/plans/q19.slt.part +++ b/datafusion/sqllogictest/test_files/tpch/plans/q19.slt.part @@ -65,9 +65,9 @@ logical_plan 09)----------TableScan: part projection=[p_partkey, p_brand, p_size, p_container], partial_filters=[part.p_size >= Int32(1), part.p_brand = Utf8View("Brand#12") AND part.p_container IN ([Utf8View("SM CASE"), Utf8View("SM BOX"), Utf8View("SM PACK"), Utf8View("SM PKG")]) AND part.p_size <= Int32(5) OR part.p_brand = Utf8View("Brand#23") AND part.p_container IN ([Utf8View("MED BAG"), Utf8View("MED BOX"), Utf8View("MED PKG"), Utf8View("MED PACK")]) AND part.p_size <= Int32(10) OR part.p_brand = Utf8View("Brand#34") AND part.p_container IN ([Utf8View("LG CASE"), Utf8View("LG BOX"), Utf8View("LG PACK"), Utf8View("LG PKG")]) AND part.p_size <= Int32(15)] physical_plan 01)ProjectionExec: expr=[sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@0 as revenue] -02)--AggregateExec: mode=Final, gby=[], aggr=[sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] +02)--AggregateExec: mode=Final, gby=[], aggr=[sum(lineitem.l_extendedprice * Some(1),20,0 - lineitem.l_discount) as sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] 03)----CoalescePartitionsExec -04)------AggregateExec: mode=Partial, gby=[], aggr=[sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] +04)------AggregateExec: mode=Partial, gby=[], aggr=[sum(lineitem.l_extendedprice * Some(1),20,0 - lineitem.l_discount) as sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] 05)--------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(l_partkey@0, p_partkey@0)], filter=p_brand@1 = Brand#12 AND p_container@3 IN (SET) ([SM CASE, SM BOX, SM PACK, SM PKG]) AND l_quantity@0 >= Some(100),15,2 AND l_quantity@0 <= Some(1100),15,2 AND p_size@2 <= 5 OR p_brand@1 = Brand#23 AND p_container@3 IN (SET) ([MED BAG, MED BOX, MED PKG, MED PACK]) AND l_quantity@0 >= Some(1000),15,2 AND l_quantity@0 <= Some(2000),15,2 AND p_size@2 <= 10 OR p_brand@1 = Brand#34 AND p_container@3 IN (SET) ([LG CASE, LG BOX, LG PACK, LG PKG]) AND l_quantity@0 >= Some(2000),15,2 AND l_quantity@0 <= Some(3000),15,2 AND p_size@2 <= 15, projection=[l_extendedprice@2, l_discount@3] 06)----------RepartitionExec: partitioning=Hash([l_partkey@0], 4), input_partitions=4 07)------------FilterExec: (l_quantity@1 >= Some(100),15,2 AND l_quantity@1 <= Some(1100),15,2 OR l_quantity@1 >= Some(1000),15,2 AND l_quantity@1 <= Some(2000),15,2 OR l_quantity@1 >= Some(2000),15,2 AND l_quantity@1 <= Some(3000),15,2) AND (l_shipmode@5 = AIR OR l_shipmode@5 = AIR REG) AND l_shipinstruct@4 = DELIVER IN PERSON, projection=[l_partkey@0, l_quantity@1, l_extendedprice@2, l_discount@3] diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q3.slt.part b/datafusion/sqllogictest/test_files/tpch/plans/q3.slt.part index 7fec4e5f5d624..ba56f10fab25f 100644 --- a/datafusion/sqllogictest/test_files/tpch/plans/q3.slt.part +++ b/datafusion/sqllogictest/test_files/tpch/plans/q3.slt.part @@ -61,7 +61,7 @@ physical_plan 01)SortPreservingMergeExec: [revenue@1 DESC, o_orderdate@2 ASC NULLS LAST], fetch=10 02)--SortExec: TopK(fetch=10), expr=[revenue@1 DESC, o_orderdate@2 ASC NULLS LAST], preserve_partitioning=[true] 03)----ProjectionExec: expr=[l_orderkey@0 as l_orderkey, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@3 as revenue, o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority] -04)------AggregateExec: mode=SinglePartitioned, gby=[l_orderkey@2 as l_orderkey, o_orderdate@0 as o_orderdate, o_shippriority@1 as o_shippriority], aggr=[sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] +04)------AggregateExec: mode=SinglePartitioned, gby=[l_orderkey@2 as l_orderkey, o_orderdate@0 as o_orderdate, o_shippriority@1 as o_shippriority], aggr=[sum(lineitem.l_extendedprice * Some(1),20,0 - lineitem.l_discount) as sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] 05)--------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(o_orderkey@0, l_orderkey@0)], projection=[o_orderdate@1, o_shippriority@2, l_orderkey@3, l_extendedprice@4, l_discount@5] 06)----------RepartitionExec: partitioning=Hash([o_orderkey@0], 4), input_partitions=4 07)------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c_custkey@0, o_custkey@1)], projection=[o_orderkey@1, o_orderdate@3, o_shippriority@4] diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q5.slt.part b/datafusion/sqllogictest/test_files/tpch/plans/q5.slt.part index d854001f3cc4c..bda0586963159 100644 --- a/datafusion/sqllogictest/test_files/tpch/plans/q5.slt.part +++ b/datafusion/sqllogictest/test_files/tpch/plans/q5.slt.part @@ -70,9 +70,9 @@ physical_plan 01)SortPreservingMergeExec: [revenue@1 DESC] 02)--SortExec: expr=[revenue@1 DESC], preserve_partitioning=[true] 03)----ProjectionExec: expr=[n_name@0 as n_name, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@1 as revenue] -04)------AggregateExec: mode=FinalPartitioned, gby=[n_name@0 as n_name], aggr=[sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] +04)------AggregateExec: mode=FinalPartitioned, gby=[n_name@0 as n_name], aggr=[sum(lineitem.l_extendedprice * Some(1),20,0 - lineitem.l_discount) as sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] 05)--------RepartitionExec: partitioning=Hash([n_name@0], 4), input_partitions=4 -06)----------AggregateExec: mode=Partial, gby=[n_name@2 as n_name], aggr=[sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] +06)----------AggregateExec: mode=Partial, gby=[n_name@2 as n_name], aggr=[sum(lineitem.l_extendedprice * Some(1),20,0 - lineitem.l_discount) as sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] 07)------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(n_regionkey@3, r_regionkey@0)], projection=[l_extendedprice@0, l_discount@1, n_name@2] 08)--------------RepartitionExec: partitioning=Hash([n_regionkey@3], 4), input_partitions=4 09)----------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(s_nationkey@2, n_nationkey@0)], projection=[l_extendedprice@0, l_discount@1, n_name@4, n_regionkey@5] diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q8.slt.part b/datafusion/sqllogictest/test_files/tpch/plans/q8.slt.part index 7f160ce072c4d..9c55c16de759b 100644 --- a/datafusion/sqllogictest/test_files/tpch/plans/q8.slt.part +++ b/datafusion/sqllogictest/test_files/tpch/plans/q8.slt.part @@ -93,9 +93,9 @@ physical_plan 01)SortPreservingMergeExec: [o_year@0 ASC NULLS LAST] 02)--SortExec: expr=[o_year@0 ASC NULLS LAST], preserve_partitioning=[true] 03)----ProjectionExec: expr=[o_year@0 as o_year, CAST(CAST(sum(CASE WHEN all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume ELSE Int64(0) END)@1 AS Decimal128(12, 2)) / CAST(sum(all_nations.volume)@2 AS Decimal128(12, 2)) AS Decimal128(15, 2)) as mkt_share] -04)------AggregateExec: mode=FinalPartitioned, gby=[o_year@0 as o_year], aggr=[sum(CASE WHEN all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume ELSE Int64(0) END), sum(all_nations.volume)] +04)------AggregateExec: mode=FinalPartitioned, gby=[o_year@0 as o_year], aggr=[sum(CASE WHEN all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume ELSE Some(0),38,4 END) as sum(CASE WHEN all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume ELSE Int64(0) END), sum(all_nations.volume)] 05)--------RepartitionExec: partitioning=Hash([o_year@0], 4), input_partitions=4 -06)----------AggregateExec: mode=Partial, gby=[o_year@0 as o_year], aggr=[sum(CASE WHEN all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume ELSE Int64(0) END), sum(all_nations.volume)] +06)----------AggregateExec: mode=Partial, gby=[o_year@0 as o_year], aggr=[sum(CASE WHEN all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume ELSE Some(0),38,4 END) as sum(CASE WHEN all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume ELSE Int64(0) END), sum(all_nations.volume)] 07)------------ProjectionExec: expr=[date_part(YEAR, o_orderdate@0) as o_year, l_extendedprice@1 * (Some(1),20,0 - l_discount@2) as volume, n_name@3 as nation] 08)--------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(n_regionkey@3, r_regionkey@0)], projection=[o_orderdate@2, l_extendedprice@0, l_discount@1, n_name@4] 09)----------------RepartitionExec: partitioning=Hash([n_regionkey@3], 4), input_partitions=4 diff --git a/docs/source/library-user-guide/upgrading/54.0.0.md b/docs/source/library-user-guide/upgrading/54.0.0.md index 030ca729f265a..85136cf210459 100644 --- a/docs/source/library-user-guide/upgrading/54.0.0.md +++ b/docs/source/library-user-guide/upgrading/54.0.0.md @@ -25,6 +25,57 @@ in this section pertains to features and changes that have already been merged to the main branch and are awaiting release in this version. +### `expr::Alias` has a new `is_internal` field + +`datafusion_expr::expr::Alias` now stores whether an alias was created by +DataFusion internally. + +If your code builds `Alias` with a struct literal, add the new field: + +```rust +Alias { + expr, + relation, + name, + metadata, + is_internal: false, +} +``` + +Use `Alias::new_internal(...)`, `Expr::alias_internal(...)`, or +`Expr::alias_qualified_internal(...)` when you need an internal alias. + +If your code pattern matches `Alias { ... }` without `..`, add `is_internal` +to the pattern or ignore it with `..`. + +### `AggregateFunctionExpr::human_display()` now returns `Option<&str>` + +`datafusion_physical_expr::aggregate::AggregateFunctionExpr::human_display()` +now returns `Option<&str>` instead of `&str`. + +If your code read the display text directly, handle the `None` case and fall +back to `name()` when needed: + +```rust +let display = agg_expr.human_display().unwrap_or(agg_expr.name()); +``` + +### Physical `EXPLAIN` now shows lowered aggregate execution forms + +Physical `EXPLAIN` now prefers the lowered aggregate expression that the +engine executes, while still keeping the visible output alias. + +Examples: + +- `count(*)` may now appear as `count(1) as count(*)` +- simplified aggregates may show the lowered implementation, such as + `min(...) as percentile_cont(...)` +- internal aggregate aliases may now show the underlying expression instead of + only the alias name + +If you have tooling that parses the `aggr=[...]` text from physical `EXPLAIN`, +update it for the new output format. + ### String/numeric comparison coercion now prefers numeric types Previously, comparing a numeric column with a string value (e.g.,