Skip to content

Commit a4d45b5

Browse files
wip
1 parent 244a848 commit a4d45b5

13 files changed

Lines changed: 553 additions & 671 deletions

File tree

datafusion/physical-expr-common/src/physical_expr.rs

Lines changed: 6 additions & 43 deletions
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,7 @@
1818
use std::any::Any;
1919
use std::fmt;
2020
use std::fmt::{Debug, Display, Formatter};
21-
use std::hash::{DefaultHasher, Hash, Hasher};
21+
use std::hash::{Hash, Hasher};
2222
use std::sync::Arc;
2323

2424
use crate::utils::scatter;
@@ -439,51 +439,14 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
439439
ExpressionPlacement::KeepInPlace
440440
}
441441

442-
/// Returns a composite identifier for a [`PhysicalExpr`]. Note that if the expression
443-
/// is dropped, then the returned id is no longer valid.
444-
fn expr_id(self: Arc<Self>, salt: &[u64]) -> Option<PhysicalExprId> {
445-
Some(PhysicalExprId::new(expr_id_from_arc(&self, salt), None))
442+
/// Returns a process-local identifier used during serde round-trips to
443+
/// preserve referential integrity for special expressions like dynamic
444+
/// filters. Ordinary expressions return `None` by default.
445+
fn expression_id(&self) -> Option<u64> {
446+
None
446447
}
447448
}
448449

449-
/// A composite identifier for [`PhysicalExpr`].
450-
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
451-
pub struct PhysicalExprId {
452-
exact: u64,
453-
shallow: Option<u64>,
454-
}
455-
456-
impl PhysicalExprId {
457-
/// Create a new [`PhysicalExprId`]. Both ids must be globally unique within
458-
/// a process.
459-
pub fn new(exact: u64, shallow: Option<u64>) -> Self {
460-
Self { exact, shallow }
461-
}
462-
463-
/// Returns the identifier for the full expression tree, including children.
464-
pub fn exact(&self) -> u64 {
465-
self.exact
466-
}
467-
468-
/// Returns the identifier for just the expression root, ignoring children.
469-
pub fn shallow(&self) -> Option<u64> {
470-
self.shallow
471-
}
472-
}
473-
474-
/// Computes a unique identifier for a type contained within an [`Arc`]. It hashes
475-
/// the [`Arc`] pointer to create a process-local identifier that remains valid
476-
/// only while that allocation is still alive.
477-
pub fn expr_id_from_arc<T: ?Sized>(expr: &Arc<T>, salt: &[u64]) -> u64 {
478-
let mut hasher = DefaultHasher::new();
479-
let ptr = Arc::as_ptr(expr) as *const () as u64;
480-
ptr.hash(&mut hasher);
481-
for &salt in salt {
482-
salt.hash(&mut hasher);
483-
}
484-
hasher.finish()
485-
}
486-
487450
#[deprecated(
488451
since = "50.0.0",
489452
note = "Use `datafusion_expr_common::dyn_eq` instead"

datafusion/physical-expr/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -55,6 +55,7 @@ indexmap = { workspace = true }
5555
itertools = { workspace = true, features = ["use_std"] }
5656
parking_lot = { workspace = true }
5757
petgraph = "0.8.3"
58+
rand = { workspace = true }
5859
recursive = { workspace = true, optional = true }
5960
tokio = { workspace = true }
6061
half = { workspace = true }
@@ -64,7 +65,6 @@ arrow = { workspace = true, features = ["test_utils"] }
6465
criterion = { workspace = true }
6566
datafusion-functions = { workspace = true }
6667
insta = { workspace = true }
67-
rand = { workspace = true }
6868
rstest = { workspace = true }
6969

7070
[[bench]]

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 47 additions & 40 deletions
Original file line number | Diff line number | Diff line change
@@ -16,7 +16,14 @@
1616
// under the License.
1717

1818
use parking_lot::RwLock;
19-
use std::{fmt::Display, hash::Hash, sync::Arc};
19+
use std::{
20+
fmt::Display,
21+
hash::Hash,
22+
sync::{
23+
Arc, LazyLock,
24+
atomic::{AtomicU64, Ordering},
25+
},
26+
};
2027
use tokio::sync::watch;
2128

