Skip to content

Commit 004aa52

Browse files
wip
1 parent a4d45b5 commit 004aa52

9 files changed

Lines changed: 290 additions & 329 deletions

File tree

datafusion/physical-expr-common/src/physical_expr.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,9 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
163163
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>>;
164164

165165
/// Returns a new PhysicalExpr where all children were replaced by new exprs.
166+
///
167+
/// If the implementation returns `Some` from [`PhysicalExpr::expression_id`], then
168+
/// the identifier should be preserved by the new expression.
166169
fn with_new_children(
167170
self: Arc<Self>,
168171
children: Vec<Arc<dyn PhysicalExpr>>,
@@ -439,9 +442,18 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
439442
ExpressionPlacement::KeepInPlace
440443
}
441444

442-
/// Returns a process-local identifier used during serde round-trips to
443-
/// preserve referential integrity for special expressions like dynamic
444-
/// filters. Ordinary expressions return `None` by default.
445+
/// Return a stable, globally-unique identifier for this [`PhysicalExpr`], if it
446+
/// has one.
447+
///
448+
/// This identifier tracks which expressions are connected (e.g. `DynamicFilterPhysicalExpr`
449+
/// where two expressions may be different but store the same mutable inner state). Tracking
450+
/// connected expressions helps preserve referential integrity within plan nodes
451+
/// during serialization and deserialization.
452+
///
453+
/// This id must be preserved across [`PhysicalExpr::with_new_children`].
454+
///
455+
/// Default is `None`: the expression has no identity worth preserving across a
456+
/// serialization boundary.
445457
fn expression_id(&self) -> Option<u64> {
446458
None
447459
}

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 36 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,7 @@
1616
// under the License.
1717

1818
use parking_lot::RwLock;
19-
use std::{
20-
fmt::Display,
21-
hash::Hash,
22-
sync::{
23-
Arc, LazyLock,
24-
atomic::{AtomicU64, Ordering},
25-
},
26-
};
19+
use std::{fmt::Display, hash::Hash, sync::Arc};
2720
use tokio::sync::watch;
2821

