Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions datafusion/physical-expr-common/src/physical_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>>;

/// Returns a new PhysicalExpr where all children were replaced by new exprs.
///
/// If the implementation returns a [`PhysicalExpr::expression_id`], then
/// the identifier should be preserved by the new expression.
Comment on lines +167 to +168
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍🏻

fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn PhysicalExpr>>,
Expand Down Expand Up @@ -438,6 +441,23 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
fn placement(&self) -> ExpressionPlacement {
ExpressionPlacement::KeepInPlace
}

/// Return a stable, globally-unique identifier for this [`PhysicalExpr`], if it
/// has one.
///
/// This identifier tracks which expressions which are connected (e.g. `DynamicFilterPhysicalExpr`
Comment thread
jayshrivastava marked this conversation as resolved.
Outdated
/// where two expressions may be different but store the same mutable inner state). Tracking
/// connected expressions helps preserve referential integrity within plan nodes
/// during serialization and deserialization.
///
/// This id must be preserved across [`PhysicalExpr::with_new_children`] or any other
/// methods which may want to preserve identity.
///
/// Default is `None`: the expression has no identity worth preserving across a
/// serialization boundary.
fn expression_id(&self) -> Option<u64> {
None
}
}

#[deprecated(
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ indexmap = { workspace = true }
itertools = { workspace = true, features = ["use_std"] }
parking_lot = { workspace = true }
petgraph = "0.8.3"
rand = { workspace = true }
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what the conventions are, but: introducing random order to the physical-expr crate seems like a big deal? Is this being used to generate something that could be deterministic from some sort of context instead?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good question! I was thinking about a few alternatives 😄

  • We could hash all the exprs in the DynamicFilterPhysicalExpr to get an id. However, this wouldn't solve the shared Inner state linking problem. We don't want this type of identifier - we want an identifier for the inner state specifically.
  • We could use the Arc address of the Inner struct, but it's a bit of a smell to rely on pointer addresses - for example, IDs derived from Arc pointers are only valid until the Arc is dropped. This is what the old code used and something I used as well in initial versions of this PR

A rand u64 is not bad. Realistically, we just need to not have a collision between distinct dynamic filters in a plan. I figure that the probability of more than 2 or 3 distinct dynamic filters in a query must be very low already. And the probability of 2 or 3 rand u64s colliding is negligible.

Lmk what you think!

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm less worried about collisions, and more worried about non-determinism (causing flaky tests, different plans to be generated randomly, etc). The other annoying aspect of random ids is that they are huge. If you go ahead and actually render this everywhere, a random ID is going to take up a lot more space than one generated on a context (...starting from 0, etc).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now the ids are also random (generated from a mashup of arc pointer address, process id, etc.). So in that sense it's no better or worse. But I do agree something deterministically generated from context would be better.

The only alternative that occurs to me is a process level atomic. Not sure if that might cause some locking, etc. Do you have any other suggestions?

Lastly: we can always change this. As long as there is no API contract on what this number is going to be we could replace it at any point if it becomes a problem.

recursive = { workspace = true, optional = true }
tokio = { workspace = true }
half = { workspace = true }
Expand All @@ -64,7 +65,6 @@ arrow = { workspace = true, features = ["test_utils"] }
criterion = { workspace = true }
datafusion-functions = { workspace = true }
insta = { workspace = true }
rand = { workspace = true }
rstest = { workspace = true }

[[bench]]
Expand Down
225 changes: 219 additions & 6 deletions datafusion/physical-expr/src/expressions/dynamic_filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use datafusion_common::{
};
use datafusion_expr::ColumnarValue;
use datafusion_physical_expr_common::physical_expr::DynHash;
use rand::random;

