From 4c86e131d5c894ce268af622f144978a4c6926fa Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Tue, 17 Feb 2026 21:48:32 +0000 Subject: [PATCH 1/6] proto: serialize and dedupe dynamic filters Informs: https://github.com/datafusion-contrib/datafusion-distributed/issues/180 Closes: https://github.com/apache/datafusion/issues/20418 Consider you have a plan with a `HashJoinExec` and `DataSourceExec` ``` HashJoinExec(dynamic_filter_1 on a@0) (...left side of join) ProjectionExec(a := Column("a", source_index)) DataSourceExec ParquetSource(predicate = dynamic_filter_2) ``` You serialize the plan, deserialize it, and execute it. What should happen is that the dynamic filter should "work", meaning: 1. When you deserialize the plan, both the `HashJoinExec` and `DataSourceExec` should have pointers to the same `DynamicFilterPhysicalExpr` 2. The `DynamicFilterPhysicalExpr` should be updated during execution by the `HashJoinExec` and the `DataSourceExec` should filter out rows This does not happen today for a few reasons, a couple of which this PR aims to address 1. `DynamicFilterPhysicalExpr` does not survive round-tripping. The internal exprs get inlined (ex. it may be serialized as `Literal`) due to the `PhysicalExpr::snapshot()` API 2. Even if `DynamicFilterPhysicalExpr` survives round-tripping, the one pushed down to the `DataSourceExec` often has different children. In this case, you have two `DynamicFilterPhysicalExpr` which do not survive deduping, causing referential integrity to be lost. This PR aims to fix those problems by: 1. Removing the `snapshot()` call from the serialization process 2. Adding protos for `DynamicFilterPhysicalExpr` so it can be serialized and deserialized 3. Removing `Arc`-based deduplication. We now only dedupe on `expression_id` if the `PhysicalExpr` reports an `expression_id`. After this change, only `DynamicFilterPhysicalExpr` reports an `expression_id` to be deduped. 4. `expression_id` is now just a random u64. 
Since a given query likely only has a few `DynamicFilterPhysicalExpr` instances, the odds of a collision are very low 5. There's no need for a `DeduplicatingSerializer` anymore since the `expression_id` is already stored in the dynamic filter proto itself Testing - adds tests which roundtrip dynamic filters and assert that referential integrity is maintained - removes tests that test `Arc`-based deduplication and session id rotation --- Cargo.lock | 1 - .../physical-expr-common/src/physical_expr.rs | 20 + datafusion/physical-expr/Cargo.toml | 2 +- .../src/expressions/dynamic_filters.rs | 253 ++++++- .../physical-expr/src/expressions/mod.rs | 2 +- datafusion/proto/Cargo.toml | 1 - datafusion/proto/proto/datafusion.proto | 22 +- datafusion/proto/src/generated/pbjson.rs | 220 +++++- datafusion/proto/src/generated/prost.rs | 27 +- .../proto/src/physical_plan/from_proto.rs | 48 ++ datafusion/proto/src/physical_plan/mod.rs | 143 +--- .../proto/src/physical_plan/to_proto.rs | 64 +- .../tests/cases/roundtrip_physical_plan.rs | 632 +++++++----------- 13 files changed, 875 insertions(+), 560 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5a76c063bbfad..24b50f7f7dd62 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2568,7 +2568,6 @@ dependencies = [ "pbjson 0.9.0", "pretty_assertions", "prost", - "rand 0.9.4", "serde", "serde_json", "tokio", diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index a42a1560cb769..311941aedca3d 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -163,6 +163,9 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>>; /// Returns a new PhysicalExpr where all children were replaced by new exprs. + /// + /// If the implementation returns a [`PhysicalExpr::expression_id`], then + /// the identifier should be preserved by the new expression. 
fn with_new_children( self: Arc<Self>, children: Vec<Arc<dyn PhysicalExpr>>, @@ -438,6 +441,23 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { fn placement(&self) -> ExpressionPlacement { ExpressionPlacement::KeepInPlace } + + /// Return a stable, globally-unique identifier for this [`PhysicalExpr`], if it + /// has one. + /// + /// This identifier tracks which expressions are connected (e.g. `DynamicFilterPhysicalExpr` + /// where two expressions may be different but store the same mutable inner state). Tracking + /// connected expressions helps preserve referential integrity within plan nodes + /// during serialization and deserialization. + /// + /// This id must be preserved across [`PhysicalExpr::with_new_children`] or any other + /// methods which may want to preserve identity. + /// + /// Default is `None`: the expression has no identity worth preserving across a + /// serialization boundary. + fn expression_id(&self) -> Option<u64> { + None + } } #[deprecated( diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index b755353d75658..41be7b18b9119 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -55,6 +55,7 @@ indexmap = { workspace = true } itertools = { workspace = true, features = ["use_std"] } parking_lot = { workspace = true } petgraph = "0.8.3" +rand = { workspace = true } recursive = { workspace = true, optional = true } tokio = { workspace = true } half = { workspace = true } @@ -64,7 +65,6 @@ arrow = { workspace = true, features = ["test_utils"] } criterion = { workspace = true } datafusion-functions = { workspace = true } insta = { workspace = true } -rand = { workspace = true } rstest = { workspace = true } [[bench]] diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index 47398d87e26a5..c60c4029814aa 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ 
b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -27,6 +27,7 @@ use datafusion_common::{ }; use datafusion_expr::ColumnarValue; use datafusion_physical_expr_common::physical_expr::DynHash; +use rand::random; /// State of a dynamic filter, tracking both updates and completion. #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -55,7 +56,6 @@ impl FilterState { /// For more background, please also see the [Dynamic Filters: Passing Information Between Operators During Execution for 25x Faster Queries blog] /// /// [Dynamic Filters: Passing Information Between Operators During Execution for 25x Faster Queries blog]: https://datafusion.apache.org/blog/2025/09/10/dynamic-filters -#[derive(Debug)] pub struct DynamicFilterPhysicalExpr { /// The original children of this PhysicalExpr, if any. /// This is necessary because the dynamic filter may be initialized with a placeholder (e.g. `lit(true)`) @@ -65,6 +65,10 @@ pub struct DynamicFilterPhysicalExpr { /// If any of the children were remapped / modified (e.g. to adjust for projections) we need to keep track of the new children /// so that when we update `current()` in subsequent iterations we can re-apply the replacements. remapped_children: Option>>, + /// Unique identifier for this dynamic filter. + /// + /// Derived filters (ex. via `with_new_children`) should inherit the expression id of the source filter. + expression_id: u64, /// The source of dynamic filters. inner: Arc>, /// Broadcasts filter state (updates and completion) to all waiters. @@ -88,6 +92,148 @@ struct Inner { is_complete: bool, } +/// An atomic snapshot of a [`DynamicFilterPhysicalExpr`] used to reconstruct the expression during +/// serialization / deserialization. +pub struct DynamicFilterSnapshot { + children: Vec>, + remapped_children: Option>>, + expression_id: u64, + // Inner state. 
+ generation: u64, + inner_expr: Arc, + is_complete: bool, +} + +impl DynamicFilterSnapshot { + pub fn new( + expression_id: u64, + children: Vec>, + remapped_children: Option>>, + generation: u64, + inner_expr: Arc, + is_complete: bool, + ) -> Self { + Self { + children, + remapped_children, + expression_id, + generation, + inner_expr, + is_complete, + } + } + + pub fn children(&self) -> &[Arc] { + &self.children + } + + pub fn expression_id(&self) -> u64 { + self.expression_id + } + + pub fn remapped_children(&self) -> Option<&[Arc]> { + self.remapped_children.as_deref() + } + + pub fn generation(&self) -> u64 { + self.generation + } + + pub fn inner_expr(&self) -> &Arc { + &self.inner_expr + } + + pub fn is_complete(&self) -> bool { + self.is_complete + } +} + +// TODO: Include expression_id in debug output. +// +// See https://github.com/apache/datafusion/issues/20418. Currently, plan nodes +// like `HashJoinExec`, `AggregateExec`, `SortExec` do not serialize their +// dynamic filter. This causes round trips to fail on the `expression_id` +// because it is regenerated on deserialization. 
+impl std::fmt::Debug for DynamicFilterPhysicalExpr { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DynamicFilterPhysicalExpr") + .field("children", &self.children) + .field("remapped_children", &self.remapped_children) + .field("inner", &self.inner) + .field("state_watch", &self.state_watch) + .field("data_type", &self.data_type) + .field("nullable", &self.nullable) + .finish() + } +} + +impl Display for DynamicFilterSnapshot { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "DynamicFilterSnapshot {{ children: {:?}, remapped_children: {:?}, expression_id: {}, generation: {}, inner_expr: {:?}, is_complete: {} }}", + self.children, + self.remapped_children, + self.expression_id, + self.generation, + self.inner_expr, + self.is_complete + ) + } +} + +impl From for DynamicFilterPhysicalExpr { + fn from(snapshot: DynamicFilterSnapshot) -> Self { + let DynamicFilterSnapshot { + children, + remapped_children, + expression_id, + generation, + inner_expr, + is_complete, + } = snapshot; + + let state = if is_complete { + FilterState::Complete { generation } + } else { + FilterState::InProgress { generation } + }; + let (state_watch, _) = watch::channel(state); + + Self { + children, + remapped_children, + expression_id, + inner: Arc::new(RwLock::new(Inner { + generation, + expr: inner_expr, + is_complete, + })), + state_watch, + data_type: Arc::new(RwLock::new(None)), + nullable: Arc::new(RwLock::new(None)), + } + } +} + +impl From<&DynamicFilterPhysicalExpr> for DynamicFilterSnapshot { + fn from(expr: &DynamicFilterPhysicalExpr) -> Self { + // Snapshot everything in the mutex atomically. 
+ let (generation, inner_expr, is_complete) = { + let inner = expr.inner.read(); + (inner.generation, Arc::clone(&inner.expr), inner.is_complete) + }; + DynamicFilterSnapshot { + children: expr.children.clone(), + remapped_children: expr.remapped_children.clone(), + expression_id: expr.expression_id, + generation, + inner_expr, + is_complete, + } + } +} + impl Inner { fn new(expr: Arc) -> Self { Self { @@ -173,6 +319,7 @@ impl DynamicFilterPhysicalExpr { Self { children, remapped_children: None, // Initially no remapped children + expression_id: Self::new_expression_id(), inner: Arc::new(RwLock::new(Inner::new(inner))), state_watch, data_type: Arc::new(RwLock::new(None)), @@ -346,6 +493,11 @@ impl DynamicFilterPhysicalExpr { write!(f, " ]") } + + /// Generate a new expression id for this filter. + pub fn new_expression_id() -> u64 { + random::() + } } impl PhysicalExpr for DynamicFilterPhysicalExpr { @@ -364,6 +516,9 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr { Ok(Arc::new(Self { children: self.children.clone(), remapped_children: Some(children), + // Note that we ensure the derived expression linked to `self` + // via the unique identifier. + expression_id: self.expression_id, inner: Arc::clone(&self.inner), state_watch: self.state_watch.clone(), data_type: Arc::clone(&self.data_type), @@ -444,6 +599,10 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr { // Return the current generation of the expression. 
self.inner.read().generation } + + fn expression_id(&self) -> Option { + Some(self.expression_id) + } } #[cfg(test)] @@ -861,4 +1020,96 @@ mod test { "Hash should be stable after update (identity-based)" ); } + + #[test] + fn test_current_snapshot_roundtrip() { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let col_a = col("a", &schema).unwrap(); + + // Create a dynamic filter with children + let expr = Arc::new(BinaryExpr::new( + Arc::clone(&col_a), + datafusion_expr::Operator::Gt, + lit(10) as Arc, + )); + let filter = DynamicFilterPhysicalExpr::new( + vec![Arc::clone(&col_a)], + expr as Arc, + ); + + // Update expression and mark complete + filter + .update(lit(42) as Arc) + .expect("Update should succeed"); + filter.mark_complete(); + + // Change the children of the expr. + let reassigned_schema = Arc::new(Schema::new(vec![ + Field::new("b", DataType::Int32, false), + Field::new("a", DataType::Int32, false), + ])); + let reassigned = reassign_expr_columns( + Arc::new(filter) as Arc, + &reassigned_schema, + ) + .expect("reassign_expr_columns should succeed"); + let reassigned = reassigned + .downcast_ref::() + .expect("Expected dynamic filter after reassignment"); + + // Take a snapshot and reconstruct + let snapshot = DynamicFilterSnapshot::from(reassigned); + let reconstructed = DynamicFilterPhysicalExpr::from(snapshot); + + // Assert snapshots are equal. 
+ assert_eq!( + DynamicFilterSnapshot::from(reassigned).to_string(), + DynamicFilterSnapshot::from(&reconstructed).to_string(), + ); + } + + #[tokio::test] + async fn test_expression_id() { + let source_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let col_a = col("a", &source_schema).unwrap(); + + // Create a source filter + let source = Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::clone(&col_a)], + lit(true) as Arc, + )); + let source_clone = Arc::clone(&source); + + // Create a derived filter by reassigning the source filter to a different schema. + let derived_schema = Arc::new(Schema::new(vec![ + Field::new("x", DataType::Int32, false), + Field::new("a", DataType::Int32, false), + ])); + let derived = reassign_expr_columns( + Arc::clone(&source) as Arc, + &derived_schema, + ) + .expect("reassign_expr_columns should succeed"); + + let derived_expression_id = derived + .expression_id() + .expect("derived filter should have an expression id"); + let source_expression_id = source + .expression_id() + .expect("source filter should have an expression id"); + let source_clone_expression_id = source_clone + .expression_id() + .expect("source clone should have an expression id"); + + assert_eq!( + source_clone_expression_id, source_expression_id, + "cloned filter should preserve its expression id", + ); + + assert_eq!( + derived_expression_id, source_expression_id, + "derived filters should carry forward the source expression id", + ); + } } diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index 1e082355486f6..b40848828fa41 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -43,7 +43,7 @@ pub use case::{CaseExpr, case}; pub use cast::{CastExpr, cast}; pub use column::{Column, col, with_new_schema}; pub use datafusion_expr::utils::format_state_name; -pub use 
dynamic_filters::DynamicFilterPhysicalExpr; +pub use dynamic_filters::{DynamicFilterPhysicalExpr, DynamicFilterSnapshot}; pub use in_list::{InListExpr, in_list}; pub use is_not_null::{IsNotNullExpr, is_not_null}; pub use is_null::{IsNullExpr, is_null}; diff --git a/datafusion/proto/Cargo.toml b/datafusion/proto/Cargo.toml index baa7dd8770e71..4484846813296 100644 --- a/datafusion/proto/Cargo.toml +++ b/datafusion/proto/Cargo.toml @@ -66,7 +66,6 @@ datafusion-proto-common = { workspace = true } object_store = { workspace = true } pbjson = { workspace = true, optional = true } prost = { workspace = true } -rand = { workspace = true } serde = { version = "1.0", optional = true } serde_json = { workspace = true, optional = true } diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index c61226fb526f6..f0cba40a5ae31 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -875,13 +875,8 @@ message PhysicalExprNode { // Was date_time_interval_expr reserved 17; - // Unique identifier for this expression to do deduplication during deserialization. - // When serializing, this is set to a unique identifier for each combination of - // expression, process and serialization run. - // When deserializing, if this ID has been seen before, the cached Arc is returned - // instead of creating a new one, enabling reconstruction of referential integrity - // across serde roundtrips. 
- optional uint64 expr_id = 30; + // Was expr_id + reserved 30; oneof ExprType { // column references @@ -920,9 +915,20 @@ message PhysicalExprNode { UnknownColumn unknown_column = 20; PhysicalHashExprNode hash_expr = 21; + + PhysicalDynamicFilterNode dynamic_filter = 22; } } +message PhysicalDynamicFilterNode { + repeated PhysicalExprNode children = 1; + repeated PhysicalExprNode remapped_children = 2; + uint64 generation = 3; + PhysicalExprNode inner_expr = 4; + bool is_complete = 5; + uint64 expression_id = 6; +} + message PhysicalScalarUdfNode { string name = 1; repeated PhysicalExprNode args = 2; @@ -1477,4 +1483,4 @@ message AsyncFuncExecNode { message BufferExecNode { PhysicalPlanNode input = 1; uint64 capacity = 2; -} \ No newline at end of file +} diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 82bcdac898204..5db47a2ecff4f 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -16531,7 +16531,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalDateTimeIntervalExprNode { deserializer.deserialize_struct("datafusion.PhysicalDateTimeIntervalExprNode", FIELDS, GeneratedVisitor) } } -impl serde::Serialize for PhysicalExprNode { +impl serde::Serialize for PhysicalDynamicFilterNode { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result where @@ -16539,18 +16539,198 @@ impl serde::Serialize for PhysicalExprNode { { use serde::ser::SerializeStruct; let mut len = 0; - if self.expr_id.is_some() { + if !self.children.is_empty() { len += 1; } - if self.expr_type.is_some() { + if !self.remapped_children.is_empty() { len += 1; } - let mut struct_ser = serializer.serialize_struct("datafusion.PhysicalExprNode", len)?; - if let Some(v) = self.expr_id.as_ref() { + if self.generation != 0 { + len += 1; + } + if self.inner_expr.is_some() { + len += 1; + } + if self.is_complete { + len += 1; + } + if self.expression_id != 0 { + len += 1; + } + let mut 
struct_ser = serializer.serialize_struct("datafusion.PhysicalDynamicFilterNode", len)?; + if !self.children.is_empty() { + struct_ser.serialize_field("children", &self.children)?; + } + if !self.remapped_children.is_empty() { + struct_ser.serialize_field("remappedChildren", &self.remapped_children)?; + } + if self.generation != 0 { #[allow(clippy::needless_borrow)] #[allow(clippy::needless_borrows_for_generic_args)] - struct_ser.serialize_field("exprId", ToString::to_string(&v).as_str())?; + struct_ser.serialize_field("generation", ToString::to_string(&self.generation).as_str())?; + } + if let Some(v) = self.inner_expr.as_ref() { + struct_ser.serialize_field("innerExpr", v)?; + } + if self.is_complete { + struct_ser.serialize_field("isComplete", &self.is_complete)?; + } + if self.expression_id != 0 { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("expressionId", ToString::to_string(&self.expression_id).as_str())?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for PhysicalDynamicFilterNode { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "children", + "remapped_children", + "remappedChildren", + "generation", + "inner_expr", + "innerExpr", + "is_complete", + "isComplete", + "expression_id", + "expressionId", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Children, + RemappedChildren, + Generation, + InnerExpr, + IsComplete, + ExpressionId, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl serde::de::Visitor<'_> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) 
+ } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "children" => Ok(GeneratedField::Children), + "remappedChildren" | "remapped_children" => Ok(GeneratedField::RemappedChildren), + "generation" => Ok(GeneratedField::Generation), + "innerExpr" | "inner_expr" => Ok(GeneratedField::InnerExpr), + "isComplete" | "is_complete" => Ok(GeneratedField::IsComplete), + "expressionId" | "expression_id" => Ok(GeneratedField::ExpressionId), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = PhysicalDynamicFilterNode; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.PhysicalDynamicFilterNode") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut children__ = None; + let mut remapped_children__ = None; + let mut generation__ = None; + let mut inner_expr__ = None; + let mut is_complete__ = None; + let mut expression_id__ = None; + while let Some(k) = map_.next_key()? 
{ + match k { + GeneratedField::Children => { + if children__.is_some() { + return Err(serde::de::Error::duplicate_field("children")); + } + children__ = Some(map_.next_value()?); + } + GeneratedField::RemappedChildren => { + if remapped_children__.is_some() { + return Err(serde::de::Error::duplicate_field("remappedChildren")); + } + remapped_children__ = Some(map_.next_value()?); + } + GeneratedField::Generation => { + if generation__.is_some() { + return Err(serde::de::Error::duplicate_field("generation")); + } + generation__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::InnerExpr => { + if inner_expr__.is_some() { + return Err(serde::de::Error::duplicate_field("innerExpr")); + } + inner_expr__ = map_.next_value()?; + } + GeneratedField::IsComplete => { + if is_complete__.is_some() { + return Err(serde::de::Error::duplicate_field("isComplete")); + } + is_complete__ = Some(map_.next_value()?); + } + GeneratedField::ExpressionId => { + if expression_id__.is_some() { + return Err(serde::de::Error::duplicate_field("expressionId")); + } + expression_id__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + } + } + Ok(PhysicalDynamicFilterNode { + children: children__.unwrap_or_default(), + remapped_children: remapped_children__.unwrap_or_default(), + generation: generation__.unwrap_or_default(), + inner_expr: inner_expr__, + is_complete: is_complete__.unwrap_or_default(), + expression_id: expression_id__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("datafusion.PhysicalDynamicFilterNode", FIELDS, GeneratedVisitor) + } +} +impl serde::Serialize for PhysicalExprNode { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.expr_type.is_some() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.PhysicalExprNode", 
len)?; if let Some(v) = self.expr_type.as_ref() { match v { physical_expr_node::ExprType::Column(v) => { @@ -16610,6 +16790,9 @@ impl serde::Serialize for PhysicalExprNode { physical_expr_node::ExprType::HashExpr(v) => { struct_ser.serialize_field("hashExpr", v)?; } + physical_expr_node::ExprType::DynamicFilter(v) => { + struct_ser.serialize_field("dynamicFilter", v)?; + } } } struct_ser.end() @@ -16622,8 +16805,6 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { D: serde::Deserializer<'de>, { const FIELDS: &[&str] = &[ - "expr_id", - "exprId", "column", "literal", "binary_expr", @@ -16656,11 +16837,12 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { "unknownColumn", "hash_expr", "hashExpr", + "dynamic_filter", + "dynamicFilter", ]; #[allow(clippy::enum_variant_names)] enum GeneratedField { - ExprId, Column, Literal, BinaryExpr, @@ -16680,6 +16862,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { Extension, UnknownColumn, HashExpr, + DynamicFilter, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -16701,7 +16884,6 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { E: serde::de::Error, { match value { - "exprId" | "expr_id" => Ok(GeneratedField::ExprId), "column" => Ok(GeneratedField::Column), "literal" => Ok(GeneratedField::Literal), "binaryExpr" | "binary_expr" => Ok(GeneratedField::BinaryExpr), @@ -16721,6 +16903,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { "extension" => Ok(GeneratedField::Extension), "unknownColumn" | "unknown_column" => Ok(GeneratedField::UnknownColumn), "hashExpr" | "hash_expr" => Ok(GeneratedField::HashExpr), + "dynamicFilter" | "dynamic_filter" => Ok(GeneratedField::DynamicFilter), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -16740,18 +16923,9 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { where V: serde::de::MapAccess<'de>, { - let mut expr_id__ = None; let mut expr_type__ = None; 
while let Some(k) = map_.next_key()? { match k { - GeneratedField::ExprId => { - if expr_id__.is_some() { - return Err(serde::de::Error::duplicate_field("exprId")); - } - expr_id__ = - map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0) - ; - } GeneratedField::Column => { if expr_type__.is_some() { return Err(serde::de::Error::duplicate_field("column")); @@ -16883,12 +17057,18 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { return Err(serde::de::Error::duplicate_field("hashExpr")); } expr_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_expr_node::ExprType::HashExpr) +; + } + GeneratedField::DynamicFilter => { + if expr_type__.is_some() { + return Err(serde::de::Error::duplicate_field("dynamicFilter")); + } + expr_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_expr_node::ExprType::DynamicFilter) ; } } } Ok(PhysicalExprNode { - expr_id: expr_id__, expr_type: expr_type__, }) } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index ab60c3058dbde..243aa187098fa 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1307,17 +1307,9 @@ pub struct PhysicalExtensionNode { /// physical expressions #[derive(Clone, PartialEq, ::prost::Message)] pub struct PhysicalExprNode { - /// Unique identifier for this expression to do deduplication during deserialization. - /// When serializing, this is set to a unique identifier for each combination of - /// expression, process and serialization run. - /// When deserializing, if this ID has been seen before, the cached Arc is returned - /// instead of creating a new one, enabling reconstruction of referential integrity - /// across serde roundtrips. 
- #[prost(uint64, optional, tag = "30")] - pub expr_id: ::core::option::Option, #[prost( oneof = "physical_expr_node::ExprType", - tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 16, 18, 19, 20, 21" + tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 16, 18, 19, 20, 21, 22" )] pub expr_type: ::core::option::Option, } @@ -1370,9 +1362,26 @@ pub mod physical_expr_node { UnknownColumn(super::UnknownColumn), #[prost(message, tag = "21")] HashExpr(super::PhysicalHashExprNode), + #[prost(message, tag = "22")] + DynamicFilter(::prost::alloc::boxed::Box), } } #[derive(Clone, PartialEq, ::prost::Message)] +pub struct PhysicalDynamicFilterNode { + #[prost(message, repeated, tag = "1")] + pub children: ::prost::alloc::vec::Vec, + #[prost(message, repeated, tag = "2")] + pub remapped_children: ::prost::alloc::vec::Vec, + #[prost(uint64, tag = "3")] + pub generation: u64, + #[prost(message, optional, boxed, tag = "4")] + pub inner_expr: ::core::option::Option<::prost::alloc::boxed::Box>, + #[prost(bool, tag = "5")] + pub is_complete: bool, + #[prost(uint64, tag = "6")] + pub expression_id: u64, +} +#[derive(Clone, PartialEq, ::prost::Message)] pub struct PhysicalScalarUdfNode { #[prost(string, tag = "1")] pub name: ::prost::alloc::string::String, diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 61665db607757..69ab9614dc022 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -58,6 +58,9 @@ use super::{ use crate::logical_plan::{self}; use crate::protobuf::physical_expr_node::ExprType; use crate::{convert_required, protobuf}; +use datafusion_physical_expr::expressions::{ + DynamicFilterPhysicalExpr, DynamicFilterSnapshot, +}; impl From<&protobuf::PhysicalColumn> for Column { fn from(c: &protobuf::PhysicalColumn) -> Column { @@ -524,6 +527,51 @@ pub fn parse_physical_expr_with_converter( hash_expr.description.clone(), )) } + 
ExprType::DynamicFilter(dynamic_filter) => { + let children = parse_physical_exprs( + &dynamic_filter.children, + ctx, + input_schema, + codec, + proto_converter, + )?; + + let remapped_children = if !dynamic_filter.remapped_children.is_empty() { + Some(parse_physical_exprs( + &dynamic_filter.remapped_children, + ctx, + input_schema, + codec, + proto_converter, + )?) + } else { + None + }; + + let inner_expr = parse_required_physical_expr( + dynamic_filter.inner_expr.as_deref(), + ctx, + "inner_expr", + input_schema, + codec, + proto_converter, + )?; + + // Recreate filter from snapshot. expression_id is a required wire + // field — we never generate a fresh id on decode, so multiple + // occurrences of the same dynamic filter retain their shared identity. + let snapshot = DynamicFilterSnapshot::new( + dynamic_filter.expression_id, + children, + remapped_children, + dynamic_filter.generation, + inner_expr, + dynamic_filter.is_complete, + ); + let base_filter: Arc = + Arc::new(DynamicFilterPhysicalExpr::from(snapshot)); + base_filter + } ExprType::Extension(extension) => { let inputs: Vec> = extension .inputs diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 206f4378d3d3b..563d5430318f8 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -19,7 +19,6 @@ use std::any::Any; use std::cell::RefCell; use std::collections::HashMap; use std::fmt::Debug; -use std::hash::{DefaultHasher, Hash, Hasher}; use std::sync::Arc; use arrow::compute::SortOptions; @@ -3097,7 +3096,6 @@ impl protobuf::PhysicalPlanNode { nulls_first: expr.options.nulls_first, }); Ok(protobuf::PhysicalExprNode { - expr_id: None, expr_type: Some(ExprType::Sort(sort_expr)), }) }) @@ -3183,7 +3181,6 @@ impl protobuf::PhysicalPlanNode { nulls_first: expr.options.nulls_first, }); Ok(protobuf::PhysicalExprNode { - expr_id: None, expr_type: Some(ExprType::Sort(sort_expr)), }) }) @@ -3791,6 +3788,7 @@ 
struct DataEncoderTuple { } pub struct DefaultPhysicalProtoConverter; + impl PhysicalProtoConverterExtension for DefaultPhysicalProtoConverter { fn proto_to_execution_plan( &self, @@ -3839,85 +3837,11 @@ impl PhysicalProtoConverterExtension for DefaultPhysicalProtoConverter { } } -/// Internal serializer that adds expr_id to expressions. -/// Created fresh for each serialization operation. -struct DeduplicatingSerializer { - /// Random salt combined with pointer addresses and process ID to create globally unique expr_ids. - session_id: u64, -} - -impl DeduplicatingSerializer { - fn new() -> Self { - Self { - session_id: rand::random(), - } - } -} - -impl PhysicalProtoConverterExtension for DeduplicatingSerializer { - fn proto_to_execution_plan( - &self, - _ctx: &TaskContext, - _codec: &dyn PhysicalExtensionCodec, - _proto: &protobuf::PhysicalPlanNode, - ) -> Result> { - internal_err!("DeduplicatingSerializer cannot deserialize execution plans") - } - - fn execution_plan_to_proto( - &self, - plan: &Arc, - codec: &dyn PhysicalExtensionCodec, - ) -> Result - where - Self: Sized, - { - protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter( - Arc::clone(plan), - codec, - self, - ) - } - - fn proto_to_physical_expr( - &self, - _proto: &protobuf::PhysicalExprNode, - _ctx: &TaskContext, - _input_schema: &Schema, - _codec: &dyn PhysicalExtensionCodec, - ) -> Result> - where - Self: Sized, - { - internal_err!("DeduplicatingSerializer cannot deserialize physical expressions") - } - - fn physical_expr_to_proto( - &self, - expr: &Arc, - codec: &dyn PhysicalExtensionCodec, - ) -> Result { - let mut proto = serialize_physical_expr_with_converter(expr, codec, self)?; - - // Hash session_id, pointer address, and process ID together to create expr_id. 
- // - session_id: random per serializer, prevents collisions when merging serializations - // - ptr: unique address per Arc within a process - // - pid: prevents collisions if serializer is shared across processes - let mut hasher = DefaultHasher::new(); - self.session_id.hash(&mut hasher); - (Arc::as_ptr(expr) as *const () as u64).hash(&mut hasher); - std::process::id().hash(&mut hasher); - proto.expr_id = Some(hasher.finish()); - - Ok(proto) - } -} - -/// Internal deserializer that caches expressions by expr_id. -/// Created fresh for each deserialization operation. +/// Internal deserializer that caches expressions by their `expression_id()` so +/// multiple occurrences of the same expression are deduped. #[derive(Default)] struct DeduplicatingDeserializer { - /// Cache mapping expr_id to deserialized expressions. + /// Cache mapping expression_id to deserialized expressions. cache: RefCell>>, } @@ -3952,24 +3876,26 @@ impl PhysicalProtoConverterExtension for DeduplicatingDeserializer { where Self: Sized, { - if let Some(expr_id) = proto.expr_id { - // Check cache first - if let Some(cached) = self.cache.borrow().get(&expr_id) { - return Ok(Arc::clone(cached)); - } - // Deserialize and cache - let expr = parse_physical_expr_with_converter( - proto, - ctx, - input_schema, - codec, - self, - )?; - self.cache.borrow_mut().insert(expr_id, Arc::clone(&expr)); - Ok(expr) - } else { - parse_physical_expr_with_converter(proto, ctx, input_schema, codec, self) + let parsed = + parse_physical_expr_with_converter(proto, ctx, input_schema, codec, self)?; + let Some(id) = parsed.expression_id() else { + return Ok(parsed); + }; + + let mut cache = self.cache.borrow_mut(); + if let Some(cached) = cache.get(&id) { + let children: Vec<_> = parsed.children().into_iter().cloned().collect(); + let cached_children: Vec<_> = + cached.children().into_iter().cloned().collect(); + return if children == cached_children { + Ok(Arc::clone(cached)) + } else { + 
Arc::clone(cached).with_new_children(children) + }; } + + cache.insert(id, Arc::clone(&parsed)); + Ok(parsed) } fn physical_expr_to_proto( @@ -3981,19 +3907,12 @@ impl PhysicalProtoConverterExtension for DeduplicatingDeserializer { } } -/// A proto converter that adds expression deduplication during serialization -/// and deserialization. -/// -/// During serialization, each expression's Arc pointer address is XORed with a -/// random session_id to create a salted `expr_id`. This prevents cross-process -/// collisions when serialized plans are merged. +/// A proto converter that deduplicates [`PhysicalExpr`] by [`PhysicalExpr::expression_id`]. +/// This helps preserve referential integrity when deserializing [`ExecutionPlan`]s +/// which may contain multiple occurrences of the same [`PhysicalExpr`] (ex. when +/// [`DynamicFilterPhysicalExpr`] are pushed down, it is important to preserve +/// referential integrity). /// -/// During deserialization, expressions with the same `expr_id` share the same -/// Arc, reducing memory usage for plans with duplicate expressions (e.g., large -/// IN lists) and supporting correctly linking [`DynamicFilterPhysicalExpr`] instances. -/// -/// This converter is stateless - it creates internal serializers/deserializers -/// on demand for each operation. 
/// /// [`DynamicFilterPhysicalExpr`]: https://docs.rs/datafusion-physical-expr/latest/datafusion_physical_expr/expressions/struct.DynamicFilterPhysicalExpr.html #[derive(Debug, Default, Clone, Copy)] @@ -4018,11 +3937,10 @@ impl PhysicalProtoConverterExtension for DeduplicatingProtoConverter { where Self: Sized, { - let serializer = DeduplicatingSerializer::new(); protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter( Arc::clone(plan), codec, - &serializer, + self, ) } @@ -4045,8 +3963,7 @@ impl PhysicalProtoConverterExtension for DeduplicatingProtoConverter { expr: &Arc, codec: &dyn PhysicalExtensionCodec, ) -> Result { - let serializer = DeduplicatingSerializer::new(); - serializer.physical_expr_to_proto(expr, codec) + serialize_physical_expr_with_converter(expr, codec, self) } } diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index f0eb6d27aac30..4baf2ab8739c7 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -33,11 +33,11 @@ use datafusion_datasource_parquet::file_format::ParquetSink; use datafusion_expr::WindowFrame; use datafusion_physical_expr::ScalarFunctionExpr; use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr}; -use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_physical_plan::expressions::{ - BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr, - LikeExpr, Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn, + BinaryExpr, CaseExpr, CastExpr, Column, DynamicFilterPhysicalExpr, + DynamicFilterSnapshot, InListExpr, IsNotNullExpr, IsNullExpr, LikeExpr, Literal, + NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn, }; use datafusion_physical_plan::joins::{HashExpr, HashTableLookupExpr}; use datafusion_physical_plan::udaf::AggregateFunctionExpr; @@ 
-71,7 +71,6 @@ pub fn serialize_physical_aggr_expr( let mut buf = Vec::new(); codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?; Ok(protobuf::PhysicalExprNode { - expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::AggregateExpr( protobuf::PhysicalAggregateExprNode { aggregate_function: Some(physical_aggregate_expr_node::AggregateFunction::UserDefinedAggrFunction(name)), @@ -257,10 +256,7 @@ pub fn serialize_physical_expr_with_converter( codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result { - // Snapshot the expr in case it has dynamic predicate state so - // it can be serialized - let expr = snapshot_physical_expr(Arc::clone(value))?; - + let expr = value.as_ref(); // HashTableLookupExpr is used for dynamic filter pushdown in hash joins. // It contains an Arc (the build-side hash table) which // cannot be serialized - the hash table is a runtime structure built during @@ -281,14 +277,12 @@ pub fn serialize_physical_expr_with_converter( )), }; return Ok(protobuf::PhysicalExprNode { - expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(value)), }); } if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { - expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::Column( protobuf::PhysicalColumn { name: expr.name().to_string(), @@ -298,7 +292,6 @@ pub fn serialize_physical_expr_with_converter( }) } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { - expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::UnknownColumn( protobuf::UnknownColumn { name: expr.name().to_string(), @@ -340,14 +333,12 @@ pub fn serialize_physical_expr_with_converter( }); Ok(protobuf::PhysicalExprNode { - expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::BinaryExpr( binary_expr, )), }) } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { - expr_id: None, expr_type: 
Some( protobuf::physical_expr_node::ExprType::Case( Box::new( @@ -390,7 +381,6 @@ pub fn serialize_physical_expr_with_converter( }) } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { - expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::NotExpr(Box::new( protobuf::PhysicalNot { expr: Some(Box::new( @@ -401,7 +391,6 @@ pub fn serialize_physical_expr_with_converter( }) } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { - expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::IsNullExpr( Box::new(protobuf::PhysicalIsNull { expr: Some(Box::new( @@ -412,7 +401,6 @@ pub fn serialize_physical_expr_with_converter( }) } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { - expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr( Box::new(protobuf::PhysicalIsNotNull { expr: Some(Box::new( @@ -423,7 +411,6 @@ pub fn serialize_physical_expr_with_converter( }) } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { - expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::InList(Box::new( protobuf::PhysicalInListNode { expr: Some(Box::new( @@ -436,7 +423,6 @@ pub fn serialize_physical_expr_with_converter( }) } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { - expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::Negative(Box::new( protobuf::PhysicalNegativeNode { expr: Some(Box::new( @@ -447,14 +433,12 @@ pub fn serialize_physical_expr_with_converter( }) } else if let Some(lit) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { - expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::Literal( lit.value().try_into()?, )), }) } else if let Some(cast) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { - expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::Cast(Box::new( 
protobuf::PhysicalCastNode { expr: Some(Box::new( @@ -466,7 +450,6 @@ pub fn serialize_physical_expr_with_converter( }) } else if let Some(cast) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { - expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::TryCast(Box::new( protobuf::PhysicalTryCastNode { expr: Some(Box::new( @@ -480,7 +463,6 @@ pub fn serialize_physical_expr_with_converter( let mut buf = Vec::new(); codec.try_encode_udf(expr.fun(), &mut buf)?; Ok(protobuf::PhysicalExprNode { - expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::ScalarUdf( protobuf::PhysicalScalarUdfNode { name: expr.name().to_string(), @@ -497,7 +479,6 @@ pub fn serialize_physical_expr_with_converter( }) } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { - expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::LikeExpr(Box::new( protobuf::PhysicalLikeExprNode { negated: expr.negated(), @@ -513,7 +494,6 @@ pub fn serialize_physical_expr_with_converter( }) } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { - expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::HashExpr( protobuf::PhysicalHashExprNode { on_columns: serialize_physical_exprs( @@ -526,6 +506,41 @@ pub fn serialize_physical_expr_with_converter( }, )), }) + } else if let Some(df) = expr.downcast_ref::() { + // Capture all state atomically. + let snapshot = DynamicFilterSnapshot::from(df); + + let children = snapshot + .children() + .iter() + .map(|child| proto_converter.physical_expr_to_proto(child, codec)) + .collect::>>()?; + + let remapped_children = if let Some(remapped) = snapshot.remapped_children() { + remapped + .iter() + .map(|child| proto_converter.physical_expr_to_proto(child, codec)) + .collect::>>()? 
+ } else { + vec![] + }; + + let inner_expr = Box::new( + proto_converter.physical_expr_to_proto(snapshot.inner_expr(), codec)?, + ); + + Ok(protobuf::PhysicalExprNode { + expr_type: Some(protobuf::physical_expr_node::ExprType::DynamicFilter( + Box::new(protobuf::PhysicalDynamicFilterNode { + children, + remapped_children, + generation: snapshot.generation(), + inner_expr: Some(inner_expr), + is_complete: snapshot.is_complete(), + expression_id: snapshot.expression_id(), + }), + )), + }) } else { let mut buf: Vec = vec![]; match codec.try_encode_expr(value, &mut buf) { @@ -536,7 +551,6 @@ pub fn serialize_physical_expr_with_converter( .map(|e| proto_converter.physical_expr_to_proto(e, codec)) .collect::>()?; Ok(protobuf::PhysicalExprNode { - expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::Extension( protobuf::PhysicalExtensionExprNode { expr: buf, inputs }, )), diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 3f1c306603bc1..7810b3143eb2c 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -128,6 +128,10 @@ use crate::cases::{ CustomUDWF, CustomUDWFNode, MyAggregateUDF, MyAggregateUdfNode, MyRegexUdf, MyRegexUdfNode, }; +use datafusion_physical_expr::expressions::{ + DynamicFilterPhysicalExpr, DynamicFilterSnapshot, +}; +use datafusion_physical_expr::utils::reassign_expr_columns; /// Perform a serde roundtrip and assert that the string representation of the before and after plans /// are identical. Note that this often isn't sufficient to guarantee that no information is @@ -2683,442 +2687,310 @@ fn roundtrip_call_null_scalar_struct_dict() -> Result<()> { roundtrip_test(filter) } -/// Test that expression deduplication works during deserialization. 
-/// When the same expression Arc is serialized multiple times, it should be -/// deduplicated on deserialization (sharing the same Arc). -#[test] -fn test_expression_deduplication() -> Result<()> { - let field_a = Field::new("a", DataType::Int64, false); - let schema = Arc::new(Schema::new(vec![field_a])); - - // Create a shared expression that will be used multiple times - let shared_col: Arc = Arc::new(Column::new("a", 0)); - - // Create an InList expression that uses the same column Arc multiple times - // This simulates a real-world scenario where expressions are shared - let in_list_expr = in_list( - Arc::clone(&shared_col), - vec![lit(1i64), lit(2i64), lit(3i64)], - &false, - &schema, - )?; - - // Create a binary expression that uses the shared column and the in_list result - let binary_expr: Arc = Arc::new(BinaryExpr::new( - Arc::clone(&shared_col), - Operator::Eq, - lit(42i64), - )); - - // Create a plan that has both expressions (they share the `shared_col` Arc) - let input = Arc::new(EmptyExec::new(schema.clone())); - let filter = FilterExecBuilder::new(in_list_expr, input).build()?; - let projection_exprs = vec![ProjectionExpr { - expr: binary_expr, - alias: "result".to_string(), - }]; - let exec_plan = - Arc::new(ProjectionExec::try_new(projection_exprs, Arc::new(filter))?); - - let ctx = SessionContext::new(); - let codec = DefaultPhysicalExtensionCodec {}; - let proto_converter = DeduplicatingProtoConverter {}; - - // Perform roundtrip - let bytes = physical_plan_to_bytes_with_proto_converter( - Arc::clone(&exec_plan) as Arc, - &codec, - &proto_converter, - )?; - - // Create a new converter for deserialization (fresh cache) - let deser_converter = DeduplicatingProtoConverter {}; - let result_plan = physical_plan_from_bytes_with_proto_converter( - bytes.as_ref(), - ctx.task_ctx().as_ref(), - &codec, - &deser_converter, - )?; - - // Verify the plan structure is correct - pretty_assertions::assert_eq!(format!("{exec_plan:?}"), 
format!("{result_plan:?}")); - - Ok(()) +/// Create a [`DynamicFilterPhysicalExpr`] with child column expression "a" @ index 0. +fn make_dynamic_filter() -> Arc { + Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::new(Column::new("a", 0)) as Arc], + lit(true), + )) as Arc } -/// Test that expression deduplication correctly shares Arcs for identical expressions. -/// This test verifies the core deduplication behavior. -#[test] -fn test_expression_deduplication_arc_sharing() -> Result<()> { - use datafusion_proto::bytes::{ - physical_plan_from_bytes_with_proto_converter, - physical_plan_to_bytes_with_proto_converter, - }; - - let field_a = Field::new("a", DataType::Int64, false); - let schema = Arc::new(Schema::new(vec![field_a])); - - // Create a column expression - let col_expr: Arc = Arc::new(Column::new("a", 0)); - - // Create a projection that uses the SAME Arc twice - // After roundtrip, both should point to the same Arc - let projection_exprs = vec![ - ProjectionExpr { - expr: Arc::clone(&col_expr), - alias: "a1".to_string(), - }, - ProjectionExpr { - expr: Arc::clone(&col_expr), // Same Arc! 
- alias: "a2".to_string(), - }, - ]; - - let input = Arc::new(EmptyExec::new(schema)); - let exec_plan = Arc::new(ProjectionExec::try_new(projection_exprs, input)?); - - let ctx = SessionContext::new(); - let codec = DefaultPhysicalExtensionCodec {}; - let proto_converter = DeduplicatingProtoConverter {}; - - // Serialize - let bytes = physical_plan_to_bytes_with_proto_converter( - Arc::clone(&exec_plan) as Arc, - &codec, - &proto_converter, - )?; - - // Deserialize with a fresh converter - let deser_converter = DeduplicatingProtoConverter {}; - let result_plan = physical_plan_from_bytes_with_proto_converter( - bytes.as_ref(), - ctx.task_ctx().as_ref(), - &codec, - &deser_converter, - )?; - - // Get the projection from the result - let projection = result_plan - .downcast_ref::() - .expect("Expected ProjectionExec"); - - let exprs: Vec<_> = projection.expr().iter().collect(); - assert_eq!(exprs.len(), 2); - - // The key test: both expressions should point to the same Arc after deduplication - // This is because they were the same Arc before serialization - assert!( - Arc::ptr_eq(&exprs[0].expr, &exprs[1].expr), - "Expected both expressions to share the same Arc after deduplication" - ); - - Ok(()) +/// Update a [`DynamicFilterPhysicalExpr`]'s children to support child schema "b" @ 0, "a" @ 1. +fn make_reassigned_dynamic_filter( + filter: Arc, +) -> Result<(Arc, Arc)> { + let schema = Arc::new(Schema::new(vec![ + Field::new("b", DataType::Int64, false), + Field::new("a", DataType::Int64, false), + ])); + let reassigned = reassign_expr_columns(filter, &schema)?; + Ok((schema, reassigned)) +} + +/// Extract an expression id from a [`PhysicalExpr`] proto. Only dynamic filter +/// protos carry an expression id. 
+fn proto_expression_id(expr: &PhysicalExprNode) -> u64 { + match expr.expr_type.as_ref() { + Some( + datafusion_proto::protobuf::physical_expr_node::ExprType::DynamicFilter(df), + ) => df.expression_id, + _ => panic!("expected PhysicalExprNode wrapping a DynamicFilter"), + } } -/// Test backward compatibility: protos without expr_id should still deserialize correctly. -#[test] -fn test_backward_compatibility_no_expr_id() -> Result<()> { - let field_a = Field::new("a", DataType::Int64, false); - let schema = Arc::new(Schema::new(vec![field_a])); - - // Manually create a proto without expr_id set - let proto = PhysicalExprNode { - expr_id: None, // Simulating old proto without this field - expr_type: Some( - datafusion_proto::protobuf::physical_expr_node::ExprType::Column( - datafusion_proto::protobuf::PhysicalColumn { - name: "a".to_string(), - index: 0, - }, - ), - ), - }; +/// Roundtrip a single physical expression shaped like so: +/// +/// ```text +/// BinaryExpr(AND) +/// / \ +/// filter_expr_1 filter_expr_2 +/// ``` +/// +/// Returns filter_expr_1 and filter_expr_2 after deserialization. 
+fn roundtrip_dynamic_filter_expr_pair( + filter_expr_1: Arc, + filter_expr_2: Arc, + schema: Arc, +) -> Result<(Arc, Arc)> { + let pair_expr = Arc::new(BinaryExpr::new( + Arc::clone(&filter_expr_1), + Operator::And, + Arc::clone(&filter_expr_2), + )) as Arc; - let ctx = SessionContext::new(); let codec = DefaultPhysicalExtensionCodec {}; - let proto_converter = DefaultPhysicalProtoConverter {}; - - // Should deserialize without error - let result = proto_converter.proto_to_physical_expr( + let converter = DeduplicatingProtoConverter {}; + let proto = converter.physical_expr_to_proto(&pair_expr, &codec)?; + let ctx = SessionContext::new(); + let deserialized_expr = converter.proto_to_physical_expr( &proto, ctx.task_ctx().as_ref(), &schema, &codec, )?; - // Verify the result is correct - let col = result.downcast_ref::().expect("Expected Column"); - assert_eq!(col.name(), "a"); - assert_eq!(col.index(), 0); + let binary = deserialized_expr + .downcast_ref::() + .expect("Expected BinaryExpr"); - Ok(()) + Ok((Arc::clone(binary.left()), Arc::clone(binary.right()))) } -/// Test that deduplication works within a single plan deserialization and that -/// separate deserializations produce independent expressions (no cross-operation sharing). -#[test] -fn test_deduplication_within_plan_deserialization() -> Result<()> { - use datafusion_proto::bytes::{ - physical_plan_from_bytes_with_proto_converter, - physical_plan_to_bytes_with_proto_converter, - }; +/// Roundtrip an execution plan shaped like so: +/// +/// ```text +/// FilterExec(dynamic_filter_1 on a@0) +/// ProjectionExec(a := Column("a", source_index)) +/// DataSourceExec +/// ParquetSource(predicate = dynamic_filter_2) +/// ``` +/// +/// `dynamic_filter_1` and `dynamic_filter_2` are the same dynamic filter, except with +/// different children. 
+/// +/// Returns +/// - `dynamic_filter_1` before serialization +/// - `dynamic_filter_2` before serialization +/// - `dynamic_filter_1` after serialization +/// - `dynamic_filter_2` after serialization +#[allow(clippy::type_complexity)] +fn roundtrip_dynamic_filter_plan_pair() -> Result<( + Arc, + Arc, + Arc, + Arc, +)> { + let filter_expr_1 = make_dynamic_filter(); + let (data_source_schema, filter_expr_2) = + make_reassigned_dynamic_filter(Arc::clone(&filter_expr_1))?; + let left_before = Arc::clone(&filter_expr_1); + let right_before = Arc::clone(&filter_expr_2); + let file_source = Arc::new( + ParquetSource::new(Arc::clone(&data_source_schema)) + .with_predicate(Arc::clone(&filter_expr_2)), + ); + let scan_config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new( + "/path/to/file.parquet".to_string(), + 1024, + )])]) + .build(); + let data_source_exec = + DataSourceExec::from_data_source(scan_config) as Arc; - let field_a = Field::new("a", DataType::Int64, false); - let schema = Arc::new(Schema::new(vec![field_a])); + let projection_exec = Arc::new(ProjectionExec::try_new( + vec![ProjectionExpr { + expr: Arc::new(Column::new("a", 1)) as Arc, + alias: "a".to_string(), + }], + data_source_exec, + )?) as Arc; + let filter_exec = Arc::new(FilterExec::try_new( + Arc::clone(&filter_expr_1), + projection_exec, + )?) 
as Arc; - // Create a plan with expressions that will be deduplicated - let col_expr: Arc = Arc::new(Column::new("a", 0)); - let projection_exprs = vec![ - ProjectionExpr { - expr: Arc::clone(&col_expr), - alias: "a1".to_string(), - }, - ProjectionExpr { - expr: Arc::clone(&col_expr), // Same Arc - will be deduplicated - alias: "a2".to_string(), - }, - ]; - let exec_plan = Arc::new(ProjectionExec::try_new( - projection_exprs, - Arc::new(EmptyExec::new(schema)), - )?); + let codec = DefaultPhysicalExtensionCodec {}; + let converter = DeduplicatingProtoConverter {}; + let proto = converter.execution_plan_to_proto(&filter_exec, &codec)?; let ctx = SessionContext::new(); - let codec = DefaultPhysicalExtensionCodec {}; - let proto_converter = DeduplicatingProtoConverter {}; + let deserialized_plan = + converter.proto_to_execution_plan(ctx.task_ctx().as_ref(), &codec, &proto)?; + + let outer_filter = deserialized_plan + .downcast_ref::() + .expect("Expected outer FilterExec"); + let left_filter = Arc::clone(outer_filter.predicate()); + let projection = outer_filter.children()[0] + .downcast_ref::() + .expect("Expected ProjectionExec"); + let data_source = projection + .input() + .downcast_ref::() + .expect("Expected DataSourceExec"); + let scan_config = data_source + .data_source() + .downcast_ref::() + .expect("Expected FileScanConfig"); + let right_filter = scan_config + .file_source() + .filter() + .expect("Expected pushed-down predicate"); - // Serialize - let bytes = physical_plan_to_bytes_with_proto_converter( - Arc::clone(&exec_plan) as Arc, - &codec, - &proto_converter, - )?; + Ok((left_before, right_before, left_filter, right_filter)) +} - // First deserialization - let plan1 = physical_plan_from_bytes_with_proto_converter( - bytes.as_ref(), - ctx.task_ctx().as_ref(), - &codec, - &proto_converter, - )?; +/// Takes two [`DynamicFilterPhysicalExpr`] and asserts that updates to one are visible +/// via the other. 
This helps assert that referential integrity is maintained after +/// deserializing. +fn assert_dynamic_filter_update_is_visible( + left_filter: &Arc, + right_filter: &Arc, +) -> Result<()> { + let left_filter = left_filter + .downcast_ref::() + .expect("Expected dynamic filter"); + let right_filter = right_filter + .downcast_ref::() + .expect("Expected dynamic filter"); - // Check that the plan was deserialized correctly with deduplication - let projection1 = plan1 - .downcast_ref::() - .expect("Expected ProjectionExec"); - let exprs1: Vec<_> = projection1.expr().iter().collect(); - assert_eq!(exprs1.len(), 2); - assert!( - Arc::ptr_eq(&exprs1[0].expr, &exprs1[1].expr), - "Expected both expressions to share the same Arc after deduplication" - ); + // Sanity check that the filters have the same generation. + let original_generation = left_filter.snapshot_generation(); + assert_eq!(original_generation, right_filter.snapshot_generation(),); - // Second deserialization - let plan2 = physical_plan_from_bytes_with_proto_converter( - bytes.as_ref(), - ctx.task_ctx().as_ref(), - &codec, - &proto_converter, - )?; + left_filter.update(lit(123_i64))?; - // Check that the second plan was also deserialized correctly - let projection2 = plan2 - .downcast_ref::() - .expect("Expected ProjectionExec"); - let exprs2: Vec<_> = projection2.expr().iter().collect(); - assert_eq!(exprs2.len(), 2); - assert!( - Arc::ptr_eq(&exprs2[0].expr, &exprs2[1].expr), - "Expected both expressions to share the same Arc after deduplication" + // Assert that both generations updated. 
+ assert_eq!(original_generation + 1, right_filter.snapshot_generation(),); + assert_eq!( + left_filter.snapshot_generation(), + right_filter.snapshot_generation(), ); - // Check that there was no deduplication across deserializations - assert!( - !Arc::ptr_eq(&exprs1[0].expr, &exprs2[0].expr), - "Expected expressions from different deserializations to be different Arcs" - ); - assert!( - !Arc::ptr_eq(&exprs1[1].expr, &exprs2[1].expr), - "Expected expressions from different deserializations to be different Arcs" - ); + // Ensure both filters have the updated expr. + let expected_current = r#"Literal { value: Int64(123), field: Field { name: "lit", data_type: Int64 } }"#; + assert_eq!(expected_current, format!("{:?}", left_filter.current()?),); + assert_eq!(expected_current, format!("{:?}", right_filter.current()?),); Ok(()) } -/// Test that deduplication works within direct expression deserialization and that -/// separate deserializations produce independent expressions (no cross-operation sharing). 
-#[test] -fn test_deduplication_within_expr_deserialization() -> Result<()> { - let field_a = Field::new("a", DataType::Int64, false); - let schema = Arc::new(Schema::new(vec![field_a])); - - // Create a binary expression where both sides are the same Arc - // This allows us to test deduplication within a single deserialization - let col_expr: Arc = Arc::new(Column::new("a", 0)); - let binary_expr: Arc = Arc::new(BinaryExpr::new( - Arc::clone(&col_expr), - Operator::Plus, - Arc::clone(&col_expr), // Same Arc - will be deduplicated - )); - - let ctx = SessionContext::new(); - let codec = DefaultPhysicalExtensionCodec {}; - let proto_converter = DeduplicatingProtoConverter {}; - - // Serialize the expression - let proto = proto_converter.physical_expr_to_proto(&binary_expr, &codec)?; - - // First expression deserialization - let expr1 = proto_converter.proto_to_physical_expr( - &proto, - ctx.task_ctx().as_ref(), - &schema, - &codec, - )?; +fn assert_dynamic_filter_snapshot_matches( + expected: &Arc, + actual: &Arc, +) { + let expected = expected + .downcast_ref::() + .expect("Expected dynamic filter"); + let actual = actual + .downcast_ref::() + .expect("Expected dynamic filter"); - // Check that deduplication worked within the deserialization - let binary1 = expr1 - .downcast_ref::() - .expect("Expected BinaryExpr"); - assert!( - Arc::ptr_eq(binary1.left(), binary1.right()), - "Expected both sides to share the same Arc after deduplication" + assert_eq!( + DynamicFilterSnapshot::from(expected).to_string(), + DynamicFilterSnapshot::from(actual).to_string(), ); +} - // Second expression deserialization - let expr2 = proto_converter.proto_to_physical_expr( - &proto, - ctx.task_ctx().as_ref(), - &schema, - &codec, - )?; +// Two clones of a dynamic filter expression should be deduped to the exact same expression. 
+#[test] +fn test_dynamic_filter_roundtrip_dedupe() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])); + let filter_expr_1 = make_dynamic_filter(); + let filter_expr_2 = Arc::clone(&filter_expr_1); + + let (filter_expr_1_after_roundtrip, filter_expr_2_after_roundtrip) = + roundtrip_dynamic_filter_expr_pair( + Arc::clone(&filter_expr_1), + Arc::clone(&filter_expr_2), + schema, + )?; - // Check that the second expression was also deserialized correctly - let binary2 = expr2 - .downcast_ref::() - .expect("Expected BinaryExpr"); - assert!( - Arc::ptr_eq(binary2.left(), binary2.right()), - "Expected both sides to share the same Arc after deduplication" + // Assert the filters are not modified during roundtrip. + assert_dynamic_filter_snapshot_matches( + &filter_expr_1, + &filter_expr_1_after_roundtrip, ); - - // Check that there was no deduplication across deserializations - assert!( - !Arc::ptr_eq(binary1.left(), binary2.left()), - "Expected expressions from different deserializations to be different Arcs" + assert_dynamic_filter_snapshot_matches( + &filter_expr_2, + &filter_expr_2_after_roundtrip, ); - assert!( - !Arc::ptr_eq(binary1.right(), binary2.right()), - "Expected expressions from different deserializations to be different Arcs" + assert_dynamic_filter_snapshot_matches( + &filter_expr_1_after_roundtrip, + &filter_expr_2_after_roundtrip, ); + // Assert referential integrity. + assert_dynamic_filter_update_is_visible( + &filter_expr_1_after_roundtrip, + &filter_expr_2_after_roundtrip, + )?; + Ok(()) } -/// Test that session_id rotates between top-level serialization operations. -/// This verifies that each top-level serialization gets a fresh session_id, -/// which prevents cross-process collisions when serialized plans are merged. +/// Roundtrip test for an execution plan where there are multiple instances of a dynamic filter +/// with different children. 
#[test] -fn test_session_id_rotation_between_serializations() -> Result<()> { - let field_a = Field::new("a", DataType::Int64, false); - let _schema = Arc::new(Schema::new(vec![field_a])); - - // Create a simple expression - let col_expr: Arc = Arc::new(Column::new("a", 0)); - - let codec = DefaultPhysicalExtensionCodec {}; - let proto_converter = DeduplicatingProtoConverter {}; - - // First serialization - let proto1 = proto_converter.physical_expr_to_proto(&col_expr, &codec)?; - let expr_id1 = proto1.expr_id.expect("Expected expr_id to be set"); - - // Second serialization with the same converter - // The session_id should have rotated, so the expr_id should be different - // even though we're serializing the same expression (same pointer address) - let proto2 = proto_converter.physical_expr_to_proto(&col_expr, &codec)?; - let expr_id2 = proto2.expr_id.expect("Expected expr_id to be set"); - - // The expr_ids should be different because session_id rotated - assert_ne!( - expr_id1, expr_id2, - "Expected different expr_ids due to session_id rotation between serializations" +fn test_dynamic_filter_plan_roundtrip_dedupe() -> Result<()> { + let ( + filter_expr_1, + filter_expr_2, + filter_expr_1_after_roundtrip, + filter_expr_2_after_roundtrip, + ) = roundtrip_dynamic_filter_plan_pair()?; + + // Assert the filters are not modified during roundtrip. + // + // There's a small technicality that `filter_expr_1` is rewritten to an equivalent expression + // during deserialization, so we capture that here by calling + // `filter.with_new_children(filter.children)`. 
+ let filter_expr_1_children = Arc::clone(&filter_expr_1) + .children() + .iter() + .map(|child| Arc::clone(child)) + .collect::>(); + let filter_expr_1 = filter_expr_1 + .clone() + .with_new_children(filter_expr_1_children) + .unwrap(); + assert_dynamic_filter_snapshot_matches( + &filter_expr_1, + &filter_expr_1_after_roundtrip, + ); + assert_dynamic_filter_snapshot_matches( + &filter_expr_2, + &filter_expr_2_after_roundtrip, ); - // Also test that serializing the same expression multiple times within - // the same top-level operation would give the same expr_id (not testable - // here directly since each physical_expr_to_proto is a top-level operation, - // but the deduplication tests verify this indirectly) + // Assert referential integrity. + assert_dynamic_filter_update_is_visible( + &filter_expr_1_after_roundtrip, + &filter_expr_2_after_roundtrip, + )?; Ok(()) } -/// Test that session_id rotation works correctly with execution plans. -/// This verifies the end-to-end behavior with plan serialization. 
#[test] -fn test_session_id_rotation_with_execution_plans() -> Result<()> { - use datafusion_proto::bytes::physical_plan_to_bytes_with_proto_converter; - - let field_a = Field::new("a", DataType::Int64, false); - let schema = Arc::new(Schema::new(vec![field_a])); - - // Create a simple plan - let col_expr: Arc = Arc::new(Column::new("a", 0)); - let projection_exprs = vec![ProjectionExpr { - expr: Arc::clone(&col_expr), - alias: "a1".to_string(), - }]; - let exec_plan = Arc::new(ProjectionExec::try_new( - projection_exprs.clone(), - Arc::new(EmptyExec::new(Arc::clone(&schema))), - )?); - +fn test_dynamic_filter_expression_id_is_stable_between_serializations() -> Result<()> { + let filter_expr = make_dynamic_filter(); let codec = DefaultPhysicalExtensionCodec {}; let proto_converter = DeduplicatingProtoConverter {}; - // First serialization - let bytes1 = physical_plan_to_bytes_with_proto_converter( - Arc::clone(&exec_plan) as Arc, - &codec, - &proto_converter, - )?; + let proto1 = proto_converter.physical_expr_to_proto(&filter_expr, &codec)?; + let expr_id1 = proto_expression_id(&proto1); - // Second serialization with the same converter - let bytes2 = physical_plan_to_bytes_with_proto_converter( - Arc::clone(&exec_plan) as Arc, - &codec, - &proto_converter, - )?; + let proto2 = proto_converter.physical_expr_to_proto(&filter_expr, &codec)?; + let expr_id2 = proto_expression_id(&proto2); - // The serialized bytes should be different due to different session_ids - // (specifically, the expr_id values embedded in the protobuf will differ) - assert_ne!( - bytes1.as_ref(), - bytes2.as_ref(), - "Expected different serialized bytes due to session_id rotation" + assert_eq!( + expr_id1, expr_id2, + "Expected the same dynamic filter expression id across serializations" ); - // But both should deserialize correctly - let ctx = SessionContext::new(); - let deser_converter = DeduplicatingProtoConverter {}; - - let plan1 = 
datafusion_proto::bytes::physical_plan_from_bytes_with_proto_converter( - bytes1.as_ref(), - ctx.task_ctx().as_ref(), - &codec, - &deser_converter, - )?; - - let plan2 = datafusion_proto::bytes::physical_plan_from_bytes_with_proto_converter( - bytes2.as_ref(), - ctx.task_ctx().as_ref(), - &codec, - &deser_converter, - )?; - - // Verify both plans have the expected structure - assert_eq!(plan1.schema(), plan2.schema()); - Ok(()) } From 6db8205ee1734faa67223dd60df6ba3ea6755748 Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Fri, 24 Apr 2026 00:37:08 +0000 Subject: [PATCH 2/6] better assertions --- datafusion/proto/src/physical_plan/mod.rs | 8 +--- .../tests/cases/roundtrip_physical_plan.rs | 42 ++++++++++--------- 2 files changed, 24 insertions(+), 26 deletions(-) diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 563d5430318f8..f807847113f58 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -3885,13 +3885,7 @@ impl PhysicalProtoConverterExtension for DeduplicatingDeserializer { let mut cache = self.cache.borrow_mut(); if let Some(cached) = cache.get(&id) { let children: Vec<_> = parsed.children().into_iter().cloned().collect(); - let cached_children: Vec<_> = - cached.children().into_iter().cloned().collect(); - return if children == cached_children { - Ok(Arc::clone(cached)) - } else { - Arc::clone(cached).with_new_children(children) - }; + return Arc::clone(cached).with_new_children(children); } cache.insert(id, Arc::clone(&parsed)); diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 7810b3143eb2c..8ef8fc90287e9 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -2877,21 +2877,38 @@ fn assert_dynamic_filter_update_is_visible( Ok(()) } +/// Assert that the actual dynamic filter snapshot 
matches the expected one +/// after roundtrip. fn assert_dynamic_filter_snapshot_matches( expected: &Arc, actual: &Arc, ) { - let expected = expected + let expected_filter = expected .downcast_ref::() .expect("Expected dynamic filter"); - let actual = actual + let actual_filter = actual .downcast_ref::() .expect("Expected dynamic filter"); - assert_eq!( - DynamicFilterSnapshot::from(expected).to_string(), - DynamicFilterSnapshot::from(actual).to_string(), - ); + let actual_snapshot = DynamicFilterSnapshot::from(actual_filter).to_string(); + let expected_snapshot = DynamicFilterSnapshot::from(expected_filter).to_string(); + if expected_snapshot == actual_snapshot { + return; + } + + // Note that the `DeduplicatingDeserializer` routes every cache hit through + // `with_new_children`. This produces an equivalent expression, but with + // remapped children that are equal to the original. Handle that case here. + let rewritten = Arc::clone(expected) + .with_new_children(expected.children().iter().map(|c| Arc::clone(c)).collect()) + .expect("with_new_children on a dynamic filter should not fail"); + let rewritten_snapshot = DynamicFilterSnapshot::from( + rewritten + .downcast_ref::() + .expect("with_new_children returns a DynamicFilterPhysicalExpr"), + ) + .to_string(); + assert_eq!(rewritten_snapshot, actual_snapshot); } // Two clones of a dynamic filter expression should be deduped to the exact same expression. @@ -2943,19 +2960,6 @@ fn test_dynamic_filter_plan_roundtrip_dedupe() -> Result<()> { ) = roundtrip_dynamic_filter_plan_pair()?; // Assert the filters are not modified during roundtrip. - // - // There's a small technicality that `filter_expr_1` is rewritten to an equivalent expression - // during deserialization, so we capture that here by calling - // `filter.with_new_children(filter.children)`. 
- let filter_expr_1_children = Arc::clone(&filter_expr_1) - .children() - .iter() - .map(|child| Arc::clone(child)) - .collect::>(); - let filter_expr_1 = filter_expr_1 - .clone() - .with_new_children(filter_expr_1_children) - .unwrap(); assert_dynamic_filter_snapshot_matches( &filter_expr_1, &filter_expr_1_after_roundtrip, From 41273a0105aab3380b8abfa75801981d0d5d70ab Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Fri, 24 Apr 2026 18:48:56 +0000 Subject: [PATCH 3/6] address feedback --- .../src/expressions/dynamic_filters.rs | 237 ++++++++---------- .../physical-expr/src/expressions/mod.rs | 2 +- datafusion/proto/proto/datafusion.proto | 8 +- datafusion/proto/src/generated/pbjson.rs | 44 ++-- datafusion/proto/src/generated/prost.rs | 8 +- .../proto/src/physical_plan/from_proto.rs | 31 ++- datafusion/proto/src/physical_plan/mod.rs | 25 +- .../proto/src/physical_plan/to_proto.rs | 47 ++-- .../tests/cases/roundtrip_physical_plan.rs | 42 ++-- 9 files changed, 234 insertions(+), 210 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index c60c4029814aa..c17288927c70a 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -80,72 +80,21 @@ pub struct DynamicFilterPhysicalExpr { nullable: Arc>>, } -#[derive(Debug)] -struct Inner { +/// Atomic internal state of a [`DynamicFilterPhysicalExpr`]. +/// +/// **Warning:** exposed publicly solely so that proto (de)serialization in +/// `datafusion-proto` can read and rebuild this state. Do not treat this type +/// or its layout as a stable API. +#[derive(Debug, Clone)] +pub struct Inner { /// A counter that gets incremented every time the expression is updated so that we can track changes cheaply. /// This is used for [`PhysicalExpr::snapshot_generation`] to have a cheap check for changes. 
- generation: u64, - expr: Arc, + pub generation: u64, + pub expr: Arc, /// Flag for quick synchronous check if filter is complete. /// This is redundant with the watch channel state, but allows us to return immediately /// from `wait_complete()` without subscribing if already complete. - is_complete: bool, -} - -/// An atomic snapshot of a [`DynamicFilterPhysicalExpr`] used to reconstruct the expression during -/// serialization / deserialization. -pub struct DynamicFilterSnapshot { - children: Vec>, - remapped_children: Option>>, - expression_id: u64, - // Inner state. - generation: u64, - inner_expr: Arc, - is_complete: bool, -} - -impl DynamicFilterSnapshot { - pub fn new( - expression_id: u64, - children: Vec>, - remapped_children: Option>>, - generation: u64, - inner_expr: Arc, - is_complete: bool, - ) -> Self { - Self { - children, - remapped_children, - expression_id, - generation, - inner_expr, - is_complete, - } - } - - pub fn children(&self) -> &[Arc] { - &self.children - } - - pub fn expression_id(&self) -> u64 { - self.expression_id - } - - pub fn remapped_children(&self) -> Option<&[Arc]> { - self.remapped_children.as_deref() - } - - pub fn generation(&self) -> u64 { - self.generation - } - - pub fn inner_expr(&self) -> &Arc { - &self.inner_expr - } - - pub fn is_complete(&self) -> bool { - self.is_complete - } + pub is_complete: bool, } // TODO: Include expression_id in debug output. 
@@ -167,73 +116,6 @@ impl std::fmt::Debug for DynamicFilterPhysicalExpr { } } -impl Display for DynamicFilterSnapshot { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "DynamicFilterSnapshot {{ children: {:?}, remapped_children: {:?}, expression_id: {}, generation: {}, inner_expr: {:?}, is_complete: {} }}", - self.children, - self.remapped_children, - self.expression_id, - self.generation, - self.inner_expr, - self.is_complete - ) - } -} - -impl From for DynamicFilterPhysicalExpr { - fn from(snapshot: DynamicFilterSnapshot) -> Self { - let DynamicFilterSnapshot { - children, - remapped_children, - expression_id, - generation, - inner_expr, - is_complete, - } = snapshot; - - let state = if is_complete { - FilterState::Complete { generation } - } else { - FilterState::InProgress { generation } - }; - let (state_watch, _) = watch::channel(state); - - Self { - children, - remapped_children, - expression_id, - inner: Arc::new(RwLock::new(Inner { - generation, - expr: inner_expr, - is_complete, - })), - state_watch, - data_type: Arc::new(RwLock::new(None)), - nullable: Arc::new(RwLock::new(None)), - } - } -} - -impl From<&DynamicFilterPhysicalExpr> for DynamicFilterSnapshot { - fn from(expr: &DynamicFilterPhysicalExpr) -> Self { - // Snapshot everything in the mutex atomically. - let (generation, inner_expr, is_complete) = { - let inner = expr.inner.read(); - (inner.generation, Arc::clone(&inner.expr), inner.is_complete) - }; - DynamicFilterSnapshot { - children: expr.children.clone(), - remapped_children: expr.remapped_children.clone(), - expression_id: expr.expression_id, - generation, - inner_expr, - is_complete, - } - } -} - impl Inner { fn new(expr: Arc) -> Self { Self { @@ -495,9 +377,67 @@ impl DynamicFilterPhysicalExpr { } /// Generate a new expression id for this filter. - pub fn new_expression_id() -> u64 { + fn new_expression_id() -> u64 { random::() } + + /// Return the filter's original children (before any remapping). 
+ pub fn original_children(&self) -> &[Arc] { + &self.children + } + + /// Return the filter's remapped children, if any have been set via + /// [`PhysicalExpr::with_new_children`]. + pub fn remapped_children(&self) -> Option<&[Arc]> { + self.remapped_children.as_deref() + } + + /// Rebuild a `DynamicFilterPhysicalExpr` from its stored parts. Used by + /// proto deserialization to preserve `expression_id` across a roundtrip + /// rather than minting a fresh one. + /// + /// **Warning:** intended only for `datafusion-proto` (de)serialization. + /// Not a stable API. + pub fn from_parts( + expression_id: u64, + children: Vec>, + remapped_children: Option>>, + inner: Inner, + ) -> Self { + let state = if inner.is_complete { + FilterState::Complete { + generation: inner.generation, + } + } else { + FilterState::InProgress { + generation: inner.generation, + } + }; + let (state_watch, _) = watch::channel(state); + + Self { + children, + remapped_children, + expression_id, + inner: Arc::new(RwLock::new(inner)), + state_watch, + data_type: Arc::new(RwLock::new(None)), + nullable: Arc::new(RwLock::new(None)), + } + } + + /// Return a clone of the atomically-captured `Inner` state. + /// + /// **Warning:** intended only for `datafusion-proto` (de)serialization. + /// Not a stable API. + pub fn inner(&self) -> Inner { + let guard = self.inner.read(); + Inner { + generation: guard.generation, + expr: Arc::clone(&guard.expr), + is_complete: guard.is_complete, + } + } } impl PhysicalExpr for DynamicFilterPhysicalExpr { @@ -1021,6 +961,10 @@ mod test { ); } + /// Verifies that reassigning (which triggers `with_new_children`) followed + /// by rebuilding via `from_parts` preserves the filter's observable state + /// (original children, remapped children, expression id, and inner + /// generation/expr/is_complete). 
#[test] fn test_current_snapshot_roundtrip() { let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); @@ -1057,14 +1001,37 @@ mod test { .downcast_ref::() .expect("Expected dynamic filter after reassignment"); - // Take a snapshot and reconstruct - let snapshot = DynamicFilterSnapshot::from(reassigned); - let reconstructed = DynamicFilterPhysicalExpr::from(snapshot); + // Capture the parts and reconstruct. + let reconstructed = DynamicFilterPhysicalExpr::from_parts( + reassigned.expression_id, + reassigned.original_children().to_vec(), + reassigned.remapped_children().map(|r| r.to_vec()), + reassigned.inner(), + ); - // Assert snapshots are equal. + // Assert the reconstructed filter matches field-by-field. + assert_eq!( + reassigned.original_children(), + reconstructed.original_children(), + ); + assert_eq!( + reassigned.remapped_children(), + reconstructed.remapped_children(), + ); + assert_eq!( + reassigned.expression_id().unwrap(), + reconstructed.expression_id().unwrap(), + ); + let reassigned_inner = reassigned.inner(); + let reconstructed_inner = reconstructed.inner(); + assert_eq!(reassigned_inner.generation, reconstructed_inner.generation); + assert_eq!( + reassigned_inner.is_complete, + reconstructed_inner.is_complete + ); assert_eq!( - DynamicFilterSnapshot::from(reassigned).to_string(), - DynamicFilterSnapshot::from(&reconstructed).to_string(), + format!("{:?}", reassigned_inner.expr), + format!("{:?}", reconstructed_inner.expr), ); } diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index b40848828fa41..104afbb194ed1 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -43,7 +43,7 @@ pub use case::{CaseExpr, case}; pub use cast::{CastExpr, cast}; pub use column::{Column, col, with_new_schema}; pub use datafusion_expr::utils::format_state_name; -pub use dynamic_filters::{DynamicFilterPhysicalExpr, 
DynamicFilterSnapshot}; +pub use dynamic_filters::{DynamicFilterPhysicalExpr, Inner as DynamicFilterInner}; pub use in_list::{InListExpr, in_list}; pub use is_not_null::{IsNotNullExpr, is_not_null}; pub use is_null::{IsNullExpr, is_null}; diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index f0cba40a5ae31..6fccaeb8f738f 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -875,8 +875,11 @@ message PhysicalExprNode { // Was date_time_interval_expr reserved 17; - // Was expr_id - reserved 30; + // Unique identifier for this expression to do deduplication during deserialization. + // When serializing, this is set via `PhysicalExpr::expression_id`. When deserializing, + // this id is used by the `DeduplicatingProtoConverter` to preserve referential + // integrity across serde roundtrips for different expressions with the same id. + optional uint64 expr_id = 30; oneof ExprType { // column references @@ -926,7 +929,6 @@ message PhysicalDynamicFilterNode { uint64 generation = 3; PhysicalExprNode inner_expr = 4; bool is_complete = 5; - uint64 expression_id = 6; } message PhysicalScalarUdfNode { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 5db47a2ecff4f..b04c0af6468b7 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -16554,9 +16554,6 @@ impl serde::Serialize for PhysicalDynamicFilterNode { if self.is_complete { len += 1; } - if self.expression_id != 0 { - len += 1; - } let mut struct_ser = serializer.serialize_struct("datafusion.PhysicalDynamicFilterNode", len)?; if !self.children.is_empty() { struct_ser.serialize_field("children", &self.children)?; @@ -16575,11 +16572,6 @@ impl serde::Serialize for PhysicalDynamicFilterNode { if self.is_complete { struct_ser.serialize_field("isComplete", &self.is_complete)?; } - if self.expression_id != 0 { - 
#[allow(clippy::needless_borrow)] - #[allow(clippy::needless_borrows_for_generic_args)] - struct_ser.serialize_field("expressionId", ToString::to_string(&self.expression_id).as_str())?; - } struct_ser.end() } } @@ -16598,8 +16590,6 @@ impl<'de> serde::Deserialize<'de> for PhysicalDynamicFilterNode { "innerExpr", "is_complete", "isComplete", - "expression_id", - "expressionId", ]; #[allow(clippy::enum_variant_names)] @@ -16609,7 +16599,6 @@ impl<'de> serde::Deserialize<'de> for PhysicalDynamicFilterNode { Generation, InnerExpr, IsComplete, - ExpressionId, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -16636,7 +16625,6 @@ impl<'de> serde::Deserialize<'de> for PhysicalDynamicFilterNode { "generation" => Ok(GeneratedField::Generation), "innerExpr" | "inner_expr" => Ok(GeneratedField::InnerExpr), "isComplete" | "is_complete" => Ok(GeneratedField::IsComplete), - "expressionId" | "expression_id" => Ok(GeneratedField::ExpressionId), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -16661,7 +16649,6 @@ impl<'de> serde::Deserialize<'de> for PhysicalDynamicFilterNode { let mut generation__ = None; let mut inner_expr__ = None; let mut is_complete__ = None; - let mut expression_id__ = None; while let Some(k) = map_.next_key()? 
{ match k { GeneratedField::Children => { @@ -16696,14 +16683,6 @@ impl<'de> serde::Deserialize<'de> for PhysicalDynamicFilterNode { } is_complete__ = Some(map_.next_value()?); } - GeneratedField::ExpressionId => { - if expression_id__.is_some() { - return Err(serde::de::Error::duplicate_field("expressionId")); - } - expression_id__ = - Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) - ; - } } } Ok(PhysicalDynamicFilterNode { @@ -16712,7 +16691,6 @@ impl<'de> serde::Deserialize<'de> for PhysicalDynamicFilterNode { generation: generation__.unwrap_or_default(), inner_expr: inner_expr__, is_complete: is_complete__.unwrap_or_default(), - expression_id: expression_id__.unwrap_or_default(), }) } } @@ -16727,10 +16705,18 @@ impl serde::Serialize for PhysicalExprNode { { use serde::ser::SerializeStruct; let mut len = 0; + if self.expr_id.is_some() { + len += 1; + } if self.expr_type.is_some() { len += 1; } let mut struct_ser = serializer.serialize_struct("datafusion.PhysicalExprNode", len)?; + if let Some(v) = self.expr_id.as_ref() { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("exprId", ToString::to_string(&v).as_str())?; + } if let Some(v) = self.expr_type.as_ref() { match v { physical_expr_node::ExprType::Column(v) => { @@ -16805,6 +16791,8 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { D: serde::Deserializer<'de>, { const FIELDS: &[&str] = &[ + "expr_id", + "exprId", "column", "literal", "binary_expr", @@ -16843,6 +16831,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { #[allow(clippy::enum_variant_names)] enum GeneratedField { + ExprId, Column, Literal, BinaryExpr, @@ -16884,6 +16873,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { E: serde::de::Error, { match value { + "exprId" | "expr_id" => Ok(GeneratedField::ExprId), "column" => Ok(GeneratedField::Column), "literal" => Ok(GeneratedField::Literal), "binaryExpr" | "binary_expr" => 
Ok(GeneratedField::BinaryExpr), @@ -16923,9 +16913,18 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { where V: serde::de::MapAccess<'de>, { + let mut expr_id__ = None; let mut expr_type__ = None; while let Some(k) = map_.next_key()? { match k { + GeneratedField::ExprId => { + if expr_id__.is_some() { + return Err(serde::de::Error::duplicate_field("exprId")); + } + expr_id__ = + map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0) + ; + } GeneratedField::Column => { if expr_type__.is_some() { return Err(serde::de::Error::duplicate_field("column")); @@ -17069,6 +17068,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { } } Ok(PhysicalExprNode { + expr_id: expr_id__, expr_type: expr_type__, }) } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 243aa187098fa..0ab2c539c553e 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1307,6 +1307,12 @@ pub struct PhysicalExtensionNode { /// physical expressions #[derive(Clone, PartialEq, ::prost::Message)] pub struct PhysicalExprNode { + /// Unique identifier for this expression to do deduplication during deserialization. + /// When serializing, this is set via `PhysicalExpr::expression_id`. When deserializing, + /// this id is used by the `DeduplicatingProtoConverter` to preserve referential + /// integrity across serde roundtrips for different expressions with the same id. 
+ #[prost(uint64, optional, tag = "30")] + pub expr_id: ::core::option::Option, #[prost( oneof = "physical_expr_node::ExprType", tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 16, 18, 19, 20, 21, 22" @@ -1378,8 +1384,6 @@ pub struct PhysicalDynamicFilterNode { pub inner_expr: ::core::option::Option<::prost::alloc::boxed::Box>, #[prost(bool, tag = "5")] pub is_complete: bool, - #[prost(uint64, tag = "6")] - pub expression_id: u64, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct PhysicalScalarUdfNode { diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 69ab9614dc022..dc58eaeccd305 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -59,7 +59,7 @@ use crate::logical_plan::{self}; use crate::protobuf::physical_expr_node::ExprType; use crate::{convert_required, protobuf}; use datafusion_physical_expr::expressions::{ - DynamicFilterPhysicalExpr, DynamicFilterSnapshot, + DynamicFilterInner, DynamicFilterPhysicalExpr, }; impl From<&protobuf::PhysicalColumn> for Column { @@ -557,19 +557,24 @@ pub fn parse_physical_expr_with_converter( proto_converter, )?; - // Recreate filter from snapshot. expression_id is a required wire - // field — we never generate a fresh id on decode, so multiple - // occurrences of the same dynamic filter retain their shared identity. 
- let snapshot = DynamicFilterSnapshot::new( - dynamic_filter.expression_id, - children, - remapped_children, - dynamic_filter.generation, - inner_expr, - dynamic_filter.is_complete, - ); + let expression_id = proto.expr_id.ok_or_else(|| { + proto_error( + "DynamicFilterPhysicalExpr requires PhysicalExprNode.expr_id \ + to be set by the serializer", + ) + })?; + let base_filter: Arc = - Arc::new(DynamicFilterPhysicalExpr::from(snapshot)); + Arc::new(DynamicFilterPhysicalExpr::from_parts( + expression_id, + children, + remapped_children, + DynamicFilterInner { + generation: dynamic_filter.generation, + expr: inner_expr, + is_complete: dynamic_filter.is_complete, + }, + )); base_filter } ExprType::Extension(extension) => { diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index f807847113f58..40f8178869609 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -3096,6 +3096,7 @@ impl protobuf::PhysicalPlanNode { nulls_first: expr.options.nulls_first, }); Ok(protobuf::PhysicalExprNode { + expr_id: None, expr_type: Some(ExprType::Sort(sort_expr)), }) }) @@ -3181,6 +3182,7 @@ impl protobuf::PhysicalPlanNode { nulls_first: expr.options.nulls_first, }); Ok(protobuf::PhysicalExprNode { + expr_id: None, expr_type: Some(ExprType::Sort(sort_expr)), }) }) @@ -3876,14 +3878,31 @@ impl PhysicalProtoConverterExtension for DeduplicatingDeserializer { where Self: Sized, { + // `expr_id` is the generic identity slot on `PhysicalExprNode`. + // The default serializer populates it from `PhysicalExpr::expression_id` + // A missing id means this expression type doesn't participate in deduping. 
+ let Some(id) = proto.expr_id else { + return parse_physical_expr_with_converter( + proto, + ctx, + input_schema, + codec, + self, + ); + }; + let parsed = parse_physical_expr_with_converter(proto, ctx, input_schema, codec, self)?; - let Some(id) = parsed.expression_id() else { - return Ok(parsed); - }; let mut cache = self.cache.borrow_mut(); if let Some(cached) = cache.get(&id) { + // Since expressions may manage their own internal state when deriving + // expressions via `with_new_children`, we use `with_new_children` + // to opt into the same behavior. + // + // For example, one `DynamicFilterPhysicalExpr` may be derived from + // another resulting in shared references. Using `with_new_children` + // is meant to preserve those references. let children: Vec<_> = parsed.children().into_iter().cloned().collect(); return Arc::clone(cached).with_new_children(children); } diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 4baf2ab8739c7..26309a7c9ce8e 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -35,9 +35,9 @@ use datafusion_physical_expr::ScalarFunctionExpr; use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr}; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_physical_plan::expressions::{ - BinaryExpr, CaseExpr, CastExpr, Column, DynamicFilterPhysicalExpr, - DynamicFilterSnapshot, InListExpr, IsNotNullExpr, IsNullExpr, LikeExpr, Literal, - NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn, + BinaryExpr, CaseExpr, CastExpr, Column, DynamicFilterPhysicalExpr, InListExpr, + IsNotNullExpr, IsNullExpr, LikeExpr, Literal, NegativeExpr, NotExpr, TryCastExpr, + UnKnownColumn, }; use datafusion_physical_plan::joins::{HashExpr, HashTableLookupExpr}; use datafusion_physical_plan::udaf::AggregateFunctionExpr; @@ -71,6 +71,7 @@ pub fn serialize_physical_aggr_expr( let mut buf = 
Vec::new(); codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?; Ok(protobuf::PhysicalExprNode { + expr_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::AggregateExpr( protobuf::PhysicalAggregateExprNode { aggregate_function: Some(physical_aggregate_expr_node::AggregateFunction::UserDefinedAggrFunction(name)), @@ -257,6 +258,7 @@ pub fn serialize_physical_expr_with_converter( proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result { let expr = value.as_ref(); + let expr_id = value.expression_id(); // HashTableLookupExpr is used for dynamic filter pushdown in hash joins. // It contains an Arc (the build-side hash table) which // cannot be serialized - the hash table is a runtime structure built during @@ -277,12 +279,14 @@ pub fn serialize_physical_expr_with_converter( )), }; return Ok(protobuf::PhysicalExprNode { + expr_id, expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(value)), }); } if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { + expr_id, expr_type: Some(protobuf::physical_expr_node::ExprType::Column( protobuf::PhysicalColumn { name: expr.name().to_string(), @@ -292,6 +296,7 @@ pub fn serialize_physical_expr_with_converter( }) } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { + expr_id, expr_type: Some(protobuf::physical_expr_node::ExprType::UnknownColumn( protobuf::UnknownColumn { name: expr.name().to_string(), @@ -333,12 +338,14 @@ pub fn serialize_physical_expr_with_converter( }); Ok(protobuf::PhysicalExprNode { + expr_id, expr_type: Some(protobuf::physical_expr_node::ExprType::BinaryExpr( binary_expr, )), }) } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { + expr_id, expr_type: Some( protobuf::physical_expr_node::ExprType::Case( Box::new( @@ -381,6 +388,7 @@ pub fn serialize_physical_expr_with_converter( }) } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { + expr_id, expr_type: 
Some(protobuf::physical_expr_node::ExprType::NotExpr(Box::new( protobuf::PhysicalNot { expr: Some(Box::new( @@ -391,6 +399,7 @@ pub fn serialize_physical_expr_with_converter( }) } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { + expr_id, expr_type: Some(protobuf::physical_expr_node::ExprType::IsNullExpr( Box::new(protobuf::PhysicalIsNull { expr: Some(Box::new( @@ -401,6 +410,7 @@ pub fn serialize_physical_expr_with_converter( }) } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { + expr_id, expr_type: Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr( Box::new(protobuf::PhysicalIsNotNull { expr: Some(Box::new( @@ -411,6 +421,7 @@ pub fn serialize_physical_expr_with_converter( }) } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { + expr_id, expr_type: Some(protobuf::physical_expr_node::ExprType::InList(Box::new( protobuf::PhysicalInListNode { expr: Some(Box::new( @@ -423,6 +434,7 @@ pub fn serialize_physical_expr_with_converter( }) } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { + expr_id, expr_type: Some(protobuf::physical_expr_node::ExprType::Negative(Box::new( protobuf::PhysicalNegativeNode { expr: Some(Box::new( @@ -433,12 +445,14 @@ pub fn serialize_physical_expr_with_converter( }) } else if let Some(lit) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { + expr_id, expr_type: Some(protobuf::physical_expr_node::ExprType::Literal( lit.value().try_into()?, )), }) } else if let Some(cast) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { + expr_id, expr_type: Some(protobuf::physical_expr_node::ExprType::Cast(Box::new( protobuf::PhysicalCastNode { expr: Some(Box::new( @@ -450,6 +464,7 @@ pub fn serialize_physical_expr_with_converter( }) } else if let Some(cast) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { + expr_id, expr_type: Some(protobuf::physical_expr_node::ExprType::TryCast(Box::new( 
protobuf::PhysicalTryCastNode { expr: Some(Box::new( @@ -463,6 +478,7 @@ pub fn serialize_physical_expr_with_converter( let mut buf = Vec::new(); codec.try_encode_udf(expr.fun(), &mut buf)?; Ok(protobuf::PhysicalExprNode { + expr_id, expr_type: Some(protobuf::physical_expr_node::ExprType::ScalarUdf( protobuf::PhysicalScalarUdfNode { name: expr.name().to_string(), @@ -479,6 +495,7 @@ pub fn serialize_physical_expr_with_converter( }) } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { + expr_id, expr_type: Some(protobuf::physical_expr_node::ExprType::LikeExpr(Box::new( protobuf::PhysicalLikeExprNode { negated: expr.negated(), @@ -494,6 +511,7 @@ pub fn serialize_physical_expr_with_converter( }) } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { + expr_id, expr_type: Some(protobuf::physical_expr_node::ExprType::HashExpr( protobuf::PhysicalHashExprNode { on_columns: serialize_physical_exprs( @@ -507,16 +525,13 @@ pub fn serialize_physical_expr_with_converter( )), }) } else if let Some(df) = expr.downcast_ref::() { - // Capture all state atomically. - let snapshot = DynamicFilterSnapshot::from(df); - - let children = snapshot - .children() + let children = df + .original_children() .iter() .map(|child| proto_converter.physical_expr_to_proto(child, codec)) .collect::>>()?; - let remapped_children = if let Some(remapped) = snapshot.remapped_children() { + let remapped_children = if let Some(remapped) = df.remapped_children() { remapped .iter() .map(|child| proto_converter.physical_expr_to_proto(child, codec)) @@ -525,19 +540,20 @@ pub fn serialize_physical_expr_with_converter( vec![] }; - let inner_expr = Box::new( - proto_converter.physical_expr_to_proto(snapshot.inner_expr(), codec)?, - ); + // Atomic snapshot of inner state. 
+ let inner = df.inner(); + let inner_expr = + Box::new(proto_converter.physical_expr_to_proto(&inner.expr, codec)?); Ok(protobuf::PhysicalExprNode { + expr_id, expr_type: Some(protobuf::physical_expr_node::ExprType::DynamicFilter( Box::new(protobuf::PhysicalDynamicFilterNode { children, remapped_children, - generation: snapshot.generation(), + generation: inner.generation, inner_expr: Some(inner_expr), - is_complete: snapshot.is_complete(), - expression_id: snapshot.expression_id(), + is_complete: inner.is_complete, }), )), }) @@ -551,6 +567,7 @@ pub fn serialize_physical_expr_with_converter( .map(|e| proto_converter.physical_expr_to_proto(e, codec)) .collect::>()?; Ok(protobuf::PhysicalExprNode { + expr_id, expr_type: Some(protobuf::physical_expr_node::ExprType::Extension( protobuf::PhysicalExtensionExprNode { expr: buf, inputs }, )), diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 8ef8fc90287e9..80a9e30f59594 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -128,9 +128,7 @@ use crate::cases::{ CustomUDWF, CustomUDWFNode, MyAggregateUDF, MyAggregateUdfNode, MyRegexUdf, MyRegexUdfNode, }; -use datafusion_physical_expr::expressions::{ - DynamicFilterPhysicalExpr, DynamicFilterSnapshot, -}; +use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr; use datafusion_physical_expr::utils::reassign_expr_columns; /// Perform a serde roundtrip and assert that the string representation of the before and after plans @@ -2707,15 +2705,11 @@ fn make_reassigned_dynamic_filter( Ok((schema, reassigned)) } -/// Extract an expression id from a [`PhysicalExpr`] proto. Only dynamic filter -/// protos carry an expression id. +/// Extract the expression id from a [`PhysicalExpr`] proto. Populated by the +/// default serializer from `PhysicalExpr::expression_id`. 
fn proto_expression_id(expr: &PhysicalExprNode) -> u64 { - match expr.expr_type.as_ref() { - Some( - datafusion_proto::protobuf::physical_expr_node::ExprType::DynamicFilter(df), - ) => df.expression_id, - _ => panic!("expected PhysicalExprNode wrapping a DynamicFilter"), - } + expr.expr_id + .expect("expected PhysicalExprNode.expr_id to be populated") } /// Roundtrip a single physical expression shaped like so: @@ -2877,6 +2871,23 @@ fn assert_dynamic_filter_update_is_visible( Ok(()) } +/// Format the observable state of a `DynamicFilterPhysicalExpr` for +/// equality comparison in roundtrip tests. +fn fmt_dynamic_filter(filter: &DynamicFilterPhysicalExpr) -> String { + let inner = filter.inner(); + format!( + "DynamicFilter {{ children: {:?}, remapped_children: {:?}, expression_id: {}, generation: {}, inner_expr: {:?}, is_complete: {} }}", + filter.original_children(), + filter.remapped_children(), + filter + .expression_id() + .expect("DynamicFilterPhysicalExpr always has an expression_id"), + inner.generation, + inner.expr, + inner.is_complete, + ) +} + /// Assert that the actual dynamic filter snapshot matches the expected one /// after roundtrip. 
fn assert_dynamic_filter_snapshot_matches( @@ -2890,8 +2901,8 @@ fn assert_dynamic_filter_snapshot_matches( .downcast_ref::() .expect("Expected dynamic filter"); - let actual_snapshot = DynamicFilterSnapshot::from(actual_filter).to_string(); - let expected_snapshot = DynamicFilterSnapshot::from(expected_filter).to_string(); + let actual_snapshot = fmt_dynamic_filter(actual_filter); + let expected_snapshot = fmt_dynamic_filter(expected_filter); if expected_snapshot == actual_snapshot { return; } @@ -2902,12 +2913,11 @@ fn assert_dynamic_filter_snapshot_matches( let rewritten = Arc::clone(expected) .with_new_children(expected.children().iter().map(|c| Arc::clone(c)).collect()) .expect("with_new_children on a dynamic filter should not fail"); - let rewritten_snapshot = DynamicFilterSnapshot::from( + let rewritten_snapshot = fmt_dynamic_filter( rewritten .downcast_ref::() .expect("with_new_children returns a DynamicFilterPhysicalExpr"), - ) - .to_string(); + ); assert_eq!(rewritten_snapshot, actual_snapshot); } From f6c34101d59042a48133f73123e35ce0dde1fd22 Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Fri, 24 Apr 2026 19:36:32 +0000 Subject: [PATCH 4/6] address feedback --- .../src/expressions/dynamic_filters.rs | 45 +++++------ .../tests/cases/roundtrip_physical_plan.rs | 75 ++++++------------- 2 files changed, 39 insertions(+), 81 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index c17288927c70a..83293df669fa5 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -961,12 +961,12 @@ mod test { ); } - /// Verifies that reassigning (which triggers `with_new_children`) followed - /// by rebuilding via `from_parts` preserves the filter's observable state - /// (original children, remapped children, expression id, and inner - /// generation/expr/is_complete). 
+ /// Verifies that `from_parts` rebuilds a `DynamicFilterPhysicalExpr` + /// whose observable state (original children, remapped children, + /// expression id, inner generation/expr/is_complete) matches the source + /// filter. #[test] - fn test_current_snapshot_roundtrip() { + fn test_from_parts_preserves_state() { let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); let col_a = col("a", &schema).unwrap(); @@ -981,13 +981,7 @@ mod test { expr as Arc, ); - // Update expression and mark complete - filter - .update(lit(42) as Arc) - .expect("Update should succeed"); - filter.mark_complete(); - - // Change the children of the expr. + // Add remapped children. let reassigned_schema = Arc::new(Schema::new(vec![ Field::new("b", DataType::Int32, false), Field::new("a", DataType::Int32, false), @@ -1001,6 +995,11 @@ mod test { .downcast_ref::() .expect("Expected dynamic filter after reassignment"); + reassigned + .update(lit(42) as Arc) + .expect("Update should succeed"); + reassigned.mark_complete(); + // Capture the parts and reconstruct. let reconstructed = DynamicFilterPhysicalExpr::from_parts( reassigned.expression_id, @@ -1009,7 +1008,6 @@ mod test { reassigned.inner(), ); - // Assert the reconstructed filter matches field-by-field. 
assert_eq!( reassigned.original_children(), reconstructed.original_children(), @@ -1018,21 +1016,12 @@ mod test { reassigned.remapped_children(), reconstructed.remapped_children(), ); - assert_eq!( - reassigned.expression_id().unwrap(), - reconstructed.expression_id().unwrap(), - ); - let reassigned_inner = reassigned.inner(); - let reconstructed_inner = reconstructed.inner(); - assert_eq!(reassigned_inner.generation, reconstructed_inner.generation); - assert_eq!( - reassigned_inner.is_complete, - reconstructed_inner.is_complete - ); - assert_eq!( - format!("{:?}", reassigned_inner.expr), - format!("{:?}", reconstructed_inner.expr), - ); + assert_eq!(reassigned.expression_id(), reconstructed.expression_id()); + let r = reassigned.inner(); + let c = reconstructed.inner(); + assert_eq!(r.generation, c.generation); + assert_eq!(r.is_complete, c.is_complete); + assert_eq!(format!("{:?}", r.expr), format!("{:?}", c.expr)); } #[tokio::test] diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 80a9e30f59594..96e7b79da501c 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -2871,39 +2871,25 @@ fn assert_dynamic_filter_update_is_visible( Ok(()) } -/// Format the observable state of a `DynamicFilterPhysicalExpr` for -/// equality comparison in roundtrip tests. -fn fmt_dynamic_filter(filter: &DynamicFilterPhysicalExpr) -> String { - let inner = filter.inner(); - format!( - "DynamicFilter {{ children: {:?}, remapped_children: {:?}, expression_id: {}, generation: {}, inner_expr: {:?}, is_complete: {} }}", - filter.original_children(), - filter.remapped_children(), - filter - .expression_id() - .expect("DynamicFilterPhysicalExpr always has an expression_id"), - inner.generation, - inner.expr, - inner.is_complete, - ) -} - -/// Assert that the actual dynamic filter snapshot matches the expected one -/// after roundtrip. 
-fn assert_dynamic_filter_snapshot_matches( +/// Assert that two dynamic filters are equal both structurally (Debug output) +/// and by identity (`expression_id`). +/// +fn assert_dynamic_filters_equal( expected: &Arc, actual: &Arc, ) { - let expected_filter = expected - .downcast_ref::() - .expect("Expected dynamic filter"); - let actual_filter = actual - .downcast_ref::() - .expect("Expected dynamic filter"); - - let actual_snapshot = fmt_dynamic_filter(actual_filter); - let expected_snapshot = fmt_dynamic_filter(expected_filter); - if expected_snapshot == actual_snapshot { + // TODO: Debug currently omits `expression_id` so the id has to be checked + // separately here. Once plan nodes like `SortExec` / `AggregateExec` / + // `HashJoinExec` serialize their own dynamic filter, Debug can include + // `expression_id`. + // + // See https://github.com/apache/datafusion/issues/20418 + assert_eq!(expected.expression_id(), actual.expression_id()); + + // Structural. + let expected_dbg = format!("{expected:?}"); + let actual_dbg = format!("{actual:?}"); + if expected_dbg == actual_dbg { return; } @@ -2913,12 +2899,7 @@ fn assert_dynamic_filter_snapshot_matches( let rewritten = Arc::clone(expected) .with_new_children(expected.children().iter().map(|c| Arc::clone(c)).collect()) .expect("with_new_children on a dynamic filter should not fail"); - let rewritten_snapshot = fmt_dynamic_filter( - rewritten - .downcast_ref::() - .expect("with_new_children returns a DynamicFilterPhysicalExpr"), - ); - assert_eq!(rewritten_snapshot, actual_snapshot); + assert_eq!(format!("{rewritten:?}"), actual_dbg); } // Two clones of a dynamic filter expression should be deduped to the exact same expression. @@ -2936,15 +2917,9 @@ fn test_dynamic_filter_roundtrip_dedupe() -> Result<()> { )?; // Assert the filters are not modified during roundtrip. 
- assert_dynamic_filter_snapshot_matches( - &filter_expr_1, - &filter_expr_1_after_roundtrip, - ); - assert_dynamic_filter_snapshot_matches( - &filter_expr_2, - &filter_expr_2_after_roundtrip, - ); - assert_dynamic_filter_snapshot_matches( + assert_dynamic_filters_equal(&filter_expr_1, &filter_expr_1_after_roundtrip); + assert_dynamic_filters_equal(&filter_expr_2, &filter_expr_2_after_roundtrip); + assert_dynamic_filters_equal( &filter_expr_1_after_roundtrip, &filter_expr_2_after_roundtrip, ); @@ -2970,14 +2945,8 @@ fn test_dynamic_filter_plan_roundtrip_dedupe() -> Result<()> { ) = roundtrip_dynamic_filter_plan_pair()?; // Assert the filters are not modified during roundtrip. - assert_dynamic_filter_snapshot_matches( - &filter_expr_1, - &filter_expr_1_after_roundtrip, - ); - assert_dynamic_filter_snapshot_matches( - &filter_expr_2, - &filter_expr_2_after_roundtrip, - ); + assert_dynamic_filters_equal(&filter_expr_1, &filter_expr_1_after_roundtrip); + assert_dynamic_filters_equal(&filter_expr_2, &filter_expr_2_after_roundtrip); // Assert referential integrity. assert_dynamic_filter_update_is_visible( From 09c2d342b0af9deefa74820106f41b34d4ef3978 Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Fri, 24 Apr 2026 19:46:11 +0000 Subject: [PATCH 5/6] mark children apis as unstable --- datafusion/physical-expr/src/expressions/dynamic_filters.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index 83293df669fa5..6b670a8800db3 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -382,12 +382,18 @@ impl DynamicFilterPhysicalExpr { } /// Return the filter's original children (before any remapping). + /// + /// **Warning:** intended only for `datafusion-proto` (de)serialization. + /// Not a stable API. 
pub fn original_children(&self) -> &[Arc] { &self.children } /// Return the filter's remapped children, if any have been set via /// [`PhysicalExpr::with_new_children`]. + /// + /// **Warning:** intended only for `datafusion-proto` (de)serialization. + /// Not a stable API. pub fn remapped_children(&self) -> Option<&[Arc]> { self.remapped_children.as_deref() } From 1e0ec7b06cd3734b30f8fc6c44eef230d1e353d4 Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Fri, 24 Apr 2026 19:36:58 -0400 Subject: [PATCH 6/6] Update datafusion/physical-expr-common/src/physical_expr.rs Co-authored-by: Stu Hood --- datafusion/physical-expr-common/src/physical_expr.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index 311941aedca3d..f03b4f2649981 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -445,7 +445,7 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { /// Return a stable, globally-unique identifier for this [`PhysicalExpr`], if it /// has one. /// - /// This identifier tracks which expressions which are connected (e.g. `DynamicFilterPhysicalExpr` + /// This identifier tracks expressions which are connected (e.g. `DynamicFilterPhysicalExpr` /// where two expressions may be different but store the same mutable inner state). Tracking /// connected expressions helps preserve referential integrity within plan nodes /// during serialization and deserialization.