2229
use crate::PhysicalExpr;
@@ -26,9 +33,8 @@ use datafusion_common::{
2633
tree_node::{Transformed, TransformedResult, TreeNode},
2734
};
2835
use datafusion_expr::ColumnarValue;
29-
use datafusion_physical_expr_common::physical_expr::{
30-
DynHash, PhysicalExprId, expr_id_from_arc,
31-
};
36+
use datafusion_physical_expr_common::physical_expr::DynHash;
37+
use rand::random;
3238

3339
/// State of a dynamic filter, tracking both updates and completion.
3440
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -59,6 +65,7 @@ impl FilterState {
5965
/// [Dynamic Filters: Passing Information Between Operators During Execution for 25x Faster Queries blog]: https://datafusion.apache.org/blog/2025/09/10/dynamic-filters
6066
#[derive(Debug)]
6167
pub struct DynamicFilterPhysicalExpr {
68+
expression_id: u64,
6269
/// The original children of this PhysicalExpr, if any.
6370
/// This is necessary because the dynamic filter may be initialized with a placeholder (e.g. `lit(true)`)
6471
/// and later remapped to the actual expressions that are being filtered.
@@ -93,6 +100,7 @@ struct Inner {
93100
/// An atomic snapshot of a [`DynamicFilterPhysicalExpr`] used to reconstruct the expression during
94101
/// serialization / deserialization.
95102
pub struct DynamicFilterSnapshot {
103+
expression_id: u64,
96104
children: Vec<Arc<dyn PhysicalExpr>>,
97105
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
98106
// Inner state.
@@ -103,13 +111,15 @@ pub struct DynamicFilterSnapshot {
103111

104112
impl DynamicFilterSnapshot {
105113
pub fn new(
114+
expression_id: u64,
106115
children: Vec<Arc<dyn PhysicalExpr>>,
107116
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
108117
generation: u64,
109118
inner_expr: Arc<dyn PhysicalExpr>,
110119
is_complete: bool,
111120
) -> Self {
112121
Self {
122+
expression_id,
113123
children,
114124
remapped_children,
115125
generation,
@@ -122,6 +132,10 @@ impl DynamicFilterSnapshot {
122132
&self.children
123133
}
124134

135+
pub fn expression_id(&self) -> u64 {
136+
self.expression_id
137+
}
138+
125139
pub fn remapped_children(&self) -> Option<&[Arc<dyn PhysicalExpr>]> {
126140
self.remapped_children.as_deref()
127141
}
@@ -143,7 +157,8 @@ impl Display for DynamicFilterSnapshot {
143157
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
144158
write!(
145159
f,
146-
"DynamicFilterSnapshot {{ children: {:?}, remapped_children: {:?}, generation: {}, inner_expr: {:?}, is_complete: {} }}",
160+
"DynamicFilterSnapshot {{ expression_id: {}, children: {:?}, remapped_children: {:?}, generation: {}, inner_expr: {:?}, is_complete: {} }}",
161+
self.expression_id,
147162
self.children,
148163
self.remapped_children,
149164
self.generation,
@@ -156,6 +171,7 @@ impl Display for DynamicFilterSnapshot {
156171
impl From<DynamicFilterSnapshot> for DynamicFilterPhysicalExpr {
157172
fn from(snapshot: DynamicFilterSnapshot) -> Self {
158173
let DynamicFilterSnapshot {
174+
expression_id,
159175
children,
160176
remapped_children,
161177
generation,
@@ -171,6 +187,7 @@ impl From<DynamicFilterSnapshot> for DynamicFilterPhysicalExpr {
171187
let (state_watch, _) = watch::channel(state);
172188

173189
Self {
190+
expression_id,
174191
children,
175192
remapped_children,
176193
inner: Arc::new(RwLock::new(Inner {
@@ -193,6 +210,7 @@ impl From<&DynamicFilterPhysicalExpr> for DynamicFilterSnapshot {
193210
(inner.generation, Arc::clone(&inner.expr), inner.is_complete)
194211
};
195212
DynamicFilterSnapshot {
213+
expression_id: expr.expression_id,
196214
children: expr.children.clone(),
197215
remapped_children: expr.remapped_children.clone(),
198216
generation,
@@ -252,6 +270,12 @@ impl Display for DynamicFilterPhysicalExpr {
252270
}
253271

254272
impl DynamicFilterPhysicalExpr {
273+
pub fn new_expression_id() -> u64 {
274+
static NEXT_EXPRESSION_ID: LazyLock<AtomicU64> =
275+
LazyLock::new(|| AtomicU64::new(random::<u64>()));
276+
NEXT_EXPRESSION_ID.fetch_add(1, Ordering::Relaxed)
277+
}
278+
255279
/// Create a new [`DynamicFilterPhysicalExpr`]
256280
/// from an initial expression and a list of children.
257281
/// The list of children is provided separately because
@@ -285,6 +309,7 @@ impl DynamicFilterPhysicalExpr {
285309
) -> Self {
286310
let (state_watch, _) = watch::channel(FilterState::InProgress { generation: 1 });
287311
Self {
312+
expression_id: Self::new_expression_id(),
288313
children,
289314
remapped_children: None, // Initially no remapped children
290315
inner: Arc::new(RwLock::new(Inner::new(inner))),
@@ -476,6 +501,7 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
476501
children: Vec<Arc<dyn PhysicalExpr>>,
477502
) -> Result<Arc<dyn PhysicalExpr>> {
478503
Ok(Arc::new(Self {
504+
expression_id: self.expression_id,
479505
children: self.children.clone(),
480506
remapped_children: Some(children),
481507
inner: Arc::clone(&self.inner),
@@ -559,13 +585,8 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
559585
self.inner.read().generation
560586
}
561587

562-
fn expr_id(self: Arc<Self>, salt: &[u64]) -> Option<PhysicalExprId> {
563-
Some(PhysicalExprId::new(
564-
// Capture the outer arc, which contains children and the expr.
565-
expr_id_from_arc(&self, salt),
566-
// Capture the inner arc, which contains the expr only.
567-
Some(expr_id_from_arc(&self.inner, salt)),
568-
))
588+
fn expression_id(&self) -> Option<u64> {
589+
Some(self.expression_id)
569590
}
570591
}
571592

@@ -1033,7 +1054,7 @@ mod test {
10331054
}
10341055

10351056
#[tokio::test]
1036-
async fn test_expr_id() {
1057+
async fn test_expression_id() {
10371058
let source_schema =
10381059
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
10391060
let col_a = col("a", &source_schema).unwrap();
@@ -1056,38 +1077,24 @@ mod test {
10561077
)
10571078
.expect("reassign_expr_columns should succeed");
10581079

1059-
let derived_expr_id = Arc::clone(&derived)
1060-
.expr_id(&[])
1061-
.expect("combined filter should have an expr_id");
1062-
let source_expr_id = Arc::clone(&source)
1063-
.expr_id(&[])
1064-
.expect("source filter should have an expr_id");
1065-
let source_clone_expr_id = Arc::clone(&source_clone)
1066-
.expr_id(&[])
1067-
.expect("source clone should have an expr_id");
1080+
let derived_expression_id = derived
1081+
.expression_id()
1082+
.expect("derived filter should have an expression id");
1083+
let source_expression_id = source
1084+
.expression_id()
1085+
.expect("source filter should have an expression id");
1086+
let source_clone_expression_id = source_clone
1087+
.expression_id()
1088+
.expect("source clone should have an expression id");
10681089

10691090
assert_eq!(
1070-
source_clone_expr_id.exact(),
1071-
source_expr_id.exact(),
1072-
"cloned filter should have the same exact id because the children are the same",
1073-
);
1074-
1075-
assert_eq!(
1076-
source_clone_expr_id.shallow(),
1077-
source_expr_id.shallow(),
1078-
"cloned filter should have the same shallow id because the exprs are the same",
1079-
);
1080-
1081-
assert_ne!(
1082-
derived_expr_id.exact(),
1083-
source_expr_id.exact(),
1084-
"filters should have different exact ids because the children are different",
1091+
source_clone_expression_id, source_expression_id,
1092+
"cloned filter should preserve its expression id",
10851093
);
10861094

10871095
assert_eq!(
1088-
derived_expr_id.shallow(),
1089-
source_expr_id.shallow(),
1090-
"filters should have the same shallow id because the exprs are the same",
1096+
derived_expression_id, source_expression_id,
1097+
"derived filters should carry forward the source expression id",
10911098
);
10921099
}
10931100
}

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 24 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1225,6 +1225,30 @@ impl AggregateExec {
12251225
Some(total)
12261226
}
12271227

1228+
/// The dynamic filter carried by this aggregate for cross-operator
1229+
/// pushdown, if one has been initialized. Returns the inner
1230+
/// `DynamicFilterPhysicalExpr` (the identity-bearing piece), not the
1231+
/// private `AggrDynFilter` wrapper.
1232+
pub fn dynamic_filter(&self) -> Option<Arc<DynamicFilterPhysicalExpr>> {
1233+
self.dynamic_filter.as_ref().map(|d| Arc::clone(&d.filter))
1234+
}
1235+
1236+
/// Replace the inner `DynamicFilterPhysicalExpr` while preserving the
1237+
/// auto-built `supported_accumulators_info`. Used by serde roundtrips to
1238+
/// restore the filter's `expression_id` after `try_new` has already minted
1239+
/// a fresh one via `init_dynamic_filter`. If this aggregate has no dynamic filter, `filter` is dropped and `self` is returned unchanged.
1240+
pub fn with_dynamic_filter(mut self, filter: Arc<DynamicFilterPhysicalExpr>) -> Self {
1241+
if let Some(existing) = self.dynamic_filter.take() {
1242+
let supported_accumulators_info =
1243+
existing.supported_accumulators_info.clone();
1244+
self.dynamic_filter = Some(Arc::new(AggrDynFilter {
1245+
filter,
1246+
supported_accumulators_info,
1247+
}));
1248+
}
1249+
self
1250+
}
1251+
12281252
/// Check if dynamic filter is possible for the current plan node.
12291253
/// - If yes, init one inside `AggregateExec`'s `dynamic_filter` field.
12301254
/// - If not supported, `self.dynamic_filter` should be kept `None`

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 23 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -494,6 +494,21 @@ impl HashJoinExecBuilder {
494494
self.exec.dynamic_filter = filter;
495495
self
496496
}
497+
498+
/// Attach an explicit `DynamicFilterPhysicalExpr` to the join, preserving
499+
/// its identity (`expression_id`). Used by serde roundtrips so a filter
500+
/// doesn't get re-minted on decode. A fresh `build_accumulator` is created
501+
/// — that is runtime state and not part of the wire format.
502+
pub fn with_dynamic_filter_expr(
503+
mut self,
504+
filter: Arc<DynamicFilterPhysicalExpr>,
505+
) -> Self {
506+
self.exec.dynamic_filter = Some(HashJoinExecDynamicFilter {
507+
filter,
508+
build_accumulator: OnceLock::new(),
509+
});
510+
self
511+
}
497512
}
498513

499514
impl From<&HashJoinExec> for HashJoinExecBuilder {
@@ -861,6 +876,14 @@ impl HashJoinExec {
861876
true
862877
}
863878

879+
/// The join's dynamic filter for probe-side pushdown, if one has been
880+
/// attached (typically by filter-pushdown optimization). Returns the inner
881+
/// `DynamicFilterPhysicalExpr` — the identity-bearing piece, not the
882+
/// private wrapper.
883+
pub fn dynamic_filter(&self) -> Option<Arc<DynamicFilterPhysicalExpr>> {
884+
self.dynamic_filter.as_ref().map(|d| Arc::clone(&d.filter))
885+
}
886+
864887
/// left (build) side which gets hashed
865888
pub fn left(&self) -> &Arc<dyn ExecutionPlan> {
866889
&self.left

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 15 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -920,6 +920,21 @@ impl SortExec {
920920
))))
921921
}
922922

923+
/// The dynamic filter used by this sort for TopK pushdown, if any.
924+
/// Returns `None` when `fetch` is `None` or the filter has not yet been
925+
/// initialized (e.g. directly after `SortExec::new` before `with_fetch`).
926+
pub fn dynamic_filter(&self) -> Option<Arc<DynamicFilterPhysicalExpr>> {
927+
self.filter.as_ref().map(|f| f.read().expr())
928+
}
929+
930+
/// Attach an explicit dynamic filter, replacing whatever was there.
931+
/// Used by serde roundtrips to restore the filter's identity
932+
/// (`expression_id`) rather than letting `with_fetch` mint a fresh one.
933+
pub fn with_dynamic_filter(mut self, filter: Arc<DynamicFilterPhysicalExpr>) -> Self {
934+
self.filter = Some(Arc::new(RwLock::new(TopKDynamicFilters::new(filter))));
935+
self
936+
}
937+
923938
fn cloned(&self) -> Self {
924939
SortExec {
925940
input: Arc::clone(&self.input),

0 commit comments

Comments
 (0)