/// State of a dynamic filter, tracking both updates and completion.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down Expand Up @@ -55,7 +56,6 @@ impl FilterState {
/// For more background, please also see the [Dynamic Filters: Passing Information Between Operators During Execution for 25x Faster Queries blog]
///
/// [Dynamic Filters: Passing Information Between Operators During Execution for 25x Faster Queries blog]: https://datafusion.apache.org/blog/2025/09/10/dynamic-filters
#[derive(Debug)]
pub struct DynamicFilterPhysicalExpr {
/// The original children of this PhysicalExpr, if any.
/// This is necessary because the dynamic filter may be initialized with a placeholder (e.g. `lit(true)`)
Expand All @@ -65,6 +65,10 @@ pub struct DynamicFilterPhysicalExpr {
/// If any of the children were remapped / modified (e.g. to adjust for projections) we need to keep track of the new children
/// so that when we update `current()` in subsequent iterations we can re-apply the replacements.
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
/// Unique identifier for this dynamic filter.
///
/// Derived filters (ex. via `with_new_children`) should inherit the expression id of the source filter.
expression_id: u64,
/// The source of dynamic filters.
inner: Arc<RwLock<Inner>>,
Comment on lines +68 to 73
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice if it were clearer exactly how the Inner relates to the expression_id.

With (mostly) an outsider's perspective, it seems like the expression_id in this case should actually literally be an id for the inner state? So as with #21650 , it feels like this is more like a mutable_state_id, perhaps?

Because you might have two different DynamicFilterPhysicalExpr (different wrapping expressions) wrapped around the same inner state... and at that point, the expression_id is not an "id for the expression".

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because you might have two different DynamicFilterPhysicalExpr (different wrapping expressions) wrapped around the same inner state... and at that point, the expression_id is not an "id for the expression".

I think the inner state is actually the real expression. The only thing that may change via wrapping is the children expressions. Ex. if you call reassign_expr_columns. When you update a dynamic filter, the new expression is written directly into Inner::expr

Copy link
Copy Markdown
Contributor Author

@jayshrivastava jayshrivastava Apr 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With (mostly) an outsider's perspective, it seems like the expression_id in this case should actually literally be an id for the inner state? So as with #21650 , it feels like this is more like a mutable_state_id, perhaps?

We could use the Arc address of Inner, but it's a smell to rely on pointers. The approach in this PR is randomly generate an id for Inner and ensure that this ID propagates when new expressions are derived using the same Inner.

You make a good point though. Do you think it would be more clear if we put the expression_id inside the Inner struct? I feel like that's better

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think it would be more clear if we put the expression_id inside the Inner struct? I feel like that's better

I agree that's cleaner. The con is that you need to acquire the lock just to read it. But maybe that's fine.

/// Broadcasts filter state (updates and completion) to all waiters.
Expand All @@ -76,16 +80,40 @@ pub struct DynamicFilterPhysicalExpr {
nullable: Arc<RwLock<Option<bool>>>,
}

#[derive(Debug)]
struct Inner {
/// Atomic internal state of a [`DynamicFilterPhysicalExpr`].
///
/// **Warning:** exposed publicly solely so that proto (de)serialization in
/// `datafusion-proto` can read and rebuild this state. Do not treat this type
/// or its layout as a stable API.
#[derive(Debug, Clone)]
pub struct Inner {
/// A counter that gets incremented every time the expression is updated so that we can track changes cheaply.
/// This is used for [`PhysicalExpr::snapshot_generation`] to have a cheap check for changes.
generation: u64,
expr: Arc<dyn PhysicalExpr>,
pub generation: u64,
pub expr: Arc<dyn PhysicalExpr>,
/// Flag for quick synchronous check if filter is complete.
/// This is redundant with the watch channel state, but allows us to return immediately
/// from `wait_complete()` without subscribing if already complete.
is_complete: bool,
pub is_complete: bool,
}

// TODO: Include expression_id in debug output.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally even in the DisplayAs representation for plans, I would think. It's actually possibly the most important thing to have in the plan when rendering a dynamic filter?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to keep this PR small, so I hid the expression_id to avoid 1 test that was failing. I have code ready to address this TODO which I plan to publish as a next step after this PR is merged. This PR is just an incremental step.

// See https://github.com/apache/datafusion/issues/20418. Currently, plan nodes
// like `HashJoinExec`, `AggregateExec`,  `SortExec` do not serialize their
// dynamic filter. This causes round trips to fail on the `expression_id`
// because it is regenerated on deserialization.

//
// See https://github.com/apache/datafusion/issues/20418. Currently, plan nodes
// like `HashJoinExec`, `AggregateExec`, `SortExec` do not serialize their
// dynamic filter. This causes round trips to fail on the `expression_id`
// because it is regenerated on deserialization.
impl std::fmt::Debug for DynamicFilterPhysicalExpr {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DynamicFilterPhysicalExpr")
.field("children", &self.children)
.field("remapped_children", &self.remapped_children)
.field("inner", &self.inner)
.field("state_watch", &self.state_watch)
.field("data_type", &self.data_type)
.field("nullable", &self.nullable)
.finish()
}
}

impl Inner {
Expand Down Expand Up @@ -173,6 +201,7 @@ impl DynamicFilterPhysicalExpr {
Self {
children,
remapped_children: None, // Initially no remapped children
expression_id: Self::new_expression_id(),
inner: Arc::new(RwLock::new(Inner::new(inner))),
state_watch,
data_type: Arc::new(RwLock::new(None)),
Expand Down Expand Up @@ -346,6 +375,75 @@ impl DynamicFilterPhysicalExpr {

write!(f, " ]")
}

/// Generate a new expression id for this filter.
fn new_expression_id() -> u64 {
random::<u64>()
}
Comment on lines +379 to +382
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this a public function?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Made it private.


/// Return the filter's original children (before any remapping).
///
/// **Warning:** intended only for `datafusion-proto` (de)serialization.
/// Not a stable API.
pub fn original_children(&self) -> &[Arc<dyn PhysicalExpr>] {
&self.children
}

/// Return the filter's remapped children, if any have been set via
/// [`PhysicalExpr::with_new_children`].
///
/// **Warning:** intended only for `datafusion-proto` (de)serialization.
/// Not a stable API.
pub fn remapped_children(&self) -> Option<&[Arc<dyn PhysicalExpr>]> {
self.remapped_children.as_deref()
}
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two getters aren't horrible to keep around, so I didn't mark them as unstable. Lmk if you think we should add

/// **Warning:** intended only for `datafusion-proto` (de)serialization.
/// Not a stable API

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think defaulting to at least marking things as private via comments is a good defensive approach. The situation with proto serialization is unfortunate, I'll see if I can do something about it...

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


/// Rebuild a `DynamicFilterPhysicalExpr` from its stored parts. Used by
/// proto deserialization to preserve `expression_id` across a roundtrip
/// rather than minting a fresh one.
///
/// **Warning:** intended only for `datafusion-proto` (de)serialization.
/// Not a stable API.
Comment on lines +401 to +406
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be generic via the shared state proposal? Not sure what is up with that. Ditto fn inner.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The shared state discussion seems promising! I'm hoping it will be an easy migration. I imagine that we would end up storing all Inners in the TaskContext instead of in the actual DynamicFilterPhysicalExpr and looking up the Inners via the expression_id. It's still an ongoing discussion though so I can't be sure how the migration will look.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep that would make sense to me

pub fn from_parts(
expression_id: u64,
children: Vec<Arc<dyn PhysicalExpr>>,
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
inner: Inner,
) -> Self {
let state = if inner.is_complete {
FilterState::Complete {
generation: inner.generation,
}
} else {
FilterState::InProgress {
generation: inner.generation,
}
};
let (state_watch, _) = watch::channel(state);

Self {
children,
remapped_children,
expression_id,
inner: Arc::new(RwLock::new(inner)),
state_watch,
data_type: Arc::new(RwLock::new(None)),
nullable: Arc::new(RwLock::new(None)),
}
}
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method looks like a bit of a smell but it gets the job done.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah unfortunately I think this is once again just the shape the current proto stuff forces everything into.


/// Return a clone of the atomically-captured `Inner` state.
///
/// **Warning:** intended only for `datafusion-proto` (de)serialization.
/// Not a stable API.
pub fn inner(&self) -> Inner {
let guard = self.inner.read();
Inner {
generation: guard.generation,
expr: Arc::clone(&guard.expr),
is_complete: guard.is_complete,
}
}
}

impl PhysicalExpr for DynamicFilterPhysicalExpr {
Expand All @@ -364,6 +462,9 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
Ok(Arc::new(Self {
children: self.children.clone(),
remapped_children: Some(children),
// Note that we ensure the derived expression linked to `self`
// via the unique identifier.
expression_id: self.expression_id,
inner: Arc::clone(&self.inner),
state_watch: self.state_watch.clone(),
data_type: Arc::clone(&self.data_type),
Expand Down Expand Up @@ -444,6 +545,10 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
// Return the current generation of the expression.
self.inner.read().generation
}

fn expression_id(&self) -> Option<u64> {
Some(self.expression_id)
}
}

#[cfg(test)]
Expand Down Expand Up @@ -861,4 +966,112 @@ mod test {
"Hash should be stable after update (identity-based)"
);
}

/// Verifies that `from_parts` rebuilds a `DynamicFilterPhysicalExpr`
/// whose observable state (original children, remapped children,
/// expression id, inner generation/expr/is_complete) matches the source
/// filter.
#[test]
fn test_from_parts_preserves_state() {
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
let col_a = col("a", &schema).unwrap();

// Create a dynamic filter with children
let expr = Arc::new(BinaryExpr::new(
Arc::clone(&col_a),
datafusion_expr::Operator::Gt,
lit(10) as Arc<dyn PhysicalExpr>,
));
let filter = DynamicFilterPhysicalExpr::new(
vec![Arc::clone(&col_a)],
expr as Arc<dyn PhysicalExpr>,
);

// Add remapped children.
let reassigned_schema = Arc::new(Schema::new(vec![
Field::new("b", DataType::Int32, false),
Field::new("a", DataType::Int32, false),
]));
let reassigned = reassign_expr_columns(
Arc::new(filter) as Arc<dyn PhysicalExpr>,
&reassigned_schema,
)
.expect("reassign_expr_columns should succeed");
let reassigned = reassigned
.downcast_ref::<DynamicFilterPhysicalExpr>()
.expect("Expected dynamic filter after reassignment");

reassigned
.update(lit(42) as Arc<dyn PhysicalExpr>)
.expect("Update should succeed");
reassigned.mark_complete();

// Capture the parts and reconstruct.
let reconstructed = DynamicFilterPhysicalExpr::from_parts(
reassigned.expression_id,
reassigned.original_children().to_vec(),
reassigned.remapped_children().map(|r| r.to_vec()),
reassigned.inner(),
);

assert_eq!(
reassigned.original_children(),
reconstructed.original_children(),
);
assert_eq!(
reassigned.remapped_children(),
reconstructed.remapped_children(),
);
assert_eq!(reassigned.expression_id(), reconstructed.expression_id());
let r = reassigned.inner();
let c = reconstructed.inner();
assert_eq!(r.generation, c.generation);
assert_eq!(r.is_complete, c.is_complete);
assert_eq!(format!("{:?}", r.expr), format!("{:?}", c.expr));
}

#[tokio::test]
async fn test_expression_id() {
let source_schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
let col_a = col("a", &source_schema).unwrap();

// Create a source filter
let source = Arc::new(DynamicFilterPhysicalExpr::new(
vec![Arc::clone(&col_a)],
lit(true) as Arc<dyn PhysicalExpr>,
));
let source_clone = Arc::clone(&source);

// Create a derived filter by reassigning the source filter to a different schema.
let derived_schema = Arc::new(Schema::new(vec![
Field::new("x", DataType::Int32, false),
Field::new("a", DataType::Int32, false),
]));
let derived = reassign_expr_columns(
Arc::clone(&source) as Arc<dyn PhysicalExpr>,
&derived_schema,
)
.expect("reassign_expr_columns should succeed");

let derived_expression_id = derived
.expression_id()
.expect("derived filter should have an expression id");
let source_expression_id = source
.expression_id()
.expect("source filter should have an expression id");
let source_clone_expression_id = source_clone
.expression_id()
.expect("source clone should have an expression id");

assert_eq!(
source_clone_expression_id, source_expression_id,
"cloned filter should preserve its expression id",
);

assert_eq!(
derived_expression_id, source_expression_id,
"derived filters should carry forward the source expression id",
);
}
}
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub use case::{CaseExpr, case};
pub use cast::{CastExpr, cast};
pub use column::{Column, col, with_new_schema};
pub use datafusion_expr::utils::format_state_name;
pub use dynamic_filters::DynamicFilterPhysicalExpr;
pub use dynamic_filters::{DynamicFilterPhysicalExpr, Inner as DynamicFilterInner};
pub use in_list::{InListExpr, in_list};
pub use is_not_null::{IsNotNullExpr, is_not_null};
pub use is_null::{IsNullExpr, is_null};
Expand Down
1 change: 0 additions & 1 deletion datafusion/proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ datafusion-proto-common = { workspace = true }
object_store = { workspace = true }
pbjson = { workspace = true, optional = true }
prost = { workspace = true }
rand = { workspace = true }
serde = { version = "1.0", optional = true }
serde_json = { workspace = true, optional = true }

Expand Down
Loading
Loading