Skip to content

Commit 23fc7f1

Browse files
wip
1 parent a4d45b5 commit 23fc7f1

8 files changed

Lines changed: 63 additions & 384 deletions

File tree

datafusion/physical-expr-common/src/physical_expr.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,9 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
163163
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>>;
164164

165165
/// Returns a new PhysicalExpr where all children were replaced by new exprs.
166+
///
167+
/// If the implementation returns a [`PhysicalExpr::expression_id`], then
168+
/// the identifier should be preserved by the new expression.
166169
fn with_new_children(
167170
self: Arc<Self>,
168171
children: Vec<Arc<dyn PhysicalExpr>>,
@@ -439,9 +442,19 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
439442
ExpressionPlacement::KeepInPlace
440443
}
441444

442-
/// Returns a process-local identifier used during serde round-trips to
443-
/// preserve referential integrity for special expressions like dynamic
444-
/// filters. Ordinary expressions return `None` by default.
445+
/// Return a stable, globally-unique identifier for this [`PhysicalExpr`], if it
446+
/// has one.
447+
///
448+
/// This identifier tracks which expressions which are connected (e.g. `DynamicFilterPhysicalExpr`
449+
/// where two expressions may be different but store the same mutable inner state). Tracking
450+
/// connected expressions helps preserve referential integrity within plan nodes
451+
/// during serialization and deserialization.
452+
///
453+
/// This id must be preserved across [`PhysicalExpr::with_new_children`] or any other
454+
/// methods which may want to preserve identity.
455+
///
456+
/// Default is `None`: the expression has no identity worth preserving across a
457+
/// serialization boundary.
445458
fn expression_id(&self) -> Option<u64> {
446459
None
447460
}

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 40 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,7 @@
1616
// under the License.
1717

1818
use parking_lot::RwLock;
19-
use std::{
20-
fmt::Display,
21-
hash::Hash,
22-
sync::{
23-
Arc, LazyLock,
24-
atomic::{AtomicU64, Ordering},
25-
},
26-
};
19+
use std::{fmt::Display, hash::Hash, sync::Arc};
2720
use tokio::sync::watch;
2821

