Skip to content

Commit 5c92944

Browse files
address feedback
1 parent 6311fa3 commit 5c92944

9 files changed

Lines changed: 234 additions & 210 deletions

File tree

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 102 additions & 135 deletions
Original file line numberDiff line numberDiff line change
@@ -80,72 +80,21 @@ pub struct DynamicFilterPhysicalExpr {
8080
nullable: Arc<RwLock<Option<bool>>>,
8181
}
8282

83-
#[derive(Debug)]
84-
struct Inner {
83+
/// Atomic internal state of a [`DynamicFilterPhysicalExpr`].
84+
///
85+
/// **Warning:** exposed publicly solely so that proto (de)serialization in
86+
/// `datafusion-proto` can read and rebuild this state. Do not treat this type
87+
/// or its layout as a stable API.
88+
#[derive(Debug, Clone)]
89+
pub struct Inner {
8590
/// A counter that gets incremented every time the expression is updated so that we can track changes cheaply.
8691
/// This is used for [`PhysicalExpr::snapshot_generation`] to have a cheap check for changes.
87-
generation: u64,
88-
expr: Arc<dyn PhysicalExpr>,
92+
pub generation: u64,
93+
pub expr: Arc<dyn PhysicalExpr>,
8994
/// Flag for a quick synchronous check of whether the filter is complete.
9095
/// This is redundant with the watch channel state, but allows us to return immediately
9196
/// from `wait_complete()` without subscribing if already complete.
92-
is_complete: bool,
93-
}
94-
95-
/// An atomic snapshot of a [`DynamicFilterPhysicalExpr`] used to reconstruct the expression during
96-
/// serialization / deserialization.
97-
pub struct DynamicFilterSnapshot {
98-
children: Vec<Arc<dyn PhysicalExpr>>,
99-
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
100-
expression_id: u64,
101-
// Inner state.
102-
generation: u64,
103-
inner_expr: Arc<dyn PhysicalExpr>,
104-
is_complete: bool,
105-
}
106-
107-
impl DynamicFilterSnapshot {
108-
pub fn new(
109-
expression_id: u64,
110-
children: Vec<Arc<dyn PhysicalExpr>>,
111-
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
112-
generation: u64,
113-
inner_expr: Arc<dyn PhysicalExpr>,
114-
is_complete: bool,
115-
) -> Self {
116-
Self {
117-
children,
118-
remapped_children,
119-
expression_id,
120-
generation,
121-
inner_expr,
122-
is_complete,
123-
}
124-
}
125-
126-
pub fn children(&self) -> &[Arc<dyn PhysicalExpr>] {
127-
&self.children
128-
}
129-
130-
pub fn expression_id(&self) -> u64 {
131-
self.expression_id
132-
}
133-
134-
pub fn remapped_children(&self) -> Option<&[Arc<dyn PhysicalExpr>]> {
135-
self.remapped_children.as_deref()
136-
}
137-
138-
pub fn generation(&self) -> u64 {
139-
self.generation
140-
}
141-
142-
pub fn inner_expr(&self) -> &Arc<dyn PhysicalExpr> {
143-
&self.inner_expr
144-
}
145-
146-
pub fn is_complete(&self) -> bool {
147-
self.is_complete
148-
}
97+
pub is_complete: bool,
14998
}
15099

151100
// TODO: Include expression_id in debug output.
@@ -167,73 +116,6 @@ impl std::fmt::Debug for DynamicFilterPhysicalExpr {
167116
}
168117
}
169118

