Skip to content

Commit c042c08

Browse files
proto: serialize and dedupe dynamic filters
Informs: datafusion-contrib/datafusion-distributed#180 Closes: #20418 Consider you have a plan with a `HashJoinExec` and `DataSourceExec` ``` HashJoinExec(dynamic_filter_1 on a@0) (...left side of join) ProjectionExec(a := Column("a", source_index)) DataSourceExec ParquetSource(predicate = dynamic_filter_2) ``` You serialize the plan, deserialize it, and execute it. What should happen is that the dynamic filter should "work", meaning: 1. When you deserialize the plan, both the `HashJoinExec` and `DataSourceExec` should have pointers to the same `DynamicFilterPhysicalExpr` 2. The `DynamicFilterPhysicalExpr` should be updated during execution by the `HashJoinExec` and the `DataSourceExec` should filter out rows This does not happen today for a few reasons, a couple of which this PR aims to address 1. `DynamicFilterPhysicalExpr` is not survive round-tripping. The internal exprs get inlined (ex. it may be serialized as `Literal`) due to the `PhysicalExpr::snapshot()` API 2. Even if `DynamicFilterPhysicalExpr` survives round-tripping, the one pushed down to the `DataSourceExec` often has different children. In this case, you have two `DynamicFilterPhysicalExpr` which do not survive deduping, causing referential integrity to be lost. This PR aims to fix those problems by: 1. Removing the `snapshot()` call from the serialization process 2. Adding protos for `DynamicFilterPhysicalExpr` so it can be serialized and deserialized 3. Removing `Arc`-based deduplication. We now only dedupe on `expression_id` if the `PhysicalExpr` reports a `expression_id`. After this change, only `DynamicFilterPhysicalExpr` reports an `expression_id` to be deduped. 4. `expression_id` is now just a random u64. Since a given query likely only has a few `DynamicFilterPhysicalExpr` instances, the odds of a collision are very low 5. There's no need for a `DedupingSerializer` anymore since the `expression_id` is already stored in the dynamic filter proto itself Testing - adds tests which roundtrip dynamic filters and assert that referential integrity is maintained - removes tests that test `Arc`-based deduplication and session id rotation
1 parent e7f7fa9 commit c042c08

13 files changed

Lines changed: 875 additions & 560 deletions

File tree

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/physical-expr-common/src/physical_expr.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,9 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
163163
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>>;
164164

165165
/// Returns a new PhysicalExpr where all children were replaced by new exprs.
166+
///
167+
/// If the implementation returns a [`PhysicalExpr::expression_id`], then
168+
/// the identifier should be preserved by the new expression.
166169
fn with_new_children(
167170
self: Arc<Self>,
168171
children: Vec<Arc<dyn PhysicalExpr>>,
@@ -438,6 +441,23 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
438441
fn placement(&self) -> ExpressionPlacement {
439442
ExpressionPlacement::KeepInPlace
440443
}
444+
445+
/// Return a stable, globally-unique identifier for this [`PhysicalExpr`], if it
446+
/// has one.
447+
///
448+
/// This identifier tracks which expressions which are connected (e.g. `DynamicFilterPhysicalExpr`
449+
/// where two expressions may be different but store the same mutable inner state). Tracking
450+
/// connected expressions helps preserve referential integrity within plan nodes
451+
/// during serialization and deserialization.
452+
///
453+
/// This id must be preserved across [`PhysicalExpr::with_new_children`] or any other
454+
/// methods which may want to preserve identity.
455+
///
456+
/// Default is `None`: the expression has no identity worth preserving across a
457+
/// serialization boundary.
458+
fn expression_id(&self) -> Option<u64> {
459+
None
460+
}
441461
}
442462

443463
#[deprecated(

datafusion/physical-expr/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ indexmap = { workspace = true }
5555
itertools = { workspace = true, features = ["use_std"] }
5656
parking_lot = { workspace = true }
5757
petgraph = "0.8.3"
58+
rand = { workspace = true }
5859
recursive = { workspace = true, optional = true }
5960
tokio = { workspace = true }
6061
half = { workspace = true }
@@ -64,7 +65,6 @@ arrow = { workspace = true, features = ["test_utils"] }
6465
criterion = { workspace = true }
6566
datafusion-functions = { workspace = true }
6667
insta = { workspace = true }
67-
rand = { workspace = true }
6868
rstest = { workspace = true }
6969

