Skip to content

Commit 99fd030

Browse files
proto: serialize and dedupe dynamic filters
Informs: datafusion-contrib/datafusion-distributed#180 Closes: #20418 Consider you have a plan with a `HashJoinExec` and `DataSourceExec` ``` HashJoinExec(dynamic_filter_1 on a@0) (...left side of join) ProjectionExec(a := Column("a", source_index)) DataSourceExec ParquetSource(predicate = dynamic_filter_2) ``` You serialize the plan, deserialize it, and execute it. What should happen is that the dynamic filter should "work", meaning: 1. When you deserialize the plan, both the `HashJoinExec` and `DataSourceExec` should have pointers to the same `DynamicFilterPhysicalExpr` 2. The `DynamicFilterPhysicalExpr` should be updated during execution by the `HashJoinExec` and the `DataSourceExec` should filter out rows This does not happen today for a few reasons, a couple of which this PR aims to address 1. `DynamicFilterPhysicalExpr` is not survive round-tripping. The internal exprs get inlined (ex. it may be serialized as `Literal`) due to the `PhysicalExpr::snapshot()` API 2. Even if `DynamicFilterPhysicalExpr` survives round-tripping, the one pushed down to the `DataSourceExec` often has different children. In this case, you have two `DynamicFilterPhysicalExpr` which do not survive deduping, causing referential integrity to be lost. This PR aims to fix those problems by: 1. Removing the `snapshot()` call from the serialization process 2. Adding protos for `DynamicFilterPhysicalExpr` so it can be serialized and deserialized 3. Adding a new concept, a `PhysicalExprId`, which has two identifiers, a "shallow" identifier to indicate two equal expressions which may have different children, and an "exact" identifier to indicate two exprs that are exactly the same. 4. Updating the deduping deserializer and protos to now be aware of the new "shallow" id, deduping exprs which are the same but have different children accordingly. This change adds tests which roundtrip dynamic filters and assert that referential integrity is maintained.
1 parent e7f7fa9 commit 99fd030

11 files changed

Lines changed: 875 additions & 558 deletions

File tree

datafusion/physical-expr-common/src/physical_expr.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,9 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
163163
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>>;
164164

165165
/// Returns a new PhysicalExpr where all children were replaced by new exprs.
166+
///
167+
/// If the implementation returns a [`PhysicalExpr::expression_id`], then
168+
/// the identifier should be preserved by the new expression.
166169
fn with_new_children(
167170
self: Arc<Self>,
168171
children: Vec<Arc<dyn PhysicalExpr>>,
@@ -438,6 +441,23 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
438441
fn placement(&self) -> ExpressionPlacement {
439442
ExpressionPlacement::KeepInPlace
440443
}
444+
445+
/// Return a stable, globally-unique identifier for this [`PhysicalExpr`], if it
446+
/// has one.
447+
///
448+
/// This identifier tracks which expressions which are connected (e.g. `DynamicFilterPhysicalExpr`
449+
/// where two expressions may be different but store the same mutable inner state). Tracking
450+
/// connected expressions helps preserve referential integrity within plan nodes
451+
/// during serialization and deserialization.
452+
///
453+
/// This id must be preserved across [`PhysicalExpr::with_new_children`] or any other
454+
/// methods which may want to preserve identity.
455+
///
456+
/// Default is `None`: the expression has no identity worth preserving across a
457+
/// serialization boundary.
458+
fn expression_id(&self) -> Option<u64> {
459+
None
460+
}
441461
}
442462

443463
#[deprecated(

datafusion/physical-expr/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ indexmap = { workspace = true }
5555
itertools = { workspace = true, features = ["use_std"] }
5656
parking_lot = { workspace = true }
5757
petgraph = "0.8.3"
58+
rand = { workspace = true }
5859
recursive = { workspace = true, optional = true }
5960
tokio = { workspace = true }
6061
half = { workspace = true }
@@ -64,7 +65,6 @@ arrow = { workspace = true, features = ["test_utils"] }
6465
criterion = { workspace = true }
6566
datafusion-functions = { workspace = true }
6667
insta = { workspace = true }
67-
rand = { workspace = true }
6868
rstest = { workspace = true }
6969

