From 936c4bd3f1b3b01ec0b89c0685f5601f23196a72 Mon Sep 17 00:00:00 2001 From: Shehab Ali Date: Mon, 30 Mar 2026 16:50:44 -0400 Subject: [PATCH] Fix non-deterministic iteration in SessionStateBuilder --- .../core/src/execution/session_state.rs | 62 +++++++++++++++++-- 1 file changed, 57 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index f0888e01049ad..3d0b5d9ea466a 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -975,6 +975,20 @@ impl SessionState { } } +/// Deduplicates function-registry map entries by keeping only entries whose key +/// matches the canonical name. The session stores one hash map entry per alias +/// plus the canonical name; filtering to canonical-name entries yields exactly +/// one [`Arc`] per logical function. +fn dedup_function_registry_by_canonical_name( + map: &HashMap>, + canonical_name: impl Fn(&T) -> &str, +) -> Vec> { + map.iter() + .filter(|(key, udf)| key.as_str() == canonical_name(udf.as_ref())) + .map(|(_, udf)| Arc::clone(udf)) + .collect() +} + /// A builder to be used for building [`SessionState`]'s. Defaults will /// be used for all values unless explicitly provided. /// @@ -1088,11 +1102,18 @@ impl SessionStateBuilder { query_planner: Some(existing.query_planner), catalog_list: Some(existing.catalog_list), table_functions: Some(existing.table_functions), - scalar_functions: Some(existing.scalar_functions.into_values().collect_vec()), - aggregate_functions: Some( - existing.aggregate_functions.into_values().collect_vec(), - ), - window_functions: Some(existing.window_functions.into_values().collect_vec()), + scalar_functions: Some(dedup_function_registry_by_canonical_name( + &existing.scalar_functions, + |u| u.name(), + )), + aggregate_functions: Some(dedup_function_registry_by_canonical_name( + &existing.aggregate_functions, + |u| u.name(), + )), + window_functions: Some(dedup_function_registry_by_canonical_name( + &existing.window_functions, + |u| u.name(), + )), extension_types: Some(existing.extension_types), serializer_registry: Some(existing.serializer_registry), file_formats: Some(existing.file_formats.into_values().collect_vec()), @@ -2340,6 +2361,37 @@ mod tests { Ok(()) } + #[test] + fn new_from_existing_preserves_scalar_udf_aliases() -> Result<()> { + use arrow::datatypes::DataType; + use datafusion_common::ScalarValue; + use datafusion_expr::registry::FunctionRegistry; + use datafusion_expr::{ColumnarValue, Volatility, create_udf}; + + let udf = create_udf( + "postgres_to_char", + vec![DataType::Utf8], + DataType::Utf8, + Volatility::Immutable, + Arc::new(|_args| Ok(ColumnarValue::Scalar(ScalarValue::Utf8(None)))), + ) + .with_aliases(["to_char"]); + + let mut state = SessionStateBuilder::new().build(); + state.register_udf(Arc::new(udf))?; + + assert_eq!(state.udf("postgres_to_char")?.name(), "postgres_to_char"); + assert_eq!(state.udf("to_char")?.name(), "postgres_to_char"); + + let roundtrip = SessionStateBuilder::new_from_existing(state).build(); + assert_eq!(roundtrip.udf("to_char")?.name(), "postgres_to_char"); + assert_eq!( + roundtrip.udf("postgres_to_char")?.name(), + "postgres_to_char" + ); + Ok(()) + } + #[test] fn test_session_state_with_optimizer_rules() { #[derive(Default, Debug)]