2922
use crate::PhysicalExpr;
@@ -63,9 +56,7 @@ impl FilterState {
6356
/// For more background, please also see the [Dynamic Filters: Passing Information Between Operators During Execution for 25x Faster Queries blog]
6457
///
6558
/// [Dynamic Filters: Passing Information Between Operators During Execution for 25x Faster Queries blog]: https://datafusion.apache.org/blog/2025/09/10/dynamic-filters
66-
#[derive(Debug)]
6759
pub struct DynamicFilterPhysicalExpr {
68-
expression_id: u64,
6960
/// The original children of this PhysicalExpr, if any.
7061
/// This is necessary because the dynamic filter may be initialized with a placeholder (e.g. `lit(true)`)
7162
/// and later remapped to the actual expressions that are being filtered.
@@ -74,6 +65,10 @@ pub struct DynamicFilterPhysicalExpr {
7465
/// If any of the children were remapped / modified (e.g. to adjust for projections) we need to keep track of the new children
7566
/// so that when we update `current()` in subsequent iterations we can re-apply the replacements.
7667
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
68+
/// Unique identifier for this dynamic filter.
69+
///
70+
/// Derived filters (ex. via `with_new_children`) should inherit the expression id of the source filter.
71+
expression_id: u64,
7772
/// The source of dynamic filters.
7873
inner: Arc<RwLock<Inner>>,
7974
/// Broadcasts filter state (updates and completion) to all waiters.
@@ -100,9 +95,9 @@ struct Inner {
10095
/// An atomic snapshot of a [`DynamicFilterPhysicalExpr`] used to reconstruct the expression during
10196
/// serialization / deserialization.
10297
pub struct DynamicFilterSnapshot {
103-
expression_id: u64,
10498
children: Vec<Arc<dyn PhysicalExpr>>,
10599
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
100+
expression_id: u64,
106101
// Inner state.
107102
generation: u64,
108103
inner_expr: Arc<dyn PhysicalExpr>,
@@ -119,9 +114,9 @@ impl DynamicFilterSnapshot {
119114
is_complete: bool,
120115
) -> Self {
121116
Self {
122-
expression_id,
123117
children,
124118
remapped_children,
119+
expression_id,
125120
generation,
126121
inner_expr,
127122
is_complete,
@@ -153,14 +148,33 @@ impl DynamicFilterSnapshot {
153148
}
154149
}
155150

151+
// TODO: Include expression_id in debug output.
152+
//
153+
// See https://github.com/apache/datafusion/issues/20418. Currently, plan nodes
154+
// like `HashJoinExec`, `AggregateExec`, `SortExec` do not serialize their
155+
// dynamic filter. This causes round trips to fail on the `expression_id`
156+
// because it is regenerated on deserialization.
157+
impl std::fmt::Debug for DynamicFilterPhysicalExpr {
158+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159+
f.debug_struct("DynamicFilterPhysicalExpr")
160+
.field("children", &self.children)
161+
.field("remapped_children", &self.remapped_children)
162+
.field("inner", &self.inner)
163+
.field("state_watch", &self.state_watch)
164+
.field("data_type", &self.data_type)
165+
.field("nullable", &self.nullable)
166+
.finish()
167+
}
168+
}
169+
156170
impl Display for DynamicFilterSnapshot {
157171
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158172
write!(
159173
f,
160-
"DynamicFilterSnapshot {{ expression_id: {}, children: {:?}, remapped_children: {:?}, generation: {}, inner_expr: {:?}, is_complete: {} }}",
161-
self.expression_id,
174+
"DynamicFilterSnapshot {{ children: {:?}, remapped_children: {:?}, expression_id: {}, generation: {}, inner_expr: {:?}, is_complete: {} }}",
162175
self.children,
163176
self.remapped_children,
177+
self.expression_id,
164178
self.generation,
165179
self.inner_expr,
166180
self.is_complete
@@ -171,9 +185,9 @@ impl Display for DynamicFilterSnapshot {
171185
impl From<DynamicFilterSnapshot> for DynamicFilterPhysicalExpr {
172186
fn from(snapshot: DynamicFilterSnapshot) -> Self {
173187
let DynamicFilterSnapshot {
174-
expression_id,
175188
children,
176189
remapped_children,
190+
expression_id,
177191
generation,
178192
inner_expr,
179193
is_complete,
@@ -187,9 +201,9 @@ impl From<DynamicFilterSnapshot> for DynamicFilterPhysicalExpr {
187201
let (state_watch, _) = watch::channel(state);
188202

189203
Self {
190-
expression_id,
191204
children,
192205
remapped_children,
206+
expression_id,
193207
inner: Arc::new(RwLock::new(Inner {
194208
generation,
195209
expr: inner_expr,
@@ -210,9 +224,9 @@ impl From<&DynamicFilterPhysicalExpr> for DynamicFilterSnapshot {
210224
(inner.generation, Arc::clone(&inner.expr), inner.is_complete)
211225
};
212226
DynamicFilterSnapshot {
213-
expression_id: expr.expression_id,
214227
children: expr.children.clone(),
215228
remapped_children: expr.remapped_children.clone(),
229+
expression_id: expr.expression_id,
216230
generation,
217231
inner_expr,
218232
is_complete,
@@ -270,12 +284,6 @@ impl Display for DynamicFilterPhysicalExpr {
270284
}
271285

272286
impl DynamicFilterPhysicalExpr {
273-
pub fn new_expression_id() -> u64 {
274-
static NEXT_EXPRESSION_ID: LazyLock<AtomicU64> =
275-
LazyLock::new(|| AtomicU64::new(random::<u64>()));
276-
NEXT_EXPRESSION_ID.fetch_add(1, Ordering::Relaxed)
277-
}
278-
279287
/// Create a new [`DynamicFilterPhysicalExpr`]
280288
/// from an initial expression and a list of children.
281289
/// The list of children is provided separately because
@@ -309,9 +317,9 @@ impl DynamicFilterPhysicalExpr {
309317
) -> Self {
310318
let (state_watch, _) = watch::channel(FilterState::InProgress { generation: 1 });
311319
Self {
312-
expression_id: Self::new_expression_id(),
313320
children,
314321
remapped_children: None, // Initially no remapped children
322+
expression_id: Self::new_expression_id(),
315323
inner: Arc::new(RwLock::new(Inner::new(inner))),
316324
state_watch,
317325
data_type: Arc::new(RwLock::new(None)),
@@ -485,6 +493,11 @@ impl DynamicFilterPhysicalExpr {
485493

486494
write!(f, " ]")
487495
}
496+
497+
/// Generate a new expression id for this filter.
498+
pub fn new_expression_id() -> u64 {
499+
random::<u64>()
500+
}
488501
}
489502

490503
impl PhysicalExpr for DynamicFilterPhysicalExpr {
@@ -501,9 +514,11 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
501514
children: Vec<Arc<dyn PhysicalExpr>>,
502515
) -> Result<Arc<dyn PhysicalExpr>> {
503516
Ok(Arc::new(Self {
504-
expression_id: self.expression_id,
505517
children: self.children.clone(),
506518
remapped_children: Some(children),
519+
// Note that we ensure the derived expression linked to `self`
520+
// via the unique identifier.
521+
expression_id: self.expression_id,
507522
inner: Arc::clone(&self.inner),
508523
state_watch: self.state_watch.clone(),
509524
data_type: Arc::clone(&self.data_type),

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -920,21 +920,6 @@ impl SortExec {
920920
))))
921921
}
922922

923-
/// The dynamic filter used by this sort for TopK pushdown, if any.
924-
/// Returns `None` when `fetch` is `None` or the filter has not yet been
925-
/// initialized (e.g. directly after `SortExec::new` before `with_fetch`).
926-
pub fn dynamic_filter(&self) -> Option<Arc<DynamicFilterPhysicalExpr>> {
927-
self.filter.as_ref().map(|f| f.read().expr())
928-
}
929-
930-
/// Attach an explicit dynamic filter, replacing whatever was there.
931-
/// Used by serde roundtrips to restore the filter's identity
932-
/// (`expression_id`) rather than letting `with_fetch` mint a fresh one.
933-
pub fn with_dynamic_filter(mut self, filter: Arc<DynamicFilterPhysicalExpr>) -> Self {
934-
self.filter = Some(Arc::new(RwLock::new(TopKDynamicFilters::new(filter))));
935-
self
936-
}
937-
938923
fn cloned(&self) -> Self {
939924
SortExec {
940925
input: Arc::clone(&self.input),

datafusion/proto/proto/datafusion.proto

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -870,12 +870,12 @@ message PhysicalExtensionNode {
870870
repeated PhysicalPlanNode inputs = 2;
871871
}
872872

873+
// physical expressions
873874
message PhysicalExprNode {
874875
// Was date_time_interval_expr
875876
reserved 17;
876877

877-
// Was expr_id (generic per-expression dedup identifier). Identity is now
878-
// carried only by DynamicFilterPhysicalExpr via PhysicalDynamicFilterNode.expression_id.
878+
// Was expr_id
879879
reserved 30;
880880

881881
oneof ExprType {
@@ -926,8 +926,6 @@ message PhysicalDynamicFilterNode {
926926
uint64 generation = 3;
927927
PhysicalExprNode inner_expr = 4;
928928
bool is_complete = 5;
929-
// Process-local identifier preserved across serde roundtrip so multiple
930-
// occurrences of the same dynamic filter in a plan share inner state.
931929
uint64 expression_id = 6;
932930
}
933931

@@ -1172,10 +1170,6 @@ message HashJoinExecNode {
11721170
JoinFilter filter = 8;
11731171
repeated uint32 projection = 9;
11741172
bool null_aware = 10;
1175-
// Optional dynamic filter pushed down to the probe side.
1176-
// Carries an `expression_id` so shared identity is preserved across a
1177-
// serde roundtrip.
1178-
PhysicalExprNode dynamic_filter = 11;
11791173
}
11801174

11811175
enum StreamPartitionMode {
@@ -1307,10 +1301,6 @@ message AggregateExecNode {
13071301
repeated MaybeFilter filter_expr = 10;
13081302
AggLimit limit = 11;
13091303
bool has_grouping_set = 12;
1310-
// Optional dynamic filter attached to this aggregate (min/max pushdown).
1311-
// Carries an `expression_id` so shared identity is preserved across a
1312-
// serde roundtrip rather than being re-minted by `init_dynamic_filter`.
1313-
PhysicalExprNode dynamic_filter = 13;
13141304
}
13151305

13161306
message GlobalLimitExecNode {
@@ -1332,10 +1322,6 @@ message SortExecNode {
13321322
// Maximum number of highest/lowest rows to fetch; negative means no limit
13331323
int64 fetch = 3;
13341324
bool preserve_partitioning = 4;
1335-
// Optional TopK dynamic filter attached to this sort. Present when `fetch`
1336-
// is set and the filter has been initialized. Carries an `expression_id`
1337-
// so shared identity is preserved across a serde roundtrip.
1338-
PhysicalExprNode dynamic_filter = 5;
13391325
}
13401326

13411327
message SortPreservingMergeExecNode {

0 commit comments

Comments
 (0)