7070
[[bench]]

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 252 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use datafusion_common::{
2727
};
2828
use datafusion_expr::ColumnarValue;
2929
use datafusion_physical_expr_common::physical_expr::DynHash;
30+
use rand::random;
3031

3132
/// State of a dynamic filter, tracking both updates and completion.
3233
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -55,7 +56,6 @@ impl FilterState {
5556
/// For more background, please also see the [Dynamic Filters: Passing Information Between Operators During Execution for 25x Faster Queries blog]
5657
///
5758
/// [Dynamic Filters: Passing Information Between Operators During Execution for 25x Faster Queries blog]: https://datafusion.apache.org/blog/2025/09/10/dynamic-filters
58-
#[derive(Debug)]
5959
pub struct DynamicFilterPhysicalExpr {
6060
/// The original children of this PhysicalExpr, if any.
6161
/// This is necessary because the dynamic filter may be initialized with a placeholder (e.g. `lit(true)`)
@@ -65,6 +65,10 @@ pub struct DynamicFilterPhysicalExpr {
6565
/// If any of the children were remapped / modified (e.g. to adjust for projections) we need to keep track of the new children
6666
/// so that when we update `current()` in subsequent iterations we can re-apply the replacements.
6767
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
68+
/// Unique identifier for this dynamic filter.
69+
///
70+
/// Derived filters (ex. via `with_new_children`) should inherit the expression id of the source filter.
71+
expression_id: u64,
6872
/// The source of dynamic filters.
6973
inner: Arc<RwLock<Inner>>,
7074
/// Broadcasts filter state (updates and completion) to all waiters.
@@ -88,6 +92,148 @@ struct Inner {
8892
is_complete: bool,
8993
}
9094

95+
/// An atomic snapshot of a [`DynamicFilterPhysicalExpr`] used to reconstruct the expression during
96+
/// serialization / deserialization.
97+
pub struct DynamicFilterSnapshot {
98+
children: Vec<Arc<dyn PhysicalExpr>>,
99+
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
100+
expression_id: u64,
101+
// Inner state.
102+
generation: u64,
103+
inner_expr: Arc<dyn PhysicalExpr>,
104+
is_complete: bool,
105+
}
106+
107+
impl DynamicFilterSnapshot {
108+
pub fn new(
109+
expression_id: u64,
110+
children: Vec<Arc<dyn PhysicalExpr>>,
111+
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
112+
generation: u64,
113+
inner_expr: Arc<dyn PhysicalExpr>,
114+
is_complete: bool,
115+
) -> Self {
116+
Self {
117+
children,
118+
remapped_children,
119+
expression_id,
120+
generation,
121+
inner_expr,
122+
is_complete,
123+
}
124+
}
125+
126+
pub fn children(&self) -> &[Arc<dyn PhysicalExpr>] {
127+
&self.children
128+
}
129+
130+
pub fn expression_id(&self) -> u64 {
131+
self.expression_id
132+
}
133+
134+
pub fn remapped_children(&self) -> Option<&[Arc<dyn PhysicalExpr>]> {
135+
self.remapped_children.as_deref()
136+
}
137+
138+
pub fn generation(&self) -> u64 {
139+
self.generation
140+
}
141+
142+
pub fn inner_expr(&self) -> &Arc<dyn PhysicalExpr> {
143+
&self.inner_expr
144+
}
145+
146+
pub fn is_complete(&self) -> bool {
147+
self.is_complete
148+
}
149+
}
150+
151+
// TODO: Include expression_id in debug output.
152+
//
153+
// See https://github.com/apache/datafusion/issues/20418. Currently, plan nodes
154+
// like `HashJoinExec`, `AggregateExec`, `SortExec` do not serialize their
155+
// dynamic filter. This causes round trips to fail on the `expression_id`
156+
// because it is regenerated on deserialization.
157+
impl std::fmt::Debug for DynamicFilterPhysicalExpr {
158+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159+
f.debug_struct("DynamicFilterPhysicalExpr")
160+
.field("children", &self.children)
161+
.field("remapped_children", &self.remapped_children)
162+
.field("inner", &self.inner)
163+
.field("state_watch", &self.state_watch)
164+
.field("data_type", &self.data_type)
165+
.field("nullable", &self.nullable)
166+
.finish()
167+
}
168+
}
169+
170+
impl Display for DynamicFilterSnapshot {
171+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
172+
write!(
173+
f,
174+
"DynamicFilterSnapshot {{ children: {:?}, remapped_children: {:?}, expression_id: {}, generation: {}, inner_expr: {:?}, is_complete: {} }}",
175+
self.children,
176+
self.remapped_children,
177+
self.expression_id,
178+
self.generation,
179+
self.inner_expr,
180+
self.is_complete
181+
)
182+
}
183+
}
184+
185+
impl From<DynamicFilterSnapshot> for DynamicFilterPhysicalExpr {
186+
fn from(snapshot: DynamicFilterSnapshot) -> Self {
187+
let DynamicFilterSnapshot {
188+
children,
189+
remapped_children,
190+
expression_id,
191+
generation,
192+
inner_expr,
193+
is_complete,
194+
} = snapshot;
195+
196+
let state = if is_complete {
197+
FilterState::Complete { generation }
198+
} else {
199+
FilterState::InProgress { generation }
200+
};
201+
let (state_watch, _) = watch::channel(state);
202+
203+
Self {
204+
children,
205+
remapped_children,
206+
expression_id,
207+
inner: Arc::new(RwLock::new(Inner {
208+
generation,
209+
expr: inner_expr,
210+
is_complete,
211+
})),
212+
state_watch,
213+
data_type: Arc::new(RwLock::new(None)),
214+
nullable: Arc::new(RwLock::new(None)),
215+
}
216+
}
217+
}
218+
219+
impl From<&DynamicFilterPhysicalExpr> for DynamicFilterSnapshot {
220+
fn from(expr: &DynamicFilterPhysicalExpr) -> Self {
221+
// Snapshot everything in the mutex atomically.
222+
let (generation, inner_expr, is_complete) = {
223+
let inner = expr.inner.read();
224+
(inner.generation, Arc::clone(&inner.expr), inner.is_complete)
225+
};
226+
DynamicFilterSnapshot {
227+
children: expr.children.clone(),
228+
remapped_children: expr.remapped_children.clone(),
229+
expression_id: expr.expression_id,
230+
generation,
231+
inner_expr,
232+
is_complete,
233+
}
234+
}
235+
}
236+
91237
impl Inner {
92238
fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
93239
Self {
@@ -173,6 +319,7 @@ impl DynamicFilterPhysicalExpr {
173319
Self {
174320
children,
175321
remapped_children: None, // Initially no remapped children
322+
expression_id: Self::new_expression_id(),
176323
inner: Arc::new(RwLock::new(Inner::new(inner))),
177324
state_watch,
178325
data_type: Arc::new(RwLock::new(None)),
@@ -346,6 +493,11 @@ impl DynamicFilterPhysicalExpr {
346493

347494
write!(f, " ]")
348495
}
496+
497+
/// Generate a new expression id for this filter.
498+
pub fn new_expression_id() -> u64 {
499+
random::<u64>()
500+
}
349501
}
350502

351503
impl PhysicalExpr for DynamicFilterPhysicalExpr {
@@ -364,6 +516,9 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
364516
Ok(Arc::new(Self {
365517
children: self.children.clone(),
366518
remapped_children: Some(children),
519+
// Note that we ensure the derived expression linked to `self`
520+
// via the unique identifier.
521+
expression_id: self.expression_id,
367522
inner: Arc::clone(&self.inner),
368523
state_watch: self.state_watch.clone(),
369524
data_type: Arc::clone(&self.data_type),
@@ -444,6 +599,10 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
444599
// Return the current generation of the expression.
445600
self.inner.read().generation
446601
}
602+
603+
fn expression_id(&self) -> Option<u64> {
604+
Some(self.expression_id)
605+
}
447606
}
448607

449608
#[cfg(test)]
@@ -861,4 +1020,96 @@ mod test {
8611020
"Hash should be stable after update (identity-based)"
8621021
);
8631022
}
1023+
1024+
#[test]
1025+
fn test_current_snapshot_roundtrip() {
1026+
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
1027+
let col_a = col("a", &schema).unwrap();
1028+
1029+
// Create a dynamic filter with children
1030+
let expr = Arc::new(BinaryExpr::new(
1031+
Arc::clone(&col_a),
1032+
datafusion_expr::Operator::Gt,
1033+
lit(10) as Arc<dyn PhysicalExpr>,
1034+
));
1035+
let filter = DynamicFilterPhysicalExpr::new(
1036+
vec![Arc::clone(&col_a)],
1037+
expr as Arc<dyn PhysicalExpr>,
1038+
);
1039+
1040+
// Update expression and mark complete
1041+
filter
1042+
.update(lit(42) as Arc<dyn PhysicalExpr>)
1043+
.expect("Update should succeed");
1044+
filter.mark_complete();
1045+
1046+
// Change the children of the expr.
1047+
let reassigned_schema = Arc::new(Schema::new(vec![
1048+
Field::new("b", DataType::Int32, false),
1049+
Field::new("a", DataType::Int32, false),
1050+
]));
1051+
let reassigned = reassign_expr_columns(
1052+
Arc::new(filter) as Arc<dyn PhysicalExpr>,
1053+
&reassigned_schema,
1054+
)
1055+
.expect("reassign_expr_columns should succeed");
1056+
let reassigned = reassigned
1057+
.downcast_ref::<DynamicFilterPhysicalExpr>()
1058+
.expect("Expected dynamic filter after reassignment");
1059+
1060+
// Take a snapshot and reconstruct
1061+
let snapshot = DynamicFilterSnapshot::from(reassigned);
1062+
let reconstructed = DynamicFilterPhysicalExpr::from(snapshot);
1063+
1064+
// Assert snapshots are equal.
1065+
assert_eq!(
1066+
DynamicFilterSnapshot::from(reassigned).to_string(),
1067+
DynamicFilterSnapshot::from(&reconstructed).to_string(),
1068+
);
1069+
}
1070+
1071+
#[tokio::test]
1072+
async fn test_expression_id() {
1073+
let source_schema =
1074+
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
1075+
let col_a = col("a", &source_schema).unwrap();
1076+
1077+
// Create a source filter
1078+
let source = Arc::new(DynamicFilterPhysicalExpr::new(
1079+
vec![Arc::clone(&col_a)],
1080+
lit(true) as Arc<dyn PhysicalExpr>,
1081+
));
1082+
let source_clone = Arc::clone(&source);
1083+
1084+
// Create a derived filter by reassigning the source filter to a different schema.
1085+
let derived_schema = Arc::new(Schema::new(vec![
1086+
Field::new("x", DataType::Int32, false),
1087+
Field::new("a", DataType::Int32, false),
1088+
]));
1089+
let derived = reassign_expr_columns(
1090+
Arc::clone(&source) as Arc<dyn PhysicalExpr>,
1091+
&derived_schema,
1092+
)
1093+
.expect("reassign_expr_columns should succeed");
1094+
1095+
let derived_expression_id = derived
1096+
.expression_id()
1097+
.expect("derived filter should have an expression id");
1098+
let source_expression_id = source
1099+
.expression_id()
1100+
.expect("source filter should have an expression id");
1101+
let source_clone_expression_id = source_clone
1102+
.expression_id()
1103+
.expect("source clone should have an expression id");
1104+
1105+
assert_eq!(
1106+
source_clone_expression_id, source_expression_id,
1107+
"cloned filter should preserve its expression id",
1108+
);
1109+
1110+
assert_eq!(
1111+
derived_expression_id, source_expression_id,
1112+
"derived filters should carry forward the source expression id",
1113+
);
1114+
}
8641115
}

datafusion/physical-expr/src/expressions/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ pub use case::{CaseExpr, case};
4343
pub use cast::{CastExpr, cast};
4444
pub use column::{Column, col, with_new_schema};
4545
pub use datafusion_expr::utils::format_state_name;
46-
pub use dynamic_filters::DynamicFilterPhysicalExpr;
46+
pub use dynamic_filters::{DynamicFilterPhysicalExpr, DynamicFilterSnapshot};
4747
pub use in_list::{InListExpr, in_list};
4848
pub use is_not_null::{IsNotNullExpr, is_not_null};
4949
pub use is_null::{IsNullExpr, is_null};

0 commit comments

Comments
 (0)