diff --git a/Cargo.lock b/Cargo.lock index 5a76c063bbfad..24b50f7f7dd62 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2568,7 +2568,6 @@ dependencies = [ "pbjson 0.9.0", "pretty_assertions", "prost", - "rand 0.9.4", "serde", "serde_json", "tokio", diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index a42a1560cb769..f03b4f2649981 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -163,6 +163,9 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { fn children(&self) -> Vec<&Arc>; /// Returns a new PhysicalExpr where all children were replaced by new exprs. + /// + /// If the implementation returns a [`PhysicalExpr::expression_id`], then + /// the identifier should be preserved by the new expression. fn with_new_children( self: Arc, children: Vec>, @@ -438,6 +441,23 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { fn placement(&self) -> ExpressionPlacement { ExpressionPlacement::KeepInPlace } + + /// Return a stable, globally-unique identifier for this [`PhysicalExpr`], if it + /// has one. + /// + /// This identifier tracks expressions which are connected (e.g. `DynamicFilterPhysicalExpr` + /// where two expressions may be different but store the same mutable inner state). Tracking + /// connected expressions helps preserve referential integrity within plan nodes + /// during serialization and deserialization. + /// + /// This id must be preserved across [`PhysicalExpr::with_new_children`] or any other + /// methods which may want to preserve identity. + /// + /// Default is `None`: the expression has no identity worth preserving across a + /// serialization boundary. + fn expression_id(&self) -> Option { + None + } } #[deprecated( diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index b755353d75658..41be7b18b9119 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -55,6 +55,7 @@ indexmap = { workspace = true } itertools = { workspace = true, features = ["use_std"] } parking_lot = { workspace = true } petgraph = "0.8.3" +rand = { workspace = true } recursive = { workspace = true, optional = true } tokio = { workspace = true } half = { workspace = true } @@ -64,7 +65,6 @@ arrow = { workspace = true, features = ["test_utils"] } criterion = { workspace = true } datafusion-functions = { workspace = true } insta = { workspace = true } -rand = { workspace = true } rstest = { workspace = true } [[bench]] diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index 47398d87e26a5..6b670a8800db3 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -27,6 +27,7 @@ use datafusion_common::{ }; use datafusion_expr::ColumnarValue; use datafusion_physical_expr_common::physical_expr::DynHash; +use rand::random; /// State of a dynamic filter, tracking both updates and completion. #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -55,7 +56,6 @@ impl FilterState { /// For more background, please also see the [Dynamic Filters: Passing Information Between Operators During Execution for 25x Faster Queries blog] /// /// [Dynamic Filters: Passing Information Between Operators During Execution for 25x Faster Queries blog]: https://datafusion.apache.org/blog/2025/09/10/dynamic-filters -#[derive(Debug)] pub struct DynamicFilterPhysicalExpr { /// The original children of this PhysicalExpr, if any. /// This is necessary because the dynamic filter may be initialized with a placeholder (e.g. `lit(true)`) @@ -65,6 +65,10 @@ pub struct DynamicFilterPhysicalExpr { /// If any of the children were remapped / modified (e.g. to adjust for projections) we need to keep track of the new children /// so that when we update `current()` in subsequent iterations we can re-apply the replacements. remapped_children: Option>>, + /// Unique identifier for this dynamic filter. + /// + /// Derived filters (ex. via `with_new_children`) should inherit the expression id of the source filter. + expression_id: u64, /// The source of dynamic filters. inner: Arc>, /// Broadcasts filter state (updates and completion) to all waiters. @@ -76,16 +80,40 @@ pub struct DynamicFilterPhysicalExpr { nullable: Arc>>, } -#[derive(Debug)] -struct Inner { +/// Atomic internal state of a [`DynamicFilterPhysicalExpr`]. +/// +/// **Warning:** exposed publicly solely so that proto (de)serialization in +/// `datafusion-proto` can read and rebuild this state. Do not treat this type +/// or its layout as a stable API. +#[derive(Debug, Clone)] +pub struct Inner { /// A counter that gets incremented every time the expression is updated so that we can track changes cheaply. /// This is used for [`PhysicalExpr::snapshot_generation`] to have a cheap check for changes. - generation: u64, - expr: Arc, + pub generation: u64, + pub expr: Arc, /// Flag for quick synchronous check if filter is complete. /// This is redundant with the watch channel state, but allows us to return immediately /// from `wait_complete()` without subscribing if already complete. - is_complete: bool, + pub is_complete: bool, +} + +// TODO: Include expression_id in debug output. +// +// See https://github.com/apache/datafusion/issues/20418. Currently, plan nodes +// like `HashJoinExec`, `AggregateExec`, `SortExec` do not serialize their +// dynamic filter. This causes round trips to fail on the `expression_id` +// because it is regenerated on deserialization. +impl std::fmt::Debug for DynamicFilterPhysicalExpr { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DynamicFilterPhysicalExpr") + .field("children", &self.children) + .field("remapped_children", &self.remapped_children) + .field("inner", &self.inner) + .field("state_watch", &self.state_watch) + .field("data_type", &self.data_type) + .field("nullable", &self.nullable) + .finish() + } } impl Inner { @@ -173,6 +201,7 @@ impl DynamicFilterPhysicalExpr { Self { children, remapped_children: None, // Initially no remapped children + expression_id: Self::new_expression_id(), inner: Arc::new(RwLock::new(Inner::new(inner))), state_watch, data_type: Arc::new(RwLock::new(None)), @@ -346,6 +375,75 @@ impl DynamicFilterPhysicalExpr { write!(f, " ]") } + + /// Generate a new expression id for this filter. + fn new_expression_id() -> u64 { + random::() + } + + /// Return the filter's original children (before any remapping). + /// + /// **Warning:** intended only for `datafusion-proto` (de)serialization. + /// Not a stable API. + pub fn original_children(&self) -> &[Arc] { + &self.children + } + + /// Return the filter's remapped children, if any have been set via + /// [`PhysicalExpr::with_new_children`]. + /// + /// **Warning:** intended only for `datafusion-proto` (de)serialization. + /// Not a stable API. + pub fn remapped_children(&self) -> Option<&[Arc]> { + self.remapped_children.as_deref() + } + + /// Rebuild a `DynamicFilterPhysicalExpr` from its stored parts. Used by + /// proto deserialization to preserve `expression_id` across a roundtrip + /// rather than minting a fresh one. + /// + /// **Warning:** intended only for `datafusion-proto` (de)serialization. + /// Not a stable API. + pub fn from_parts( + expression_id: u64, + children: Vec>, + remapped_children: Option>>, + inner: Inner, + ) -> Self { + let state = if inner.is_complete { + FilterState::Complete { + generation: inner.generation, + } + } else { + FilterState::InProgress { + generation: inner.generation, + } + }; + let (state_watch, _) = watch::channel(state); + + Self { + children, + remapped_children, + expression_id, + inner: Arc::new(RwLock::new(inner)), + state_watch, + data_type: Arc::new(RwLock::new(None)), + nullable: Arc::new(RwLock::new(None)), + } + } + + /// Return a clone of the atomically-captured `Inner` state. + /// + /// **Warning:** intended only for `datafusion-proto` (de)serialization. + /// Not a stable API. + pub fn inner(&self) -> Inner { + let guard = self.inner.read(); + Inner { + generation: guard.generation, + expr: Arc::clone(&guard.expr), + is_complete: guard.is_complete, + } + } } impl PhysicalExpr for DynamicFilterPhysicalExpr { @@ -364,6 +462,9 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr { Ok(Arc::new(Self { children: self.children.clone(), remapped_children: Some(children), + // Note that we ensure the derived expression linked to `self` + // via the unique identifier. + expression_id: self.expression_id, inner: Arc::clone(&self.inner), state_watch: self.state_watch.clone(), data_type: Arc::clone(&self.data_type), @@ -444,6 +545,10 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr { // Return the current generation of the expression. self.inner.read().generation } + + fn expression_id(&self) -> Option { + Some(self.expression_id) + } } #[cfg(test)] @@ -861,4 +966,112 @@ mod test { "Hash should be stable after update (identity-based)" ); } + + /// Verifies that `from_parts` rebuilds a `DynamicFilterPhysicalExpr` + /// whose observable state (original children, remapped children, + /// expression id, inner generation/expr/is_complete) matches the source + /// filter. + #[test] + fn test_from_parts_preserves_state() { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let col_a = col("a", &schema).unwrap(); + + // Create a dynamic filter with children + let expr = Arc::new(BinaryExpr::new( + Arc::clone(&col_a), + datafusion_expr::Operator::Gt, + lit(10) as Arc, + )); + let filter = DynamicFilterPhysicalExpr::new( + vec![Arc::clone(&col_a)], + expr as Arc, + ); + + // Add remapped children. + let reassigned_schema = Arc::new(Schema::new(vec![ + Field::new("b", DataType::Int32, false), + Field::new("a", DataType::Int32, false), + ])); + let reassigned = reassign_expr_columns( + Arc::new(filter) as Arc, + &reassigned_schema, + ) + .expect("reassign_expr_columns should succeed"); + let reassigned = reassigned + .downcast_ref::() + .expect("Expected dynamic filter after reassignment"); + + reassigned + .update(lit(42) as Arc) + .expect("Update should succeed"); + reassigned.mark_complete(); + + // Capture the parts and reconstruct. + let reconstructed = DynamicFilterPhysicalExpr::from_parts( + reassigned.expression_id, + reassigned.original_children().to_vec(), + reassigned.remapped_children().map(|r| r.to_vec()), + reassigned.inner(), + ); + + assert_eq!( + reassigned.original_children(), + reconstructed.original_children(), + ); + assert_eq!( + reassigned.remapped_children(), + reconstructed.remapped_children(), + ); + assert_eq!(reassigned.expression_id(), reconstructed.expression_id()); + let r = reassigned.inner(); + let c = reconstructed.inner(); + assert_eq!(r.generation, c.generation); + assert_eq!(r.is_complete, c.is_complete); + assert_eq!(format!("{:?}", r.expr), format!("{:?}", c.expr)); + } + + #[tokio::test] + async fn test_expression_id() { + let source_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let col_a = col("a", &source_schema).unwrap(); + + // Create a source filter + let source = Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::clone(&col_a)], + lit(true) as Arc, + )); + let source_clone = Arc::clone(&source); + + // Create a derived filter by reassigning the source filter to a different schema. + let derived_schema = Arc::new(Schema::new(vec![ + Field::new("x", DataType::Int32, false), + Field::new("a", DataType::Int32, false), + ])); + let derived = reassign_expr_columns( + Arc::clone(&source) as Arc, + &derived_schema, + ) + .expect("reassign_expr_columns should succeed"); + + let derived_expression_id = derived + .expression_id() + .expect("derived filter should have an expression id"); + let source_expression_id = source + .expression_id() + .expect("source filter should have an expression id"); + let source_clone_expression_id = source_clone + .expression_id() + .expect("source clone should have an expression id"); + + assert_eq!( + source_clone_expression_id, source_expression_id, + "cloned filter should preserve its expression id", + ); + + assert_eq!( + derived_expression_id, source_expression_id, + "derived filters should carry forward the source expression id", + ); + } } diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index 1e082355486f6..104afbb194ed1 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -43,7 +43,7 @@ pub use case::{CaseExpr, case}; pub use cast::{CastExpr, cast}; pub use column::{Column, col, with_new_schema}; pub use datafusion_expr::utils::format_state_name; -pub use dynamic_filters::DynamicFilterPhysicalExpr; +pub use dynamic_filters::{DynamicFilterPhysicalExpr, Inner as DynamicFilterInner}; pub use in_list::{InListExpr, in_list}; pub use is_not_null::{IsNotNullExpr, is_not_null}; pub use is_null::{IsNullExpr, is_null}; diff --git a/datafusion/proto/Cargo.toml b/datafusion/proto/Cargo.toml index baa7dd8770e71..4484846813296 100644 --- a/datafusion/proto/Cargo.toml +++ b/datafusion/proto/Cargo.toml @@ -66,7 +66,6 @@ datafusion-proto-common = { workspace = true } object_store = { workspace = true } pbjson = { workspace = true, optional = true } prost = { workspace = true } -rand = { workspace = true } serde = { version = "1.0", optional = true } serde_json = { workspace = true, optional = true } diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index c61226fb526f6..6fccaeb8f738f 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -876,11 +876,9 @@ message PhysicalExprNode { reserved 17; // Unique identifier for this expression to do deduplication during deserialization. - // When serializing, this is set to a unique identifier for each combination of - // expression, process and serialization run. - // When deserializing, if this ID has been seen before, the cached Arc is returned - // instead of creating a new one, enabling reconstruction of referential integrity - // across serde roundtrips. + // When serializing, this is set via `PhysicalExpr::expression_id`. When deserializing, + // this id is used by the `DeduplicatingProtoConverter` to preserve referential + // integrity across serde roundtrips for different expressions with the same id. optional uint64 expr_id = 30; oneof ExprType { @@ -920,9 +918,19 @@ message PhysicalExprNode { UnknownColumn unknown_column = 20; PhysicalHashExprNode hash_expr = 21; + + PhysicalDynamicFilterNode dynamic_filter = 22; } } +message PhysicalDynamicFilterNode { + repeated PhysicalExprNode children = 1; + repeated PhysicalExprNode remapped_children = 2; + uint64 generation = 3; + PhysicalExprNode inner_expr = 4; + bool is_complete = 5; +} + message PhysicalScalarUdfNode { string name = 1; repeated PhysicalExprNode args = 2; @@ -1477,4 +1485,4 @@ message AsyncFuncExecNode { message BufferExecNode { PhysicalPlanNode input = 1; uint64 capacity = 2; -} \ No newline at end of file +} diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 82bcdac898204..b04c0af6468b7 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -16531,6 +16531,172 @@ impl<'de> serde::Deserialize<'de> for PhysicalDateTimeIntervalExprNode { deserializer.deserialize_struct("datafusion.PhysicalDateTimeIntervalExprNode", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for PhysicalDynamicFilterNode { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if !self.children.is_empty() { + len += 1; + } + if !self.remapped_children.is_empty() { + len += 1; + } + if self.generation != 0 { + len += 1; + } + if self.inner_expr.is_some() { + len += 1; + } + if self.is_complete { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.PhysicalDynamicFilterNode", len)?; + if !self.children.is_empty() { + struct_ser.serialize_field("children", &self.children)?; + } + if !self.remapped_children.is_empty() { + struct_ser.serialize_field("remappedChildren", &self.remapped_children)?; + } + if self.generation != 0 { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("generation", ToString::to_string(&self.generation).as_str())?; + } + if let Some(v) = self.inner_expr.as_ref() { + struct_ser.serialize_field("innerExpr", v)?; + } + if self.is_complete { + struct_ser.serialize_field("isComplete", &self.is_complete)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for PhysicalDynamicFilterNode { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "children", + "remapped_children", + "remappedChildren", + "generation", + "inner_expr", + "innerExpr", + "is_complete", + "isComplete", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Children, + RemappedChildren, + Generation, + InnerExpr, + IsComplete, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl serde::de::Visitor<'_> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "children" => Ok(GeneratedField::Children), + "remappedChildren" | "remapped_children" => Ok(GeneratedField::RemappedChildren), + "generation" => Ok(GeneratedField::Generation), + "innerExpr" | "inner_expr" => Ok(GeneratedField::InnerExpr), + "isComplete" | "is_complete" => Ok(GeneratedField::IsComplete), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = PhysicalDynamicFilterNode; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.PhysicalDynamicFilterNode") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut children__ = None; + let mut remapped_children__ = None; + let mut generation__ = None; + let mut inner_expr__ = None; + let mut is_complete__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::Children => { + if children__.is_some() { + return Err(serde::de::Error::duplicate_field("children")); + } + children__ = Some(map_.next_value()?); + } + GeneratedField::RemappedChildren => { + if remapped_children__.is_some() { + return Err(serde::de::Error::duplicate_field("remappedChildren")); + } + remapped_children__ = Some(map_.next_value()?); + } + GeneratedField::Generation => { + if generation__.is_some() { + return Err(serde::de::Error::duplicate_field("generation")); + } + generation__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::InnerExpr => { + if inner_expr__.is_some() { + return Err(serde::de::Error::duplicate_field("innerExpr")); + } + inner_expr__ = map_.next_value()?; + } + GeneratedField::IsComplete => { + if is_complete__.is_some() { + return Err(serde::de::Error::duplicate_field("isComplete")); + } + is_complete__ = Some(map_.next_value()?); + } + } + } + Ok(PhysicalDynamicFilterNode { + children: children__.unwrap_or_default(), + remapped_children: remapped_children__.unwrap_or_default(), + generation: generation__.unwrap_or_default(), + inner_expr: inner_expr__, + is_complete: is_complete__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("datafusion.PhysicalDynamicFilterNode", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for PhysicalExprNode { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -16610,6 +16776,9 @@ impl serde::Serialize for PhysicalExprNode { physical_expr_node::ExprType::HashExpr(v) => { struct_ser.serialize_field("hashExpr", v)?; } + physical_expr_node::ExprType::DynamicFilter(v) => { + struct_ser.serialize_field("dynamicFilter", v)?; + } } } struct_ser.end() @@ -16656,6 +16825,8 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { "unknownColumn", "hash_expr", "hashExpr", + "dynamic_filter", + "dynamicFilter", ]; #[allow(clippy::enum_variant_names)] @@ -16680,6 +16851,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { Extension, UnknownColumn, HashExpr, + DynamicFilter, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -16721,6 +16893,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { "extension" => Ok(GeneratedField::Extension), "unknownColumn" | "unknown_column" => Ok(GeneratedField::UnknownColumn), "hashExpr" | "hash_expr" => Ok(GeneratedField::HashExpr), + "dynamicFilter" | "dynamic_filter" => Ok(GeneratedField::DynamicFilter), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -16883,6 +17056,13 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { return Err(serde::de::Error::duplicate_field("hashExpr")); } expr_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_expr_node::ExprType::HashExpr) +; + } + GeneratedField::DynamicFilter => { + if expr_type__.is_some() { + return Err(serde::de::Error::duplicate_field("dynamicFilter")); + } + expr_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_expr_node::ExprType::DynamicFilter) ; } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index ab60c3058dbde..0ab2c539c553e 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1308,16 +1308,14 @@ pub struct PhysicalExtensionNode { #[derive(Clone, PartialEq, ::prost::Message)] pub struct PhysicalExprNode { /// Unique identifier for this expression to do deduplication during deserialization. - /// When serializing, this is set to a unique identifier for each combination of - /// expression, process and serialization run. - /// When deserializing, if this ID has been seen before, the cached Arc is returned - /// instead of creating a new one, enabling reconstruction of referential integrity - /// across serde roundtrips. + /// When serializing, this is set via `PhysicalExpr::expression_id`. When deserializing, + /// this id is used by the `DeduplicatingProtoConverter` to preserve referential + /// integrity across serde roundtrips for different expressions with the same id. #[prost(uint64, optional, tag = "30")] pub expr_id: ::core::option::Option, #[prost( oneof = "physical_expr_node::ExprType", - tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 16, 18, 19, 20, 21" + tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 16, 18, 19, 20, 21, 22" )] pub expr_type: ::core::option::Option, } @@ -1370,9 +1368,24 @@ pub mod physical_expr_node { UnknownColumn(super::UnknownColumn), #[prost(message, tag = "21")] HashExpr(super::PhysicalHashExprNode), + #[prost(message, tag = "22")] + DynamicFilter(::prost::alloc::boxed::Box), } } #[derive(Clone, PartialEq, ::prost::Message)] +pub struct PhysicalDynamicFilterNode { + #[prost(message, repeated, tag = "1")] + pub children: ::prost::alloc::vec::Vec, + #[prost(message, repeated, tag = "2")] + pub remapped_children: ::prost::alloc::vec::Vec, + #[prost(uint64, tag = "3")] + pub generation: u64, + #[prost(message, optional, boxed, tag = "4")] + pub inner_expr: ::core::option::Option<::prost::alloc::boxed::Box>, + #[prost(bool, tag = "5")] + pub is_complete: bool, +} +#[derive(Clone, PartialEq, ::prost::Message)] pub struct PhysicalScalarUdfNode { #[prost(string, tag = "1")] pub name: ::prost::alloc::string::String, diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 61665db607757..dc58eaeccd305 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -58,6 +58,9 @@ use super::{ use crate::logical_plan::{self}; use crate::protobuf::physical_expr_node::ExprType; use crate::{convert_required, protobuf}; +use datafusion_physical_expr::expressions::{ + DynamicFilterInner, DynamicFilterPhysicalExpr, +}; impl From<&protobuf::PhysicalColumn> for Column { fn from(c: &protobuf::PhysicalColumn) -> Column { @@ -524,6 +527,56 @@ pub fn parse_physical_expr_with_converter( hash_expr.description.clone(), )) } + ExprType::DynamicFilter(dynamic_filter) => { + let children = parse_physical_exprs( + &dynamic_filter.children, + ctx, + input_schema, + codec, + proto_converter, + )?; + + let remapped_children = if !dynamic_filter.remapped_children.is_empty() { + Some(parse_physical_exprs( + &dynamic_filter.remapped_children, + ctx, + input_schema, + codec, + proto_converter, + )?) + } else { + None + }; + + let inner_expr = parse_required_physical_expr( + dynamic_filter.inner_expr.as_deref(), + ctx, + "inner_expr", + input_schema, + codec, + proto_converter, + )?; + + let expression_id = proto.expr_id.ok_or_else(|| { + proto_error( + "DynamicFilterPhysicalExpr requires PhysicalExprNode.expr_id \ + to be set by the serializer", + ) + })?; + + let base_filter: Arc = + Arc::new(DynamicFilterPhysicalExpr::from_parts( + expression_id, + children, + remapped_children, + DynamicFilterInner { + generation: dynamic_filter.generation, + expr: inner_expr, + is_complete: dynamic_filter.is_complete, + }, + )); + base_filter + } ExprType::Extension(extension) => { let inputs: Vec> = extension .inputs diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 206f4378d3d3b..40f8178869609 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -19,7 +19,6 @@ use std::any::Any; use std::cell::RefCell; use std::collections::HashMap; use std::fmt::Debug; -use std::hash::{DefaultHasher, Hash, Hasher}; use std::sync::Arc; use arrow::compute::SortOptions; @@ -3791,6 +3790,7 @@ struct DataEncoderTuple { } pub struct DefaultPhysicalProtoConverter; + impl PhysicalProtoConverterExtension for DefaultPhysicalProtoConverter { fn proto_to_execution_plan( &self, @@ -3839,85 +3839,11 @@ impl PhysicalProtoConverterExtension for DefaultPhysicalProtoConverter { } } -/// Internal serializer that adds expr_id to expressions. -/// Created fresh for each serialization operation. -struct DeduplicatingSerializer { - /// Random salt combined with pointer addresses and process ID to create globally unique expr_ids. - session_id: u64, -} - -impl DeduplicatingSerializer { - fn new() -> Self { - Self { - session_id: rand::random(), - } - } -} - -impl PhysicalProtoConverterExtension for DeduplicatingSerializer { - fn proto_to_execution_plan( - &self, - _ctx: &TaskContext, - _codec: &dyn PhysicalExtensionCodec, - _proto: &protobuf::PhysicalPlanNode, - ) -> Result> { - internal_err!("DeduplicatingSerializer cannot deserialize execution plans") - } - - fn execution_plan_to_proto( - &self, - plan: &Arc, - codec: &dyn PhysicalExtensionCodec, - ) -> Result - where - Self: Sized, - { - protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter( - Arc::clone(plan), - codec, - self, - ) - } - - fn proto_to_physical_expr( - &self, - _proto: &protobuf::PhysicalExprNode, - _ctx: &TaskContext, - _input_schema: &Schema, - _codec: &dyn PhysicalExtensionCodec, - ) -> Result> - where - Self: Sized, - { - internal_err!("DeduplicatingSerializer cannot deserialize physical expressions") - } - - fn physical_expr_to_proto( - &self, - expr: &Arc, - codec: &dyn PhysicalExtensionCodec, - ) -> Result { - let mut proto = serialize_physical_expr_with_converter(expr, codec, self)?; - - // Hash session_id, pointer address, and process ID together to create expr_id. - // - session_id: random per serializer, prevents collisions when merging serializations - // - ptr: unique address per Arc within a process - // - pid: prevents collisions if serializer is shared across processes - let mut hasher = DefaultHasher::new(); - self.session_id.hash(&mut hasher); - (Arc::as_ptr(expr) as *const () as u64).hash(&mut hasher); - std::process::id().hash(&mut hasher); - proto.expr_id = Some(hasher.finish()); - - Ok(proto) - } -} - -/// Internal deserializer that caches expressions by expr_id. -/// Created fresh for each deserialization operation. +/// Internal deserializer that caches expressions by their `expression_id()` so +/// multiple occurrences of the same expression are deduped. #[derive(Default)] struct DeduplicatingDeserializer { - /// Cache mapping expr_id to deserialized expressions. + /// Cache mapping expression_id to deserialized expressions. cache: RefCell>>, } @@ -3952,24 +3878,37 @@ impl PhysicalProtoConverterExtension for DeduplicatingDeserializer { where Self: Sized, { - if let Some(expr_id) = proto.expr_id { - // Check cache first - if let Some(cached) = self.cache.borrow().get(&expr_id) { - return Ok(Arc::clone(cached)); - } - // Deserialize and cache - let expr = parse_physical_expr_with_converter( + // `expr_id` is the generic identity slot on `PhysicalExprNode`. + // The default serializer populates it from `PhysicalExpr::expression_id` + // A missing id means this expression type doesn't participate in deduping. + let Some(id) = proto.expr_id else { + return parse_physical_expr_with_converter( proto, ctx, input_schema, codec, self, - )?; - self.cache.borrow_mut().insert(expr_id, Arc::clone(&expr)); - Ok(expr) - } else { - parse_physical_expr_with_converter(proto, ctx, input_schema, codec, self) + ); + }; + + let parsed = + parse_physical_expr_with_converter(proto, ctx, input_schema, codec, self)?; + + let mut cache = self.cache.borrow_mut(); + if let Some(cached) = cache.get(&id) { + // Since expressions may manage their own internal state when deriving + // expressions via `with_new_children`, we use `with_new_children` + // to opt into the same behavior. + // + // For example, one `DynamicFilterPhysicalExpr` may be derived from + // another resulting in shared references. Using `with_new_children` + // is meant to preserve those references. + let children: Vec<_> = parsed.children().into_iter().cloned().collect(); + return Arc::clone(cached).with_new_children(children); } + + cache.insert(id, Arc::clone(&parsed)); + Ok(parsed) } fn physical_expr_to_proto( @@ -3981,19 +3920,12 @@ impl PhysicalProtoConverterExtension for DeduplicatingDeserializer { } } -/// A proto converter that adds expression deduplication during serialization -/// and deserialization. -/// -/// During serialization, each expression's Arc pointer address is XORed with a -/// random session_id to create a salted `expr_id`. This prevents cross-process -/// collisions when serialized plans are merged. +/// A proto converter that deduplicates [`PhysicalExpr`] by [`PhysicalExpr::expression_id`]. +/// This helps preserve referential integrity when deserializing [`ExecutionPlan`]s +/// which may contain multiple occurrences of the same [`PhysicalExpr`] (ex. when +/// [`DynamicFilterPhysicalExpr`] are pushed down, it is important to preserve +/// referential integrity). /// -/// During deserialization, expressions with the same `expr_id` share the same -/// Arc, reducing memory usage for plans with duplicate expressions (e.g., large -/// IN lists) and supporting correctly linking [`DynamicFilterPhysicalExpr`] instances. -/// -/// This converter is stateless - it creates internal serializers/deserializers -/// on demand for each operation. /// /// [`DynamicFilterPhysicalExpr`]: https://docs.rs/datafusion-physical-expr/latest/datafusion_physical_expr/expressions/struct.DynamicFilterPhysicalExpr.html #[derive(Debug, Default, Clone, Copy)] @@ -4018,11 +3950,10 @@ impl PhysicalProtoConverterExtension for DeduplicatingProtoConverter { where Self: Sized, { - let serializer = DeduplicatingSerializer::new(); protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter( Arc::clone(plan), codec, - &serializer, + self, ) } @@ -4045,8 +3976,7 @@ impl PhysicalProtoConverterExtension for DeduplicatingProtoConverter { expr: &Arc, codec: &dyn PhysicalExtensionCodec, ) -> Result { - let serializer = DeduplicatingSerializer::new(); - serializer.physical_expr_to_proto(expr, codec) + serialize_physical_expr_with_converter(expr, codec, self) } } diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index f0eb6d27aac30..26309a7c9ce8e 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -33,11 +33,11 @@ use datafusion_datasource_parquet::file_format::ParquetSink; use datafusion_expr::WindowFrame; use datafusion_physical_expr::ScalarFunctionExpr; use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr}; -use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_physical_plan::expressions::{ - BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr, - LikeExpr, Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn, + BinaryExpr, CaseExpr, CastExpr, Column, DynamicFilterPhysicalExpr, InListExpr, + IsNotNullExpr, IsNullExpr, LikeExpr, Literal, NegativeExpr, NotExpr, TryCastExpr, + UnKnownColumn, }; use datafusion_physical_plan::joins::{HashExpr, HashTableLookupExpr}; use datafusion_physical_plan::udaf::AggregateFunctionExpr; @@ -257,10 +257,8 @@ pub fn serialize_physical_expr_with_converter( codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result { - // Snapshot the expr in case it has dynamic predicate state so - // it can be serialized - let expr = snapshot_physical_expr(Arc::clone(value))?; - + let expr = value.as_ref(); + let expr_id = value.expression_id(); // HashTableLookupExpr is used for dynamic filter pushdown in hash joins. // It contains an Arc (the build-side hash table) which // cannot be serialized - the hash table is a runtime structure built during @@ -281,14 +279,14 @@ pub fn serialize_physical_expr_with_converter( )), }; return Ok(protobuf::PhysicalExprNode { - expr_id: None, + expr_id, expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(value)), }); } if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { - expr_id: None, + expr_id, expr_type: Some(protobuf::physical_expr_node::ExprType::Column( protobuf::PhysicalColumn { name: expr.name().to_string(), @@ -298,7 +296,7 @@ pub fn serialize_physical_expr_with_converter( }) } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { - expr_id: None, + expr_id, expr_type: Some(protobuf::physical_expr_node::ExprType::UnknownColumn( protobuf::UnknownColumn { name: expr.name().to_string(), @@ -340,14 +338,14 @@ pub fn serialize_physical_expr_with_converter( }); Ok(protobuf::PhysicalExprNode { - expr_id: None, + expr_id, expr_type: Some(protobuf::physical_expr_node::ExprType::BinaryExpr( binary_expr, )), }) } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { - expr_id: None, + expr_id, expr_type: Some( protobuf::physical_expr_node::ExprType::Case( Box::new( @@ -390,7 +388,7 @@ pub fn serialize_physical_expr_with_converter( }) } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { - expr_id: None, + expr_id, expr_type: Some(protobuf::physical_expr_node::ExprType::NotExpr(Box::new( protobuf::PhysicalNot { expr: Some(Box::new( @@ -401,7 +399,7 @@ pub fn serialize_physical_expr_with_converter( }) } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { - expr_id: None, + expr_id, expr_type: Some(protobuf::physical_expr_node::ExprType::IsNullExpr( Box::new(protobuf::PhysicalIsNull { expr: Some(Box::new( @@ -412,7 +410,7 @@ pub fn serialize_physical_expr_with_converter( }) } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { - expr_id: None, + expr_id, expr_type: Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr( Box::new(protobuf::PhysicalIsNotNull { expr: Some(Box::new( @@ -423,7 +421,7 @@ pub fn serialize_physical_expr_with_converter( }) } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { - expr_id: None, + expr_id, expr_type: Some(protobuf::physical_expr_node::ExprType::InList(Box::new( protobuf::PhysicalInListNode { expr: Some(Box::new( @@ -436,7 +434,7 @@ pub fn serialize_physical_expr_with_converter( }) } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { - expr_id: None, + expr_id, expr_type: Some(protobuf::physical_expr_node::ExprType::Negative(Box::new( protobuf::PhysicalNegativeNode { expr: Some(Box::new( @@ -447,14 +445,14 @@ pub fn serialize_physical_expr_with_converter( }) } else if let Some(lit) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { - expr_id: None, + expr_id, expr_type: Some(protobuf::physical_expr_node::ExprType::Literal( lit.value().try_into()?, )), }) } else if let Some(cast) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { - expr_id: None, + expr_id, expr_type: Some(protobuf::physical_expr_node::ExprType::Cast(Box::new( protobuf::PhysicalCastNode { expr: Some(Box::new( @@ -466,7 +464,7 @@ pub fn serialize_physical_expr_with_converter( }) } else if let Some(cast) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { - expr_id: None, + expr_id, expr_type: Some(protobuf::physical_expr_node::ExprType::TryCast(Box::new( protobuf::PhysicalTryCastNode { expr: Some(Box::new( @@ -480,7 +478,7 @@ pub fn serialize_physical_expr_with_converter( let mut buf = Vec::new(); codec.try_encode_udf(expr.fun(), &mut buf)?; Ok(protobuf::PhysicalExprNode { - expr_id: None, + expr_id, expr_type: Some(protobuf::physical_expr_node::ExprType::ScalarUdf( protobuf::PhysicalScalarUdfNode { name: expr.name().to_string(), @@ -497,7 +495,7 @@ pub fn serialize_physical_expr_with_converter( }) } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { - expr_id: None, + expr_id, expr_type: Some(protobuf::physical_expr_node::ExprType::LikeExpr(Box::new( protobuf::PhysicalLikeExprNode { negated: expr.negated(), @@ -513,7 +511,7 @@ pub fn serialize_physical_expr_with_converter( }) } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { - expr_id: None, + expr_id, expr_type: Some(protobuf::physical_expr_node::ExprType::HashExpr( protobuf::PhysicalHashExprNode { on_columns: serialize_physical_exprs( @@ -526,6 +524,39 @@ pub fn serialize_physical_expr_with_converter( }, )), }) + } else if let Some(df) = expr.downcast_ref::() { + let children = df + .original_children() + .iter() + .map(|child| proto_converter.physical_expr_to_proto(child, codec)) + .collect::>>()?; + + let remapped_children = if let Some(remapped) = df.remapped_children() { + remapped + .iter() + .map(|child| proto_converter.physical_expr_to_proto(child, codec)) + .collect::>>()? + } else { + vec![] + }; + + // Atomic snapshot of inner state. + let inner = df.inner(); + let inner_expr = + Box::new(proto_converter.physical_expr_to_proto(&inner.expr, codec)?); + + Ok(protobuf::PhysicalExprNode { + expr_id, + expr_type: Some(protobuf::physical_expr_node::ExprType::DynamicFilter( + Box::new(protobuf::PhysicalDynamicFilterNode { + children, + remapped_children, + generation: inner.generation, + inner_expr: Some(inner_expr), + is_complete: inner.is_complete, + }), + )), + }) } else { let mut buf: Vec = vec![]; match codec.try_encode_expr(value, &mut buf) { @@ -536,7 +567,7 @@ pub fn serialize_physical_expr_with_converter( .map(|e| proto_converter.physical_expr_to_proto(e, codec)) .collect::>()?; Ok(protobuf::PhysicalExprNode { - expr_id: None, + expr_id, expr_type: Some(protobuf::physical_expr_node::ExprType::Extension( protobuf::PhysicalExtensionExprNode { expr: buf, inputs }, )), diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 3f1c306603bc1..96e7b79da501c 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -128,6 +128,8 @@ use crate::cases::{ CustomUDWF, CustomUDWFNode, MyAggregateUDF, MyAggregateUdfNode, MyRegexUdf, MyRegexUdfNode, }; +use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr; +use datafusion_physical_expr::utils::reassign_expr_columns; /// Perform a serde roundtrip and assert that the string representation of the before and after plans /// are identical. Note that this often isn't sufficient to guarantee that no information is @@ -2683,442 +2685,295 @@ fn roundtrip_call_null_scalar_struct_dict() -> Result<()> { roundtrip_test(filter) } -/// Test that expression deduplication works during deserialization. -/// When the same expression Arc is serialized multiple times, it should be -/// deduplicated on deserialization (sharing the same Arc). -#[test] -fn test_expression_deduplication() -> Result<()> { - let field_a = Field::new("a", DataType::Int64, false); - let schema = Arc::new(Schema::new(vec![field_a])); - - // Create a shared expression that will be used multiple times - let shared_col: Arc = Arc::new(Column::new("a", 0)); - - // Create an InList expression that uses the same column Arc multiple times - // This simulates a real-world scenario where expressions are shared - let in_list_expr = in_list( - Arc::clone(&shared_col), - vec![lit(1i64), lit(2i64), lit(3i64)], - &false, - &schema, - )?; - - // Create a binary expression that uses the shared column and the in_list result - let binary_expr: Arc = Arc::new(BinaryExpr::new( - Arc::clone(&shared_col), - Operator::Eq, - lit(42i64), - )); - - // Create a plan that has both expressions (they share the `shared_col` Arc) - let input = Arc::new(EmptyExec::new(schema.clone())); - let filter = FilterExecBuilder::new(in_list_expr, input).build()?; - let projection_exprs = vec![ProjectionExpr { - expr: binary_expr, - alias: "result".to_string(), - }]; - let exec_plan = - Arc::new(ProjectionExec::try_new(projection_exprs, Arc::new(filter))?); - - let ctx = SessionContext::new(); - let codec = DefaultPhysicalExtensionCodec {}; - let proto_converter = DeduplicatingProtoConverter {}; - - // Perform roundtrip - let bytes = physical_plan_to_bytes_with_proto_converter( - Arc::clone(&exec_plan) as Arc, - &codec, - &proto_converter, - )?; - - // Create a new converter for deserialization (fresh cache) - let deser_converter = DeduplicatingProtoConverter {}; - let result_plan = physical_plan_from_bytes_with_proto_converter( - bytes.as_ref(), - ctx.task_ctx().as_ref(), - &codec, - &deser_converter, - )?; - - // Verify the plan structure is correct - pretty_assertions::assert_eq!(format!("{exec_plan:?}"), format!("{result_plan:?}")); - - Ok(()) +/// Create a [`DynamicFilterPhysicalExpr`] with child column expression "a" @ index 0. +fn make_dynamic_filter() -> Arc { + Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::new(Column::new("a", 0)) as Arc], + lit(true), + )) as Arc } -/// Test that expression deduplication correctly shares Arcs for identical expressions. -/// This test verifies the core deduplication behavior. -#[test] -fn test_expression_deduplication_arc_sharing() -> Result<()> { - use datafusion_proto::bytes::{ - physical_plan_from_bytes_with_proto_converter, - physical_plan_to_bytes_with_proto_converter, - }; - - let field_a = Field::new("a", DataType::Int64, false); - let schema = Arc::new(Schema::new(vec![field_a])); - - // Create a column expression - let col_expr: Arc = Arc::new(Column::new("a", 0)); - - // Create a projection that uses the SAME Arc twice - // After roundtrip, both should point to the same Arc - let projection_exprs = vec![ - ProjectionExpr { - expr: Arc::clone(&col_expr), - alias: "a1".to_string(), - }, - ProjectionExpr { - expr: Arc::clone(&col_expr), // Same Arc! - alias: "a2".to_string(), - }, - ]; - - let input = Arc::new(EmptyExec::new(schema)); - let exec_plan = Arc::new(ProjectionExec::try_new(projection_exprs, input)?); - - let ctx = SessionContext::new(); - let codec = DefaultPhysicalExtensionCodec {}; - let proto_converter = DeduplicatingProtoConverter {}; - - // Serialize - let bytes = physical_plan_to_bytes_with_proto_converter( - Arc::clone(&exec_plan) as Arc, - &codec, - &proto_converter, - )?; - - // Deserialize with a fresh converter - let deser_converter = DeduplicatingProtoConverter {}; - let result_plan = physical_plan_from_bytes_with_proto_converter( - bytes.as_ref(), - ctx.task_ctx().as_ref(), - &codec, - &deser_converter, - )?; - - // Get the projection from the result - let projection = result_plan - .downcast_ref::() - .expect("Expected ProjectionExec"); - - let exprs: Vec<_> = projection.expr().iter().collect(); - assert_eq!(exprs.len(), 2); - - // The key test: both expressions should point to the same Arc after deduplication - // This is because they were the same Arc before serialization - assert!( - Arc::ptr_eq(&exprs[0].expr, &exprs[1].expr), - "Expected both expressions to share the same Arc after deduplication" - ); - - Ok(()) +/// Update a [`DynamicFilterPhysicalExpr`]'s children to support child schema "b" @ 0, "a" @ 1. +fn make_reassigned_dynamic_filter( + filter: Arc, +) -> Result<(Arc, Arc)> { + let schema = Arc::new(Schema::new(vec![ + Field::new("b", DataType::Int64, false), + Field::new("a", DataType::Int64, false), + ])); + let reassigned = reassign_expr_columns(filter, &schema)?; + Ok((schema, reassigned)) } -/// Test backward compatibility: protos without expr_id should still deserialize correctly. -#[test] -fn test_backward_compatibility_no_expr_id() -> Result<()> { - let field_a = Field::new("a", DataType::Int64, false); - let schema = Arc::new(Schema::new(vec![field_a])); +/// Extract the expression id from a [`PhysicalExpr`] proto. Populated by the +/// default serializer from `PhysicalExpr::expression_id`. +fn proto_expression_id(expr: &PhysicalExprNode) -> u64 { + expr.expr_id + .expect("expected PhysicalExprNode.expr_id to be populated") +} - // Manually create a proto without expr_id set - let proto = PhysicalExprNode { - expr_id: None, // Simulating old proto without this field - expr_type: Some( - datafusion_proto::protobuf::physical_expr_node::ExprType::Column( - datafusion_proto::protobuf::PhysicalColumn { - name: "a".to_string(), - index: 0, - }, - ), - ), - }; +/// Roundtrip a single physical expression shaped like so: +/// +/// ```text +/// BinaryExpr(AND) +/// / \ +/// filter_expr_1 filter_expr_2 +/// ``` +/// +/// Returns filter_expr_1 and filter_expr_2 after deserialization. +fn roundtrip_dynamic_filter_expr_pair( + filter_expr_1: Arc, + filter_expr_2: Arc, + schema: Arc, +) -> Result<(Arc, Arc)> { + let pair_expr = Arc::new(BinaryExpr::new( + Arc::clone(&filter_expr_1), + Operator::And, + Arc::clone(&filter_expr_2), + )) as Arc; - let ctx = SessionContext::new(); let codec = DefaultPhysicalExtensionCodec {}; - let proto_converter = DefaultPhysicalProtoConverter {}; - - // Should deserialize without error - let result = proto_converter.proto_to_physical_expr( + let converter = DeduplicatingProtoConverter {}; + let proto = converter.physical_expr_to_proto(&pair_expr, &codec)?; + let ctx = SessionContext::new(); + let deserialized_expr = converter.proto_to_physical_expr( &proto, ctx.task_ctx().as_ref(), &schema, &codec, )?; - // Verify the result is correct - let col = result.downcast_ref::().expect("Expected Column"); - assert_eq!(col.name(), "a"); - assert_eq!(col.index(), 0); + let binary = deserialized_expr + .downcast_ref::() + .expect("Expected BinaryExpr"); - Ok(()) + Ok((Arc::clone(binary.left()), Arc::clone(binary.right()))) } -/// Test that deduplication works within a single plan deserialization and that -/// separate deserializations produce independent expressions (no cross-operation sharing). -#[test] -fn test_deduplication_within_plan_deserialization() -> Result<()> { - use datafusion_proto::bytes::{ - physical_plan_from_bytes_with_proto_converter, - physical_plan_to_bytes_with_proto_converter, - }; +/// Roundtrip an execution plan shaped like so: +/// +/// ```text +/// FilterExec(dynamic_filter_1 on a@0) +/// ProjectionExec(a := Column("a", source_index)) +/// DataSourceExec +/// ParquetSource(predicate = dynamic_filter_2) +/// ``` +/// +/// `dynamic_filter_1` and `dynamic_filter_2` are the same dynamic filter, except with +/// different children. +/// +/// Returns +/// - `dynamic_filter_1` before serialization +/// - `dynamic_filter_2` before serialization +/// - `dynamic_filter_1` after serialization +/// - `dynamic_filter_2` after serialization +#[allow(clippy::type_complexity)] +fn roundtrip_dynamic_filter_plan_pair() -> Result<( + Arc, + Arc, + Arc, + Arc, +)> { + let filter_expr_1 = make_dynamic_filter(); + let (data_source_schema, filter_expr_2) = + make_reassigned_dynamic_filter(Arc::clone(&filter_expr_1))?; + let left_before = Arc::clone(&filter_expr_1); + let right_before = Arc::clone(&filter_expr_2); + let file_source = Arc::new( + ParquetSource::new(Arc::clone(&data_source_schema)) + .with_predicate(Arc::clone(&filter_expr_2)), + ); + let scan_config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new( + "/path/to/file.parquet".to_string(), + 1024, + )])]) + .build(); + let data_source_exec = + DataSourceExec::from_data_source(scan_config) as Arc; - let field_a = Field::new("a", DataType::Int64, false); - let schema = Arc::new(Schema::new(vec![field_a])); + let projection_exec = Arc::new(ProjectionExec::try_new( + vec![ProjectionExpr { + expr: Arc::new(Column::new("a", 1)) as Arc, + alias: "a".to_string(), + }], + data_source_exec, + )?) as Arc; + let filter_exec = Arc::new(FilterExec::try_new( + Arc::clone(&filter_expr_1), + projection_exec, + )?) as Arc; - // Create a plan with expressions that will be deduplicated - let col_expr: Arc = Arc::new(Column::new("a", 0)); - let projection_exprs = vec![ - ProjectionExpr { - expr: Arc::clone(&col_expr), - alias: "a1".to_string(), - }, - ProjectionExpr { - expr: Arc::clone(&col_expr), // Same Arc - will be deduplicated - alias: "a2".to_string(), - }, - ]; - let exec_plan = Arc::new(ProjectionExec::try_new( - projection_exprs, - Arc::new(EmptyExec::new(schema)), - )?); + let codec = DefaultPhysicalExtensionCodec {}; + let converter = DeduplicatingProtoConverter {}; + let proto = converter.execution_plan_to_proto(&filter_exec, &codec)?; let ctx = SessionContext::new(); - let codec = DefaultPhysicalExtensionCodec {}; - let proto_converter = DeduplicatingProtoConverter {}; + let deserialized_plan = + converter.proto_to_execution_plan(ctx.task_ctx().as_ref(), &codec, &proto)?; + + let outer_filter = deserialized_plan + .downcast_ref::() + .expect("Expected outer FilterExec"); + let left_filter = Arc::clone(outer_filter.predicate()); + let projection = outer_filter.children()[0] + .downcast_ref::() + .expect("Expected ProjectionExec"); + let data_source = projection + .input() + .downcast_ref::() + .expect("Expected DataSourceExec"); + let scan_config = data_source + .data_source() + .downcast_ref::() + .expect("Expected FileScanConfig"); + let right_filter = scan_config + .file_source() + .filter() + .expect("Expected pushed-down predicate"); - // Serialize - let bytes = physical_plan_to_bytes_with_proto_converter( - Arc::clone(&exec_plan) as Arc, - &codec, - &proto_converter, - )?; + Ok((left_before, right_before, left_filter, right_filter)) +} - // First deserialization - let plan1 = physical_plan_from_bytes_with_proto_converter( - bytes.as_ref(), - ctx.task_ctx().as_ref(), - &codec, - &proto_converter, - )?; +/// Takes two [`DynamicFilterPhysicalExpr`] and asserts that updates to one are visible +/// via the other. This helps assert that referential integrity is maintained after +/// deserializing. +fn assert_dynamic_filter_update_is_visible( + left_filter: &Arc, + right_filter: &Arc, +) -> Result<()> { + let left_filter = left_filter + .downcast_ref::() + .expect("Expected dynamic filter"); + let right_filter = right_filter + .downcast_ref::() + .expect("Expected dynamic filter"); - // Check that the plan was deserialized correctly with deduplication - let projection1 = plan1 - .downcast_ref::() - .expect("Expected ProjectionExec"); - let exprs1: Vec<_> = projection1.expr().iter().collect(); - assert_eq!(exprs1.len(), 2); - assert!( - Arc::ptr_eq(&exprs1[0].expr, &exprs1[1].expr), - "Expected both expressions to share the same Arc after deduplication" - ); + // Sanity check that the filters have the same generation. + let original_generation = left_filter.snapshot_generation(); + assert_eq!(original_generation, right_filter.snapshot_generation(),); - // Second deserialization - let plan2 = physical_plan_from_bytes_with_proto_converter( - bytes.as_ref(), - ctx.task_ctx().as_ref(), - &codec, - &proto_converter, - )?; + left_filter.update(lit(123_i64))?; - // Check that the second plan was also deserialized correctly - let projection2 = plan2 - .downcast_ref::() - .expect("Expected ProjectionExec"); - let exprs2: Vec<_> = projection2.expr().iter().collect(); - assert_eq!(exprs2.len(), 2); - assert!( - Arc::ptr_eq(&exprs2[0].expr, &exprs2[1].expr), - "Expected both expressions to share the same Arc after deduplication" + // Assert that both generations updated. + assert_eq!(original_generation + 1, right_filter.snapshot_generation(),); + assert_eq!( + left_filter.snapshot_generation(), + right_filter.snapshot_generation(), ); - // Check that there was no deduplication across deserializations - assert!( - !Arc::ptr_eq(&exprs1[0].expr, &exprs2[0].expr), - "Expected expressions from different deserializations to be different Arcs" - ); - assert!( - !Arc::ptr_eq(&exprs1[1].expr, &exprs2[1].expr), - "Expected expressions from different deserializations to be different Arcs" - ); + // Ensure both filters have the updated expr. + let expected_current = r#"Literal { value: Int64(123), field: Field { name: "lit", data_type: Int64 } }"#; + assert_eq!(expected_current, format!("{:?}", left_filter.current()?),); + assert_eq!(expected_current, format!("{:?}", right_filter.current()?),); Ok(()) } -/// Test that deduplication works within direct expression deserialization and that -/// separate deserializations produce independent expressions (no cross-operation sharing). -#[test] -fn test_deduplication_within_expr_deserialization() -> Result<()> { - let field_a = Field::new("a", DataType::Int64, false); - let schema = Arc::new(Schema::new(vec![field_a])); - - // Create a binary expression where both sides are the same Arc - // This allows us to test deduplication within a single deserialization - let col_expr: Arc = Arc::new(Column::new("a", 0)); - let binary_expr: Arc = Arc::new(BinaryExpr::new( - Arc::clone(&col_expr), - Operator::Plus, - Arc::clone(&col_expr), // Same Arc - will be deduplicated - )); - - let ctx = SessionContext::new(); - let codec = DefaultPhysicalExtensionCodec {}; - let proto_converter = DeduplicatingProtoConverter {}; +/// Assert that two dynamic filters are equal both structurally (Debug output) +/// and by identity (`expression_id`). +/// +fn assert_dynamic_filters_equal( + expected: &Arc, + actual: &Arc, +) { + // TODO: Debug currently omits `expression_id` so the id has to be checked + // separately here. Once plan nodes like `SortExec` / `AggregateExec` / + // `HashJoinExec` serialize their own dynamic filter, Debug can include + // `expression_id`. + // + // See https://github.com/apache/datafusion/issues/20418 + assert_eq!(expected.expression_id(), actual.expression_id()); + + // Structural. + let expected_dbg = format!("{expected:?}"); + let actual_dbg = format!("{actual:?}"); + if expected_dbg == actual_dbg { + return; + } - // Serialize the expression - let proto = proto_converter.physical_expr_to_proto(&binary_expr, &codec)?; + // Note that the `DeduplicatingDeserializer` routes every cache hit through + // `with_new_children`. This produces an equivalent expression, but with + // remapped children that are equal to the original. Handle that case here. + let rewritten = Arc::clone(expected) + .with_new_children(expected.children().iter().map(|c| Arc::clone(c)).collect()) + .expect("with_new_children on a dynamic filter should not fail"); + assert_eq!(format!("{rewritten:?}"), actual_dbg); +} - // First expression deserialization - let expr1 = proto_converter.proto_to_physical_expr( - &proto, - ctx.task_ctx().as_ref(), - &schema, - &codec, - )?; +// Two clones of a dynamic filter expression should be deduped to the exact same expression. +#[test] +fn test_dynamic_filter_roundtrip_dedupe() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])); + let filter_expr_1 = make_dynamic_filter(); + let filter_expr_2 = Arc::clone(&filter_expr_1); + + let (filter_expr_1_after_roundtrip, filter_expr_2_after_roundtrip) = + roundtrip_dynamic_filter_expr_pair( + Arc::clone(&filter_expr_1), + Arc::clone(&filter_expr_2), + schema, + )?; - // Check that deduplication worked within the deserialization - let binary1 = expr1 - .downcast_ref::() - .expect("Expected BinaryExpr"); - assert!( - Arc::ptr_eq(binary1.left(), binary1.right()), - "Expected both sides to share the same Arc after deduplication" + // Assert the filters are not modified during roundtrip. + assert_dynamic_filters_equal(&filter_expr_1, &filter_expr_1_after_roundtrip); + assert_dynamic_filters_equal(&filter_expr_2, &filter_expr_2_after_roundtrip); + assert_dynamic_filters_equal( + &filter_expr_1_after_roundtrip, + &filter_expr_2_after_roundtrip, ); - // Second expression deserialization - let expr2 = proto_converter.proto_to_physical_expr( - &proto, - ctx.task_ctx().as_ref(), - &schema, - &codec, + // Assert referential integrity. + assert_dynamic_filter_update_is_visible( + &filter_expr_1_after_roundtrip, + &filter_expr_2_after_roundtrip, )?; - // Check that the second expression was also deserialized correctly - let binary2 = expr2 - .downcast_ref::() - .expect("Expected BinaryExpr"); - assert!( - Arc::ptr_eq(binary2.left(), binary2.right()), - "Expected both sides to share the same Arc after deduplication" - ); - - // Check that there was no deduplication across deserializations - assert!( - !Arc::ptr_eq(binary1.left(), binary2.left()), - "Expected expressions from different deserializations to be different Arcs" - ); - assert!( - !Arc::ptr_eq(binary1.right(), binary2.right()), - "Expected expressions from different deserializations to be different Arcs" - ); - Ok(()) } -/// Test that session_id rotates between top-level serialization operations. -/// This verifies that each top-level serialization gets a fresh session_id, -/// which prevents cross-process collisions when serialized plans are merged. +/// Roundtrip test for an execution plan where there are multiple instances of a dynamic filter +/// with different children. #[test] -fn test_session_id_rotation_between_serializations() -> Result<()> { - let field_a = Field::new("a", DataType::Int64, false); - let _schema = Arc::new(Schema::new(vec![field_a])); - - // Create a simple expression - let col_expr: Arc = Arc::new(Column::new("a", 0)); - - let codec = DefaultPhysicalExtensionCodec {}; - let proto_converter = DeduplicatingProtoConverter {}; - - // First serialization - let proto1 = proto_converter.physical_expr_to_proto(&col_expr, &codec)?; - let expr_id1 = proto1.expr_id.expect("Expected expr_id to be set"); - - // Second serialization with the same converter - // The session_id should have rotated, so the expr_id should be different - // even though we're serializing the same expression (same pointer address) - let proto2 = proto_converter.physical_expr_to_proto(&col_expr, &codec)?; - let expr_id2 = proto2.expr_id.expect("Expected expr_id to be set"); - - // The expr_ids should be different because session_id rotated - assert_ne!( - expr_id1, expr_id2, - "Expected different expr_ids due to session_id rotation between serializations" - ); - - // Also test that serializing the same expression multiple times within - // the same top-level operation would give the same expr_id (not testable - // here directly since each physical_expr_to_proto is a top-level operation, - // but the deduplication tests verify this indirectly) +fn test_dynamic_filter_plan_roundtrip_dedupe() -> Result<()> { + let ( + filter_expr_1, + filter_expr_2, + filter_expr_1_after_roundtrip, + filter_expr_2_after_roundtrip, + ) = roundtrip_dynamic_filter_plan_pair()?; + + // Assert the filters are not modified during roundtrip. + assert_dynamic_filters_equal(&filter_expr_1, &filter_expr_1_after_roundtrip); + assert_dynamic_filters_equal(&filter_expr_2, &filter_expr_2_after_roundtrip); + + // Assert referential integrity. + assert_dynamic_filter_update_is_visible( + &filter_expr_1_after_roundtrip, + &filter_expr_2_after_roundtrip, + )?; Ok(()) } -/// Test that session_id rotation works correctly with execution plans. -/// This verifies the end-to-end behavior with plan serialization. #[test] -fn test_session_id_rotation_with_execution_plans() -> Result<()> { - use datafusion_proto::bytes::physical_plan_to_bytes_with_proto_converter; - - let field_a = Field::new("a", DataType::Int64, false); - let schema = Arc::new(Schema::new(vec![field_a])); - - // Create a simple plan - let col_expr: Arc = Arc::new(Column::new("a", 0)); - let projection_exprs = vec![ProjectionExpr { - expr: Arc::clone(&col_expr), - alias: "a1".to_string(), - }]; - let exec_plan = Arc::new(ProjectionExec::try_new( - projection_exprs.clone(), - Arc::new(EmptyExec::new(Arc::clone(&schema))), - )?); - +fn test_dynamic_filter_expression_id_is_stable_between_serializations() -> Result<()> { + let filter_expr = make_dynamic_filter(); let codec = DefaultPhysicalExtensionCodec {}; let proto_converter = DeduplicatingProtoConverter {}; - // First serialization - let bytes1 = physical_plan_to_bytes_with_proto_converter( - Arc::clone(&exec_plan) as Arc, - &codec, - &proto_converter, - )?; + let proto1 = proto_converter.physical_expr_to_proto(&filter_expr, &codec)?; + let expr_id1 = proto_expression_id(&proto1); - // Second serialization with the same converter - let bytes2 = physical_plan_to_bytes_with_proto_converter( - Arc::clone(&exec_plan) as Arc, - &codec, - &proto_converter, - )?; + let proto2 = proto_converter.physical_expr_to_proto(&filter_expr, &codec)?; + let expr_id2 = proto_expression_id(&proto2); - // The serialized bytes should be different due to different session_ids - // (specifically, the expr_id values embedded in the protobuf will differ) - assert_ne!( - bytes1.as_ref(), - bytes2.as_ref(), - "Expected different serialized bytes due to session_id rotation" + assert_eq!( + expr_id1, expr_id2, + "Expected the same dynamic filter expression id across serializations" ); - // But both should deserialize correctly - let ctx = SessionContext::new(); - let deser_converter = DeduplicatingProtoConverter {}; - - let plan1 = datafusion_proto::bytes::physical_plan_from_bytes_with_proto_converter( - bytes1.as_ref(), - ctx.task_ctx().as_ref(), - &codec, - &deser_converter, - )?; - - let plan2 = datafusion_proto::bytes::physical_plan_from_bytes_with_proto_converter( - bytes2.as_ref(), - ctx.task_ctx().as_ref(), - &codec, - &deser_converter, - )?; - - // Verify both plans have the expected structure - assert_eq!(plan1.schema(), plan2.schema()); - Ok(()) }