170-
impl Display for DynamicFilterSnapshot {
171-
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
172-
write!(
173-
f,
174-
"DynamicFilterSnapshot {{ children: {:?}, remapped_children: {:?}, expression_id: {}, generation: {}, inner_expr: {:?}, is_complete: {} }}",
175-
self.children,
176-
self.remapped_children,
177-
self.expression_id,
178-
self.generation,
179-
self.inner_expr,
180-
self.is_complete
181-
)
182-
}
183-
}
184-
185-
impl From<DynamicFilterSnapshot> for DynamicFilterPhysicalExpr {
186-
fn from(snapshot: DynamicFilterSnapshot) -> Self {
187-
let DynamicFilterSnapshot {
188-
children,
189-
remapped_children,
190-
expression_id,
191-
generation,
192-
inner_expr,
193-
is_complete,
194-
} = snapshot;
195-
196-
let state = if is_complete {
197-
FilterState::Complete { generation }
198-
} else {
199-
FilterState::InProgress { generation }
200-
};
201-
let (state_watch, _) = watch::channel(state);
202-
203-
Self {
204-
children,
205-
remapped_children,
206-
expression_id,
207-
inner: Arc::new(RwLock::new(Inner {
208-
generation,
209-
expr: inner_expr,
210-
is_complete,
211-
})),
212-
state_watch,
213-
data_type: Arc::new(RwLock::new(None)),
214-
nullable: Arc::new(RwLock::new(None)),
215-
}
216-
}
217-
}
218-
219-
impl From<&DynamicFilterPhysicalExpr> for DynamicFilterSnapshot {
220-
fn from(expr: &DynamicFilterPhysicalExpr) -> Self {
221-
// Snapshot everything in the mutex atomically.
222-
let (generation, inner_expr, is_complete) = {
223-
let inner = expr.inner.read();
224-
(inner.generation, Arc::clone(&inner.expr), inner.is_complete)
225-
};
226-
DynamicFilterSnapshot {
227-
children: expr.children.clone(),
228-
remapped_children: expr.remapped_children.clone(),
229-
expression_id: expr.expression_id,
230-
generation,
231-
inner_expr,
232-
is_complete,
233-
}
234-
}
235-
}
236-
237119
impl Inner {
238120
fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
239121
Self {
@@ -495,9 +377,67 @@ impl DynamicFilterPhysicalExpr {
495377
}
496378

497379
/// Generate a new expression id for this filter.
498-
pub fn new_expression_id() -> u64 {
380+
fn new_expression_id() -> u64 {
499381
random::<u64>()
500382
}
383+
384+
/// Return the filter's original children (before any remapping).
385+
pub fn original_children(&self) -> &[Arc<dyn PhysicalExpr>] {
386+
&self.children
387+
}
388+
389+
/// Return the filter's remapped children, if any have been set via
390+
/// [`PhysicalExpr::with_new_children`].
391+
pub fn remapped_children(&self) -> Option<&[Arc<dyn PhysicalExpr>]> {
392+
self.remapped_children.as_deref()
393+
}
394+
395+
/// Rebuild a `DynamicFilterPhysicalExpr` from its stored parts. Used by
396+
/// proto deserialization to preserve `expression_id` across a roundtrip
397+
/// rather than minting a fresh one.
398+
///
399+
/// **Warning:** intended only for `datafusion-proto` (de)serialization.
400+
/// Not a stable API.
401+
pub fn from_parts(
402+
expression_id: u64,
403+
children: Vec<Arc<dyn PhysicalExpr>>,
404+
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
405+
inner: Inner,
406+
) -> Self {
407+
let state = if inner.is_complete {
408+
FilterState::Complete {
409+
generation: inner.generation,
410+
}
411+
} else {
412+
FilterState::InProgress {
413+
generation: inner.generation,
414+
}
415+
};
416+
let (state_watch, _) = watch::channel(state);
417+
418+
Self {
419+
children,
420+
remapped_children,
421+
expression_id,
422+
inner: Arc::new(RwLock::new(inner)),
423+
state_watch,
424+
data_type: Arc::new(RwLock::new(None)),
425+
nullable: Arc::new(RwLock::new(None)),
426+
}
427+
}
428+
429+
/// Return a clone of the `Inner` state, captured atomically under a single read lock.
430+
///
431+
/// **Warning:** intended only for `datafusion-proto` (de)serialization.
432+
/// Not a stable API.
433+
pub fn inner(&self) -> Inner {
434+
let guard = self.inner.read();
435+
Inner {
436+
generation: guard.generation,
437+
expr: Arc::clone(&guard.expr),
438+
is_complete: guard.is_complete,
439+
}
440+
}
501441
}
502442

503443
impl PhysicalExpr for DynamicFilterPhysicalExpr {
@@ -1021,6 +961,10 @@ mod test {
1021961
);
1022962
}
1023963

964+
/// Verifies that reassigning (which triggers `with_new_children`) followed
965+
/// by rebuilding via `from_parts` preserves the filter's observable state
966+
/// (original children, remapped children, expression id, and inner
967+
/// generation/expr/is_complete).
1024968
#[test]
1025969
fn test_current_snapshot_roundtrip() {
1026970
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
@@ -1057,14 +1001,37 @@ mod test {
10571001
.downcast_ref::<DynamicFilterPhysicalExpr>()
10581002
.expect("Expected dynamic filter after reassignment");
10591003

1060-
// Take a snapshot and reconstruct
1061-
let snapshot = DynamicFilterSnapshot::from(reassigned);
1062-
let reconstructed = DynamicFilterPhysicalExpr::from(snapshot);
1004+
// Capture the parts and reconstruct.
1005+
let reconstructed = DynamicFilterPhysicalExpr::from_parts(
1006+
reassigned.expression_id,
1007+
reassigned.original_children().to_vec(),
1008+
reassigned.remapped_children().map(|r| r.to_vec()),
1009+
reassigned.inner(),
1010+
);
10631011

1064-
// Assert snapshots are equal.
1012+
// Assert the reconstructed filter matches field-by-field.
1013+
assert_eq!(
1014+
reassigned.original_children(),
1015+
reconstructed.original_children(),
1016+
);
1017+
assert_eq!(
1018+
reassigned.remapped_children(),
1019+
reconstructed.remapped_children(),
1020+
);
1021+
assert_eq!(
1022+
reassigned.expression_id().unwrap(),
1023+
reconstructed.expression_id().unwrap(),
1024+
);
1025+
let reassigned_inner = reassigned.inner();
1026+
let reconstructed_inner = reconstructed.inner();
1027+
assert_eq!(reassigned_inner.generation, reconstructed_inner.generation);
1028+
assert_eq!(
1029+
reassigned_inner.is_complete,
1030+
reconstructed_inner.is_complete
1031+
);
10651032
assert_eq!(
1066-
DynamicFilterSnapshot::from(reassigned).to_string(),
1067-
DynamicFilterSnapshot::from(&reconstructed).to_string(),
1033+
format!("{:?}", reassigned_inner.expr),
1034+
format!("{:?}", reconstructed_inner.expr),
10681035
);
10691036
}
10701037

datafusion/physical-expr/src/expressions/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ pub use case::{CaseExpr, case};
4343
pub use cast::{CastExpr, cast};
4444
pub use column::{Column, col, with_new_schema};
4545
pub use datafusion_expr::utils::format_state_name;
46-
pub use dynamic_filters::{DynamicFilterPhysicalExpr, DynamicFilterSnapshot};
46+
pub use dynamic_filters::{DynamicFilterPhysicalExpr, Inner as DynamicFilterInner};
4747
pub use in_list::{InListExpr, in_list};
4848
pub use is_not_null::{IsNotNullExpr, is_not_null};
4949
pub use is_null::{IsNullExpr, is_null};

datafusion/proto/proto/datafusion.proto

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -875,8 +875,11 @@ message PhysicalExprNode {
875875
// Was date_time_interval_expr
876876
reserved 17;
877877

878-
// Was expr_id
879-
reserved 30;
878+
// Unique identifier for this expression, used for deduplication during deserialization.
879+
// When serializing, this is set via `PhysicalExpr::expression_id`. When deserializing,
880+
// this id is used by the `DeduplicatingProtoConverter` to preserve referential
881+
// integrity across serde roundtrips for different expressions with the same id.
882+
optional uint64 expr_id = 30;
880883

881884
oneof ExprType {
882885
// column references
@@ -926,7 +929,6 @@ message PhysicalDynamicFilterNode {
926929
uint64 generation = 3;
927930
PhysicalExprNode inner_expr = 4;
928931
bool is_complete = 5;
929-
uint64 expression_id = 6;
930932
}
931933

932934
message PhysicalScalarUdfNode {

0 commit comments

Comments
 (0)