2922
use crate::PhysicalExpr;
@@ -63,9 +56,7 @@ impl FilterState {
6356
/// For more background, please also see the [Dynamic Filters: Passing Information Between Operators During Execution for 25x Faster Queries blog]
6457
///
6558
/// [Dynamic Filters: Passing Information Between Operators During Execution for 25x Faster Queries blog]: https://datafusion.apache.org/blog/2025/09/10/dynamic-filters
66-
#[derive(Debug)]
6759
pub struct DynamicFilterPhysicalExpr {
68-
expression_id: u64,
6960
/// The original children of this PhysicalExpr, if any.
7061
/// This is necessary because the dynamic filter may be initialized with a placeholder (e.g. `lit(true)`)
7162
/// and later remapped to the actual expressions that are being filtered.
@@ -74,6 +65,10 @@ pub struct DynamicFilterPhysicalExpr {
7465
/// If any of the children were remapped / modified (e.g. to adjust for projections) we need to keep track of the new children
7566
/// so that when we update `current()` in subsequent iterations we can re-apply the replacements.
7667
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
68+
/// Unique identifier for this dynamic filter.
69+
///
70+
/// Derived filters (e.g. via `with_new_children`) should inherit the expression id of the source filter.
71+
expression_id: u64,
7772
/// The source of dynamic filters.
7873
inner: Arc<RwLock<Inner>>,
7974
/// Broadcasts filter state (updates and completion) to all waiters.
@@ -100,9 +95,9 @@ struct Inner {
10095
/// An atomic snapshot of a [`DynamicFilterPhysicalExpr`] used to reconstruct the expression during
10196
/// serialization / deserialization.
10297
pub struct DynamicFilterSnapshot {
103-
expression_id: u64,
10498
children: Vec<Arc<dyn PhysicalExpr>>,
10599
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
100+
expression_id: u64,
106101
// Inner state.
107102
generation: u64,
108103
inner_expr: Arc<dyn PhysicalExpr>,
@@ -119,9 +114,9 @@ impl DynamicFilterSnapshot {
119114
is_complete: bool,
120115
) -> Self {
121116
Self {
122-
expression_id,
123117
children,
124118
remapped_children,
119+
expression_id,
125120
generation,
126121
inner_expr,
127122
is_complete,
@@ -153,14 +148,33 @@ impl DynamicFilterSnapshot {
153148
}
154149
}
155150

151+
// TODO: once <https://github.com/apache/datafusion/pull/20416> lands and every
152+
// plan node serializes its dynamic filter through proto, drop this manual
153+
// impl and go back to `#[derive(Debug)]`. The id is excluded here so that
154+
// plan nodes which still auto-create a dynamic filter on decode
155+
// (`AggregateExec`, `HashJoinExec`) don't fail plan-equivalence assertions
156+
// purely on the fresh `expression_id`.
157+
impl std::fmt::Debug for DynamicFilterPhysicalExpr {
158+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159+
f.debug_struct("DynamicFilterPhysicalExpr")
160+
.field("children", &self.children)
161+
.field("remapped_children", &self.remapped_children)
162+
.field("inner", &self.inner)
163+
.field("state_watch", &self.state_watch)
164+
.field("data_type", &self.data_type)
165+
.field("nullable", &self.nullable)
166+
.finish()
167+
}
168+
}
169+
156170
impl Display for DynamicFilterSnapshot {
157171
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158172
write!(
159173
f,
160-
"DynamicFilterSnapshot {{ expression_id: {}, children: {:?}, remapped_children: {:?}, generation: {}, inner_expr: {:?}, is_complete: {} }}",
161-
self.expression_id,
174+
"DynamicFilterSnapshot {{ children: {:?}, remapped_children: {:?}, expression_id: {}, generation: {}, inner_expr: {:?}, is_complete: {} }}",
162175
self.children,
163176
self.remapped_children,
177+
self.expression_id,
164178
self.generation,
165179
self.inner_expr,
166180
self.is_complete
@@ -171,9 +185,9 @@ impl Display for DynamicFilterSnapshot {
171185
impl From<DynamicFilterSnapshot> for DynamicFilterPhysicalExpr {
172186
fn from(snapshot: DynamicFilterSnapshot) -> Self {
173187
let DynamicFilterSnapshot {
174-
expression_id,
175188
children,
176189
remapped_children,
190+
expression_id,
177191
generation,
178192
inner_expr,
179193
is_complete,
@@ -187,9 +201,9 @@ impl From<DynamicFilterSnapshot> for DynamicFilterPhysicalExpr {
187201
let (state_watch, _) = watch::channel(state);
188202

189203
Self {
190-
expression_id,
191204
children,
192205
remapped_children,
206+
expression_id,
193207
inner: Arc::new(RwLock::new(Inner {
194208
generation,
195209
expr: inner_expr,
@@ -210,9 +224,9 @@ impl From<&DynamicFilterPhysicalExpr> for DynamicFilterSnapshot {
210224
(inner.generation, Arc::clone(&inner.expr), inner.is_complete)
211225
};
212226
DynamicFilterSnapshot {
213-
expression_id: expr.expression_id,
214227
children: expr.children.clone(),
215228
remapped_children: expr.remapped_children.clone(),
229+
expression_id: expr.expression_id,
216230
generation,
217231
inner_expr,
218232
is_complete,
@@ -271,9 +285,9 @@ impl Display for DynamicFilterPhysicalExpr {
271285

272286
impl DynamicFilterPhysicalExpr {
273287
pub fn new_expression_id() -> u64 {
274-
static NEXT_EXPRESSION_ID: LazyLock<AtomicU64> =
275-
LazyLock::new(|| AtomicU64::new(random::<u64>()));
276-
NEXT_EXPRESSION_ID.fetch_add(1, Ordering::Relaxed)
288+
// Collision probability at ~10⁹ filters is 2.7%; realistic plans
289+
// carry a handful. 2⁻⁶⁴ per pair is not worth an atomic counter.
290+
random::<u64>()
277291
}
278292

279293
/// Create a new [`DynamicFilterPhysicalExpr`]
@@ -309,9 +323,9 @@ impl DynamicFilterPhysicalExpr {
309323
) -> Self {
310324
let (state_watch, _) = watch::channel(FilterState::InProgress { generation: 1 });
311325
Self {
312-
expression_id: Self::new_expression_id(),
313326
children,
314327
remapped_children: None, // Initially no remapped children
328+
expression_id: Self::new_expression_id(),
315329
inner: Arc::new(RwLock::new(Inner::new(inner))),
316330
state_watch,
317331
data_type: Arc::new(RwLock::new(None)),
@@ -501,9 +515,9 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
501515
children: Vec<Arc<dyn PhysicalExpr>>,
502516
) -> Result<Arc<dyn PhysicalExpr>> {
503517
Ok(Arc::new(Self {
504-
expression_id: self.expression_id,
505518
children: self.children.clone(),
506519
remapped_children: Some(children),
520+
expression_id: self.expression_id,
507521
inner: Arc::clone(&self.inner),
508522
state_watch: self.state_watch.clone(),
509523
data_type: Arc::clone(&self.data_type),

0 commit comments

Comments
 (0)