7070
[[bench]]

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 252 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use datafusion_common::{
2727
};
2828
use datafusion_expr::ColumnarValue;
2929
use datafusion_physical_expr_common::physical_expr::DynHash;
30+
use rand::random;
3031

3132
/// State of a dynamic filter, tracking both updates and completion.
3233
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -55,7 +56,6 @@ impl FilterState {
5556
/// For more background, please also see the [Dynamic Filters: Passing Information Between Operators During Execution for 25x Faster Queries blog]
5657
///
5758
/// [Dynamic Filters: Passing Information Between Operators During Execution for 25x Faster Queries blog]: https://datafusion.apache.org/blog/2025/09/10/dynamic-filters
58-
#[derive(Debug)]
5959
pub struct DynamicFilterPhysicalExpr {
6060
/// The original children of this PhysicalExpr, if any.
6161
/// This is necessary because the dynamic filter may be initialized with a placeholder (e.g. `lit(true)`)
@@ -65,6 +65,10 @@ pub struct DynamicFilterPhysicalExpr {
6565
/// If any of the children were remapped / modified (e.g. to adjust for projections) we need to keep track of the new children
6666
/// so that when we update `current()` in subsequent iterations we can re-apply the replacements.
6767
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
68+
/// Unique identifier for this dynamic filter.
69+
///
70+
/// Derived filters (ex. via `with_new_children`) should inherit the expression id of the source filter.
71+
expression_id: u64,
6872
/// The source of dynamic filters.
6973
inner: Arc<RwLock<Inner>>,
7074
/// Broadcasts filter state (updates and completion) to all waiters.
@@ -88,6 +92,148 @@ struct Inner {
8892
is_complete: bool,
8993
}
9094

95+
/// An atomic snapshot of a [`DynamicFilterPhysicalExpr`] used to reconstruct the expression during
96+
/// serialization / deserialization.
97+
pub struct DynamicFilterSnapshot {
98+
children: Vec<Arc<dyn PhysicalExpr>>,
99+
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
100+
expression_id: u64,
101+
// Inner state.
102+
generation: u64,
103+
inner_expr: Arc<dyn PhysicalExpr>,
104+
is_complete: bool,
105+
}
106+
107+
impl DynamicFilterSnapshot {
108+
pub fn new(
109+
expression_id: u64,
110+
children: Vec<Arc<dyn PhysicalExpr>>,
111+
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
112+
generation: u64,
113+
inner_expr: Arc<dyn PhysicalExpr>,
114+
is_complete: bool,
115+
) -> Self {
116+
Self {
117+
children,
118+
remapped_children,
119+
expression_id,
120+
generation,
121+
inner_expr,
122+
is_complete,
123+
}
124+
}
125+
126+
pub fn children(&self) -> &[Arc<dyn PhysicalExpr>] {
127+
&self.children
128+
}
129+
130+
pub fn expression_id(&self) -> u64 {
131+
self.expression_id
132+
}
133+
134+
pub fn remapped_children(&self) -> Option<&[Arc<dyn PhysicalExpr>]> {
135+
self.remapped_children.as_deref()
136+
}
137+
138+
pub fn generation(&self) -> u64 {
139+
self.generation
140+
}
141+
142+
pub fn inner_expr(&self) -> &Arc<dyn PhysicalExpr> {
143+
&self.inner_expr
144+
}
145+
146+
pub fn is_complete(&self) -> bool {
147+
self.is_complete
148+
}
149+
}
150+
151+
// TODO: Include expression_id in debug output.
152+
//
153+
// See https://github.com/apache/datafusion/issues/20418. Currently, plan nodes
154+
// like `HashJoinExec`, `AggregateExec`, `SortExec` do not serialize their
155+
// dynamic filter. This causes round trips to fail on the `expression_id`
156+
// because it is regenerated on deserialization.
157+
impl std::fmt::Debug for DynamicFilterPhysicalExpr {
158+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159+
f.debug_struct("DynamicFilterPhysicalExpr")
160+
.field("children", &self.children)
161+
.field("remapped_children", &self.remapped_children)
162+
.field("inner", &self.inner)
163+
.field("state_watch", &self.state_watch)
164+
.field("data_type", &self.data_type)
165+
.field("nullable", &self.nullable)
166+
.finish()
167+
}
168+
}
169+
170+
impl Display for DynamicFilterSnapshot {
171+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
172+
write!(
173+
f,
174+
"DynamicFilterSnapshot {{ children: {:?}, remapped_children: {:?}, expression_id: {}, generation: {}, inner_expr: {:?}, is_complete: {} }}",
175+
self.children,
176+
self.remapped_children,
177+
self.expression_id,
178+
self.generation,
179+
self.inner_expr,
180+
self.is_complete
181+
)
182+
}
183+
}
184+
185+
impl From<DynamicFilterSnapshot> for DynamicFilterPhysicalExpr {
186+
fn from(snapshot: DynamicFilterSnapshot) -> Self {
187+
let DynamicFilterSnapshot {
188+
children,
189+
remapped_children,
190+
expression_id,
191+
generation,
192+
inner_expr,
193+
is_complete,
194+
} = snapshot;
195+
196+
let state = if is_complete {
197+
FilterState::Complete { generation }
198+
} else {
199+
FilterState::InProgress { generation }
200+
};
201+
let (state_watch, _) = watch::channel(state);
202+
203+
Self {
204+
children,
205+
remapped_children,
206+
expression_id,
207+
inner: Arc::new(RwLock::new(Inner {
208+
generation,
209+
expr: inner_expr,
210+
is_complete,
211+
})),
212+
state_watch,
213+
data_type: Arc::new(RwLock::new(None)),
214+
nullable: Arc::new(RwLock::new(None)),
215+
}
216+
}
217+
}
218+
219+
impl From<&DynamicFilterPhysicalExpr> for DynamicFilterSnapshot {
220+
fn from(expr: &DynamicFilterPhysicalExpr) -> Self {
221+
// Snapshot everything in the mutex atomically.
222+
let (generation, inner_expr, is_complete) = {
223+
let inner = expr.inner.read();
224+
(inner.generation, Arc::clone(&inner.expr), inner.is_complete)
225+
};
226+
DynamicFilterSnapshot {
227+
children: expr.children.clone(),
228+
remapped_children: expr.remapped_children.clone(),
229+
expression_id: expr.expression_id,
230+
generation,
231+
inner_expr,
232+
is_complete,
233+
}
234+
}
235+
}
236+
91237
impl Inner {
92238
fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
93239
Self {
@@ -173,6 +319,7 @@ impl DynamicFilterPhysicalExpr {
173319
Self {
174320
children,
175321
remapped_children: None, // Initially no remapped children
322+
expression_id: Self::new_expression_id(),
176323
inner: Arc::new(RwLock::new(Inner::new(inner))),
177324
state_watch,
178325
data_type: Arc::new(RwLock::new(None)),
@@ -346,6 +493,11 @@ impl DynamicFilterPhysicalExpr {
346493

347494
write!(f, " ]")
348495
}
496+
497+
/// Generate a new expression id for this filter.
498+
pub fn new_expression_id() -> u64 {
499+
random::<u64>()
500+
}
349501
}
350502

351503
impl PhysicalExpr for DynamicFilterPhysicalExpr {
@@ -364,6 +516,9 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
364516
Ok(Arc::new(Self {
365517
children: self.children.clone(),
366518
remapped_children: Some(children),
519+
// Note that we ensure the derived expression linked to `self`
520+
// via the unique identifier.
521+
expression_id: self.expression_id,
367522
inner: Arc::clone(&self.inner),
368523
state_watch: self.state_watch.clone(),
369524
data_type: Arc::clone(&self.data_type),
@@ -444,6 +599,10 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
444599
// Return the current generation of the expression.
445600
self.inner.read().generation
446601
}
602+
603+
fn expression_id(&self) -> Option<u64> {
604+
Some(self.expression_id)
605+
}
447606
}
448607

449608
#[cfg(test)]
@@ -861,4 +1020,96 @@ mod test {
8611020
"Hash should be stable after update (identity-based)"
8621021
);
8631022
}
1023+
1024+
#[test]
1025+
fn test_current_snapshot_roundtrip() {
1026+
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
1027+
let col_a = col("a", &schema).unwrap();
1028+
1029+
// Create a dynamic filter with children
1030+
let expr = Arc::new(BinaryExpr::new(
1031+
Arc::clone(&col_a),
1032+
datafusion_expr::Operator::Gt,
1033+
lit(10) as Arc<dyn PhysicalExpr>,
1034+
));
1035+
let filter = DynamicFilterPhysicalExpr::new(
1036+
vec![Arc::clone(&col_a)],
1037+
expr as Arc<dyn PhysicalExpr>,
1038+
);
1039+
1040+
// Update expression and mark complete
1041+
filter
1042+
.update(lit(42) as Arc<dyn PhysicalExpr>)
1043+
.expect("Update should succeed");
1044+
filter.mark_complete();
1045+
1046+
// Change the children of the expr.
1047+
let reassigned_schema = Arc::new(Schema::new(vec![
1048+
Field::new("b", DataType::Int32, false),
1049+
Field::new("a", DataType::Int32, false),
1050+
]));
1051+
let reassigned = reassign_expr_columns(
1052+
Arc::new(filter) as Arc<dyn PhysicalExpr>,
1053+
&reassigned_schema,
1054+
)
1055+
.expect("reassign_expr_columns should succeed");
1056+
let reassigned = reassigned
1057+
.downcast_ref::<DynamicFilterPhysicalExpr>()
1058+
.expect("Expected dynamic filter after reassignment");
1059+
1060+
// Take a snapshot and reconstruct
1061+
let snapshot = DynamicFilterSnapshot::from(reassigned);
1062+
let reconstructed = DynamicFilterPhysicalExpr::from(snapshot);
1063+
1064+
// Assert snapshots are equal.
1065+
assert_eq!(
1066+
DynamicFilterSnapshot::from(reassigned).to_string(),
1067+
DynamicFilterSnapshot::from(&reconstructed).to_string(),
1068+
);
1069+
}
1070+
1071+
#[tokio::test]
1072+
async fn test_expression_id() {
1073+
let source_schema =
1074+
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
1075+
let col_a = col("a", &source_schema).unwrap();
1076+
1077+
// Create a source filter
1078+
let source = Arc::new(DynamicFilterPhysicalExpr::new(
1079+
vec![Arc::clone(&col_a)],
1080+
lit(true) as Arc<dyn PhysicalExpr>,
1081+
));
1082+
let source_clone = Arc::clone(&source);
1083+
1084+
// Create a derived filter by reassigning the source filter to a different schema.
1085+
let derived_schema = Arc::new(Schema::new(vec![
1086+
Field::new("x", DataType::Int32, false),
1087+
Field::new("a", DataType::Int32, false),
1088+
]));
1089+
let derived = reassign_expr_columns(
1090+
Arc::clone(&source) as Arc<dyn PhysicalExpr>,
1091+
&derived_schema,
1092+
)
1093+
.expect("reassign_expr_columns should succeed");
1094+
1095+
let derived_expression_id = derived
1096+
.expression_id()
1097+
.expect("derived filter should have an expression id");
1098+
let source_expression_id = source
1099+
.expression_id()
1100+
.expect("source filter should have an expression id");
1101+
let source_clone_expression_id = source_clone
1102+
.expression_id()
1103+
.expect("source clone should have an expression id");
1104+
1105+
assert_eq!(
1106+
source_clone_expression_id, source_expression_id,
1107+
"cloned filter should preserve its expression id",
1108+
);
1109+
1110+
assert_eq!(
1111+
derived_expression_id, source_expression_id,
1112+
"derived filters should carry forward the source expression id",
1113+
);
1114+
}
8641115
}

datafusion/physical-expr/src/expressions/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ pub use case::{CaseExpr, case};
4343
pub use cast::{CastExpr, cast};
4444
pub use column::{Column, col, with_new_schema};
4545
pub use datafusion_expr::utils::format_state_name;
46-
pub use dynamic_filters::DynamicFilterPhysicalExpr;
46+
pub use dynamic_filters::{DynamicFilterPhysicalExpr, DynamicFilterSnapshot};
4747
pub use in_list::{InListExpr, in_list};
4848
pub use is_not_null::{IsNotNullExpr, is_not_null};
4949
pub use is_null::{IsNullExpr, is_null};

datafusion/proto/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ datafusion-proto-common = { workspace = true }
6666
object_store = { workspace = true }
6767
pbjson = { workspace = true, optional = true }
6868
prost = { workspace = true }
69-
rand = { workspace = true }
7069
serde = { version = "1.0", optional = true }
7170
serde_json = { workspace = true, optional = true }
7271

0 commit comments

Comments
 (0)