
Commit 948cd09

proto: serialize and dedupe dynamic filters v2 (#21807)
## Which issue does this PR close?

Informs: datafusion-contrib/datafusion-distributed#180
Closes: #20418

## Rationale for this change

Suppose you have a plan with a `HashJoinExec` and a `DataSourceExec`:

```
HashJoinExec(dynamic_filter_1 on a@0)
  (...left side of join)
  ProjectionExec(a := Column("a", source_index))
    DataSourceExec
      ParquetSource(predicate = dynamic_filter_2)
```

You serialize the plan, deserialize it, and execute it. The dynamic filter should still "work", meaning:

1. When you deserialize the plan, both the `HashJoinExec` and the `DataSourceExec` should hold pointers to the same `DynamicFilterPhysicalExpr`.
2. The `DynamicFilterPhysicalExpr` should be updated during execution by the `HashJoinExec`, and the `DataSourceExec` should use it to filter out rows.

This does not happen today for a few reasons, a couple of which this PR aims to address:

1. `DynamicFilterPhysicalExpr` does not survive round-tripping. The internal exprs get inlined (e.g. the filter may be serialized as a `Literal`) because of the `PhysicalExpr::snapshot()` API.
2. Even if `DynamicFilterPhysicalExpr` survived round-tripping, the one pushed down to the `DataSourceExec` often has different children. In that case you have two `DynamicFilterPhysicalExpr`s which do not survive deduping, so referential integrity is lost.

## What changes are included in this PR?

This PR aims to fix those problems by:

1. Removing the `snapshot()` call from the serialization process.
2. Adding protos for `DynamicFilterPhysicalExpr` so it can be serialized and deserialized.
3. Removing `Arc`-based deduplication. We now dedupe on `expression_id` only, and only if the `PhysicalExpr` reports an `expression_id`. After this change, `DynamicFilterPhysicalExpr` is the only expression that reports an `expression_id`.
4. `expression_id` is now just a random u64. Since a given query likely only has a few `DynamicFilterPhysicalExpr` instances, the odds of a collision are very low.
5. There is no need for a `DedupingSerializer` anymore, since the `expression_id` is already stored in the dynamic filter proto itself.

Future work:

1. Serialize dynamic filters in `HashJoinExec`, `AggregateExec` and `SortExec`.
2. Add tests which actually execute plans after deserialization and assert that dynamic filtering is functional.
3. Add proto converters to the `PhysicalExtensionCodec` trait so implementors can utilize the deduping logic.

## Are these changes tested?

- Adds tests which round-trip dynamic filters and assert that referential integrity is maintained.
- Removes tests that cover `Arc`-based deduplication and session id rotation, since those are no longer supported.

## Are there any user-facing changes?

- The default codec no longer calls `snapshot()` on `PhysicalExpr` during serialization. This means that `DynamicFilterPhysicalExpr`s are now serialized and deserialized without snapshotting.
- `PhysicalExpr`s are no longer deduped in general. Only `DynamicFilterPhysicalExpr` is.

---------

Co-authored-by: Dmitrii Blaginin <dmitrii@blaginin.me>
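The id-based deduplication described in the commit message can be pictured with a small standalone sketch. It is not the `datafusion-proto` implementation: `SharedState`, `EncodedFilter`, and `DecodeCache` are hypothetical stand-ins. It only shows how a decoder keyed on `expression_id` could hand the same shared state to every occurrence of a filter.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for the mutable state shared by connected filters.
struct SharedState {
    expression_id: u64,
    generation: u64,
}

/// Hypothetical stand-in for a serialized dynamic filter node.
struct EncodedFilter {
    expression_id: u64,
    generation: u64,
}

/// Hypothetical decoder-side cache keyed by `expression_id`.
#[derive(Default)]
struct DecodeCache {
    seen: HashMap<u64, Arc<RwLock<SharedState>>>,
}

impl DecodeCache {
    /// Return the state already decoded for this id, or build and remember it.
    fn decode(&mut self, node: &EncodedFilter) -> Arc<RwLock<SharedState>> {
        Arc::clone(self.seen.entry(node.expression_id).or_insert_with(|| {
            Arc::new(RwLock::new(SharedState {
                expression_id: node.expression_id,
                generation: node.generation,
            }))
        }))
    }
}

fn main() {
    let mut cache = DecodeCache::default();
    // The same filter appears twice in the encoded plan (join side and scan side).
    let join_side = cache.decode(&EncodedFilter { expression_id: 7, generation: 1 });
    let scan_side = cache.decode(&EncodedFilter { expression_id: 7, generation: 1 });
    let other = cache.decode(&EncodedFilter { expression_id: 8, generation: 1 });

    // Both occurrences of id 7 resolve to the same allocation; id 8 does not.
    assert!(Arc::ptr_eq(&join_side, &scan_side));
    assert!(!Arc::ptr_eq(&join_side, &other));
    assert_eq!(scan_side.read().unwrap().expression_id, 7);

    // An update made through one handle is visible through the other.
    join_side.write().unwrap().generation += 1;
    assert_eq!(scan_side.read().unwrap().generation, 2);
}
```

Keying on an explicit id rather than on `Arc` pointer identity is what lets two structurally different expressions (for example, with different remapped children) still be recognized as the same logical filter.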
1 parent bb86364 commit 948cd09

12 files changed

Lines changed: 860 additions & 522 deletions


Cargo.lock

Lines changed: 0 additions & 1 deletion

datafusion/physical-expr-common/src/physical_expr.rs

Lines changed: 20 additions & 0 deletions
@@ -161,6 +161,9 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
     fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>>;
 
     /// Returns a new PhysicalExpr where all children were replaced by new exprs.
+    ///
+    /// If the implementation returns a [`PhysicalExpr::expression_id`], then
+    /// the identifier should be preserved by the new expression.
     fn with_new_children(
         self: Arc<Self>,
         children: Vec<Arc<dyn PhysicalExpr>>,
@@ -444,6 +447,23 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
     fn placement(&self) -> ExpressionPlacement {
         ExpressionPlacement::KeepInPlace
     }
+
+    /// Return a stable, globally-unique identifier for this [`PhysicalExpr`], if it
+    /// has one.
+    ///
+    /// This identifier tracks which expressions which are connected (e.g. `DynamicFilterPhysicalExpr`
+    /// where two expressions may be different but store the same mutable inner state). Tracking
+    /// connected expressions helps preserve referential integrity within plan nodes
+    /// during serialization and deserialization.
+    ///
+    /// This id must be preserved across [`PhysicalExpr::with_new_children`] or any other
+    /// methods which may want to preserve identity.
+    ///
+    /// Default is `None`: the expression has no identity worth preserving across a
+    /// serialization boundary.
+    fn expression_id(&self) -> Option<u64> {
+        None
+    }
 }
 
 #[deprecated(
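
The contract added to the trait (a default `expression_id()` of `None`, and an id that survives `with_new_children`) can be illustrated with a reduced sketch. The `Expr` trait, `Literal`, and `Filter` below are hypothetical simplifications, not DataFusion types. Only the shape of the two methods mirrors the diff.

```rust
use std::sync::Arc;

/// Hypothetical, much-reduced stand-in for `PhysicalExpr`.
trait Expr {
    fn children(&self) -> Vec<&Arc<dyn Expr>>;

    /// Rebuild the expression with new children.
    fn with_new_children(self: Arc<Self>, children: Vec<Arc<dyn Expr>>) -> Arc<dyn Expr>;

    /// Default: no identity worth preserving.
    fn expression_id(&self) -> Option<u64> {
        None
    }
}

/// An expression without identity; it relies on the default `expression_id`.
struct Literal;

impl Expr for Literal {
    fn children(&self) -> Vec<&Arc<dyn Expr>> {
        vec![]
    }

    fn with_new_children(self: Arc<Self>, _children: Vec<Arc<dyn Expr>>) -> Arc<dyn Expr> {
        self
    }
}

/// An expression with identity; the id must follow it through rewrites.
struct Filter {
    id: u64,
    children: Vec<Arc<dyn Expr>>,
}

impl Expr for Filter {
    fn children(&self) -> Vec<&Arc<dyn Expr>> {
        self.children.iter().collect()
    }

    fn with_new_children(self: Arc<Self>, children: Vec<Arc<dyn Expr>>) -> Arc<dyn Expr> {
        // Carry the id over so the rebuilt node is still the same logical filter.
        Arc::new(Filter { id: self.id, children })
    }

    fn expression_id(&self) -> Option<u64> {
        Some(self.id)
    }
}

fn main() {
    let lit: Arc<dyn Expr> = Arc::new(Literal);
    let filter: Arc<dyn Expr> = Arc::new(Filter { id: 42, children: vec![Arc::clone(&lit)] });

    let rebuilt = Arc::clone(&filter).with_new_children(vec![Arc::clone(&lit), Arc::clone(&lit)]);

    assert_eq!(lit.expression_id(), None);
    assert_eq!(filter.children().len(), 1);
    assert_eq!(rebuilt.children().len(), 2);
    assert_eq!(rebuilt.expression_id(), filter.expression_id());
}
```

Because the method has a default body, existing implementors compile unchanged and only expressions that need identity override it.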

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 249 additions & 5 deletions
@@ -16,6 +16,7 @@
 // under the License.
 
 use parking_lot::RwLock;
+use std::sync::atomic::{AtomicU64, Ordering};
 use std::{fmt::Display, hash::Hash, sync::Arc};
 use tokio::sync::watch;

@@ -76,21 +77,51 @@ pub struct DynamicFilterPhysicalExpr {
     nullable: Arc<RwLock<Option<bool>>>,
 }
 
-#[derive(Debug)]
-struct Inner {
+/// Atomic internal state of a [`DynamicFilterPhysicalExpr`].
+///
+/// `expression_id` lives here because it identifies the actual filter expression `expr`.
+/// Derived `DynamicFilterPhysicalExpr`s (e.g. via [`PhysicalExpr::with_new_children`]) are
+/// the same logical filter and must report the same `expression_id`.
+///
+/// **Warning:** exposed publicly solely so that proto (de)serialization in
+/// `datafusion-proto` can read and rebuild this state. Do not treat this type
+/// or its layout as a stable API.
+#[derive(Clone)]
+pub struct Inner {
+    /// A unique identifier for the expression.
+    pub expression_id: u64,
     /// A counter that gets incremented every time the expression is updated so that we can track changes cheaply.
     /// This is used for [`PhysicalExpr::snapshot_generation`] to have a cheap check for changes.
-    generation: u64,
-    expr: Arc<dyn PhysicalExpr>,
+    pub generation: u64,
+    pub expr: Arc<dyn PhysicalExpr>,
     /// Flag for quick synchronous check if filter is complete.
     /// This is redundant with the watch channel state, but allows us to return immediately
     /// from `wait_complete()` without subscribing if already complete.
-    is_complete: bool,
+    pub is_complete: bool,
+}
+
+// TODO: Include expression_id in Debug output.
+//
+// See https://github.com/apache/datafusion/issues/20418. Currently, plan nodes
+// like `HashJoinExec`, `AggregateExec`, `SortExec` do not serialize their
+// dynamic filter. They auto-create one on decode with a fresh `expression_id`,
+// so a round-trip Debug comparison would diverge purely on the id even when
+// the rest of the state is preserved. Excluding it from Debug keeps those
+// roundtrip equality assertions meaningful until that work lands.
+impl std::fmt::Debug for Inner {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("Inner")
+            .field("generation", &self.generation)
+            .field("expr", &self.expr)
+            .field("is_complete", &self.is_complete)
+            .finish()
+    }
 }
 
 impl Inner {
     fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
         Self {
+            expression_id: EXPR_ID_SOURCE.next(),
             // Start with generation 1 which gives us a different result for [`PhysicalExpr::generation`] than the default 0.
             // This is not currently used anywhere but it seems useful to have this simple distinction.
             generation: 1,
@@ -243,6 +274,8 @@ impl DynamicFilterPhysicalExpr {
         let mut current = self.inner.write();
         let new_generation = current.generation + 1;
         *current = Inner {
+            // Preserve the expression id across updates.
+            expression_id: current.expression_id,
             generation: new_generation,
             expr: new_expr,
             is_complete: current.is_complete,
@@ -346,6 +379,62 @@ impl DynamicFilterPhysicalExpr {
 
         write!(f, " ]")
     }
+
+    /// Return the filter's original children (before any remapping).
+    ///
+    /// **Warning:** intended only for `datafusion-proto` (de)serialization.
+    /// Not a stable API.
+    pub fn original_children(&self) -> &[Arc<dyn PhysicalExpr>] {
+        &self.children
+    }
+
+    /// Return the filter's remapped children, if any have been set via
+    /// [`PhysicalExpr::with_new_children`].
+    ///
+    /// **Warning:** intended only for `datafusion-proto` (de)serialization.
+    /// Not a stable API.
+    pub fn remapped_children(&self) -> Option<&[Arc<dyn PhysicalExpr>]> {
+        self.remapped_children.as_deref()
+    }
+
+    /// Rebuild a `DynamicFilterPhysicalExpr` from its stored parts. Used by
+    /// proto deserialization.
+    ///
+    /// **Warning:** intended only for `datafusion-proto` (de)serialization.
+    /// Not a stable API.
+    pub fn from_parts(
+        children: Vec<Arc<dyn PhysicalExpr>>,
+        remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
+        inner: Inner,
+    ) -> Self {
+        let state = if inner.is_complete {
+            FilterState::Complete {
+                generation: inner.generation,
+            }
+        } else {
+            FilterState::InProgress {
+                generation: inner.generation,
+            }
+        };
+        let (state_watch, _) = watch::channel(state);
+
+        Self {
+            children,
+            remapped_children,
+            inner: Arc::new(RwLock::new(inner)),
+            state_watch,
+            data_type: Arc::new(RwLock::new(None)),
+            nullable: Arc::new(RwLock::new(None)),
+        }
+    }
+
+    /// Return a clone of the atomically-captured `Inner` state.
+    ///
+    /// **Warning:** intended only for `datafusion-proto` (de)serialization.
+    /// Not a stable API.
+    pub fn inner(&self) -> Inner {
+        self.inner.read().clone()
+    }
 }
 
 impl PhysicalExpr for DynamicFilterPhysicalExpr {
@@ -364,6 +453,7 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
         Ok(Arc::new(Self {
             children: self.children.clone(),
             remapped_children: Some(children),
+            // Note: expression_id is preserved
             inner: Arc::clone(&self.inner),
             state_watch: self.state_watch.clone(),
             data_type: Arc::clone(&self.data_type),
@@ -444,8 +534,36 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
         // Return the current generation of the expression.
         self.inner.read().generation
     }
+
+    fn expression_id(&self) -> Option<u64> {
+        Some(self.inner.read().expression_id)
+    }
 }
 
+/// An atomic counter used to generate monotonic u64 ids.
+struct ExpressionIdAtomicCounter {
+    inner: AtomicU64,
+}
+
+impl ExpressionIdAtomicCounter {
+    const fn new() -> Self {
+        Self {
+            inner: AtomicU64::new(0),
+        }
+    }
+
+    /// Returns a fresh `expression_id` by incrementing the internal counter.
+    fn next(&self) -> u64 {
+        self.inner.fetch_add(1, Ordering::Relaxed)
+    }
+}
+
+/// Process-wide source of deterministic `expression_id`s for [`DynamicFilterPhysicalExpr`].
+///
+/// Currently, no other [`PhysicalExpr`]s use this source. If needed, it can be moved out of this
+/// file and be made public for other expressions to use.
+static EXPR_ID_SOURCE: ExpressionIdAtomicCounter = ExpressionIdAtomicCounter::new();
+
 #[cfg(test)]
 mod test {
     use crate::{
@@ -861,4 +979,130 @@ mod test {
             "Hash should be stable after update (identity-based)"
         );
     }
+
+    /// Verifies that `from_parts` rebuilds a `DynamicFilterPhysicalExpr`
+    /// whose observable state (original children, remapped children,
+    /// expression id, inner generation/expr/is_complete) matches the source
+    /// filter.
+    #[test]
+    fn test_from_parts_preserves_state() {
+        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+        let col_a = col("a", &schema).unwrap();
+
+        // Create a dynamic filter with children
+        let expr = Arc::new(BinaryExpr::new(
+            Arc::clone(&col_a),
+            datafusion_expr::Operator::Gt,
+            lit(10) as Arc<dyn PhysicalExpr>,
+        ));
+        let filter = DynamicFilterPhysicalExpr::new(
+            vec![Arc::clone(&col_a)],
+            expr as Arc<dyn PhysicalExpr>,
+        );
+
+        // Add remapped children.
+        let reassigned_schema = Arc::new(Schema::new(vec![
+            Field::new("b", DataType::Int32, false),
+            Field::new("a", DataType::Int32, false),
+        ]));
+        let reassigned = reassign_expr_columns(
+            Arc::new(filter) as Arc<dyn PhysicalExpr>,
+            &reassigned_schema,
+        )
+        .expect("reassign_expr_columns should succeed");
+        let reassigned = reassigned
+            .downcast_ref::<DynamicFilterPhysicalExpr>()
+            .expect("Expected dynamic filter after reassignment");
+
+        reassigned
+            .update(lit(42) as Arc<dyn PhysicalExpr>)
+            .expect("Update should succeed");
+        reassigned.mark_complete();
+
+        // Capture the parts and reconstruct. `expression_id` rides in `inner`.
+        let reconstructed = DynamicFilterPhysicalExpr::from_parts(
+            reassigned.original_children().to_vec(),
+            reassigned.remapped_children().map(|r| r.to_vec()),
+            reassigned.inner(),
+        );
+
+        assert_eq!(
+            reassigned.original_children(),
+            reconstructed.original_children(),
+        );
+        assert_eq!(
+            reassigned.remapped_children(),
+            reconstructed.remapped_children(),
+        );
+        assert_eq!(reassigned.expression_id(), reconstructed.expression_id());
+        let r = reassigned.inner();
+        let c = reconstructed.inner();
+        assert_eq!(r.generation, c.generation);
+        assert_eq!(r.is_complete, c.is_complete);
+        assert_eq!(format!("{:?}", r.expr), format!("{:?}", c.expr));
+    }
+
+    #[tokio::test]
+    async fn test_expression_id() {
+        let source_schema =
+            Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+        let col_a = col("a", &source_schema).unwrap();
+
+        // Create a source filter
+        let source = Arc::new(DynamicFilterPhysicalExpr::new(
+            vec![Arc::clone(&col_a)],
+            lit(true) as Arc<dyn PhysicalExpr>,
+        ));
+        let source_clone = Arc::clone(&source);
+
+        // Create a derived filter by reassigning the source filter to a different schema.
+        let derived_schema = Arc::new(Schema::new(vec![
+            Field::new("x", DataType::Int32, false),
+            Field::new("a", DataType::Int32, false),
+        ]));
+        let derived = reassign_expr_columns(
+            Arc::clone(&source) as Arc<dyn PhysicalExpr>,
+            &derived_schema,
+        )
+        .expect("reassign_expr_columns should succeed");
+
+        let derived_expression_id = derived
+            .expression_id()
+            .expect("derived filter should have an expression id");
+        let source_expression_id = source
+            .expression_id()
+            .expect("source filter should have an expression id");
+        let source_clone_expression_id = source_clone
+            .expression_id()
+            .expect("source clone should have an expression id");
+
+        assert_eq!(
+            source_clone_expression_id, source_expression_id,
+            "cloned filter should preserve its expression id",
+        );
+
+        assert_eq!(
+            derived_expression_id, source_expression_id,
+            "derived filters should carry forward the source expression id",
+        );
+
+        // `update()` rewrites the entire `Inner` struct in place; pin down
+        // that the rewrite preserves `expression_id`.
+        source
+            .update(lit(99) as Arc<dyn PhysicalExpr>)
+            .expect("update should succeed");
+        assert_eq!(
+            source.expression_id().unwrap(),
+            source_expression_id,
+            "update() must not change expression_id",
+        );
+
+        // `mark_complete()` also touches `Inner`; same invariant.
+        source.mark_complete();
+        assert_eq!(
+            source.expression_id().unwrap(),
+            source_expression_id,
+            "mark_complete() must not change expression_id",
+        );
+    }
 }
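
The invariants these tests pin down (ids drawn from a process-wide atomic counter, shared by derived filters, and unchanged by `update()`) can be reproduced in a standalone sketch that needs neither DataFusion nor `parking_lot`. `NEXT_ID`, `Inner`, and `Filter` here are hypothetical simplifications that use `std::sync::RwLock`. `derive()` stands in for `with_new_children`, which in the diff clones the `Arc` around the inner state.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};

/// Hypothetical process-wide id source, analogous to `EXPR_ID_SOURCE`.
static NEXT_ID: AtomicU64 = AtomicU64::new(0);

/// Hypothetical reduced inner state: the id travels with the mutable state.
struct Inner {
    expression_id: u64,
    generation: u64,
}

/// Hypothetical reduced filter: a handle onto shared inner state.
struct Filter {
    inner: Arc<RwLock<Inner>>,
}

impl Filter {
    fn new() -> Self {
        Filter {
            inner: Arc::new(RwLock::new(Inner {
                // fetch_add returns the previous value, so ids never repeat
                // within a process (until the u64 wraps).
                expression_id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
                generation: 1,
            })),
        }
    }

    /// A derived filter shares the same inner state, hence the same id.
    fn derive(&self) -> Self {
        Filter { inner: Arc::clone(&self.inner) }
    }

    /// Replace the whole inner state while carrying the id forward.
    fn update(&self) {
        let mut current = self.inner.write().unwrap();
        *current = Inner {
            expression_id: current.expression_id,
            generation: current.generation + 1,
        };
    }

    fn expression_id(&self) -> u64 {
        self.inner.read().unwrap().expression_id
    }

    fn generation(&self) -> u64 {
        self.inner.read().unwrap().generation
    }
}

fn main() {
    let source = Filter::new();
    let unrelated = Filter::new();
    let derived = source.derive();

    // Separate filters get distinct ids; derived filters share the source id.
    assert_ne!(source.expression_id(), unrelated.expression_id());
    assert_eq!(source.expression_id(), derived.expression_id());

    // Updating through one handle bumps the generation for both, id unchanged.
    let id_before = source.expression_id();
    source.update();
    assert_eq!(source.expression_id(), id_before);
    assert_eq!(derived.generation(), 2);
}
```

`Ordering::Relaxed` is enough in this sketch because the counter only has to hand out distinct values. It is not used to synchronize any other memory.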

datafusion/physical-expr/src/expressions/mod.rs

Lines changed: 1 addition & 1 deletion
@@ -45,7 +45,7 @@ pub use case::{CaseExpr, case};
 pub use cast::{CastExpr, cast};
 pub use column::{Column, col, with_new_schema};
 pub use datafusion_expr::utils::format_state_name;
-pub use dynamic_filters::DynamicFilterPhysicalExpr;
+pub use dynamic_filters::{DynamicFilterPhysicalExpr, Inner as DynamicFilterInner};
 pub use in_list::{InListExpr, in_list};
 pub use is_not_null::{IsNotNullExpr, is_not_null};
 pub use is_null::{IsNullExpr, is_null};

datafusion/proto/Cargo.toml

Lines changed: 0 additions & 1 deletion
@@ -66,7 +66,6 @@ datafusion-proto-common = { workspace = true }
 object_store = { workspace = true }
 pbjson = { workspace = true, optional = true }
 prost = { workspace = true }
-rand = { workspace = true }
 serde = { version = "1.0", optional = true }
 serde_json = { workspace = true, optional = true }
