Skip to content

Commit f6c3410

Browse files
jayshrivastavaadriangb
authored andcommitted
address feedback
1 parent 41273a0 commit f6c3410

2 files changed

Lines changed: 39 additions & 81 deletions

File tree

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 17 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -961,12 +961,12 @@ mod test {
961961
);
962962
}
963963

964-
/// Verifies that reassigning (which triggers `with_new_children`) followed
965-
/// by rebuilding via `from_parts` preserves the filter's observable state
966-
/// (original children, remapped children, expression id, and inner
967-
/// generation/expr/is_complete).
964+
/// Verifies that `from_parts` rebuilds a `DynamicFilterPhysicalExpr`
965+
/// whose observable state (original children, remapped children,
966+
/// expression id, inner generation/expr/is_complete) matches the source
967+
/// filter.
968968
#[test]
969-
fn test_current_snapshot_roundtrip() {
969+
fn test_from_parts_preserves_state() {
970970
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
971971
let col_a = col("a", &schema).unwrap();
972972

@@ -981,13 +981,7 @@ mod test {
981981
expr as Arc<dyn PhysicalExpr>,
982982
);
983983

984-
// Update expression and mark complete
985-
filter
986-
.update(lit(42) as Arc<dyn PhysicalExpr>)
987-
.expect("Update should succeed");
988-
filter.mark_complete();
989-
990-
// Change the children of the expr.
984+
// Add remapped children.
991985
let reassigned_schema = Arc::new(Schema::new(vec![
992986
Field::new("b", DataType::Int32, false),
993987
Field::new("a", DataType::Int32, false),
@@ -1001,6 +995,11 @@ mod test {
1001995
.downcast_ref::<DynamicFilterPhysicalExpr>()
1002996
.expect("Expected dynamic filter after reassignment");
1003997

998+
reassigned
999+
.update(lit(42) as Arc<dyn PhysicalExpr>)
1000+
.expect("Update should succeed");
1001+
reassigned.mark_complete();
1002+
10041003
// Capture the parts and reconstruct.
10051004
let reconstructed = DynamicFilterPhysicalExpr::from_parts(
10061005
reassigned.expression_id,
@@ -1009,7 +1008,6 @@ mod test {
10091008
reassigned.inner(),
10101009
);
10111010

1012-
// Assert the reconstructed filter matches field-by-field.
10131011
assert_eq!(
10141012
reassigned.original_children(),
10151013
reconstructed.original_children(),
@@ -1018,21 +1016,12 @@ mod test {
10181016
reassigned.remapped_children(),
10191017
reconstructed.remapped_children(),
10201018
);
1021-
assert_eq!(
1022-
reassigned.expression_id().unwrap(),
1023-
reconstructed.expression_id().unwrap(),
1024-
);
1025-
let reassigned_inner = reassigned.inner();
1026-
let reconstructed_inner = reconstructed.inner();
1027-
assert_eq!(reassigned_inner.generation, reconstructed_inner.generation);
1028-
assert_eq!(
1029-
reassigned_inner.is_complete,
1030-
reconstructed_inner.is_complete
1031-
);
1032-
assert_eq!(
1033-
format!("{:?}", reassigned_inner.expr),
1034-
format!("{:?}", reconstructed_inner.expr),
1035-
);
1019+
assert_eq!(reassigned.expression_id(), reconstructed.expression_id());
1020+
let r = reassigned.inner();
1021+
let c = reconstructed.inner();
1022+
assert_eq!(r.generation, c.generation);
1023+
assert_eq!(r.is_complete, c.is_complete);
1024+
assert_eq!(format!("{:?}", r.expr), format!("{:?}", c.expr));
10361025
}
10371026

10381027
#[tokio::test]

datafusion/proto/tests/cases/roundtrip_physical_plan.rs

Lines changed: 22 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -2871,39 +2871,25 @@ fn assert_dynamic_filter_update_is_visible(
28712871
Ok(())
28722872
}
28732873

2874-
/// Format the observable state of a `DynamicFilterPhysicalExpr` for
2875-
/// equality comparison in roundtrip tests.
2876-
fn fmt_dynamic_filter(filter: &DynamicFilterPhysicalExpr) -> String {
2877-
let inner = filter.inner();
2878-
format!(
2879-
"DynamicFilter {{ children: {:?}, remapped_children: {:?}, expression_id: {}, generation: {}, inner_expr: {:?}, is_complete: {} }}",
2880-
filter.original_children(),
2881-
filter.remapped_children(),
2882-
filter
2883-
.expression_id()
2884-
.expect("DynamicFilterPhysicalExpr always has an expression_id"),
2885-
inner.generation,
2886-
inner.expr,
2887-
inner.is_complete,
2888-
)
2889-
}
2890-
2891-
/// Assert that the actual dynamic filter snapshot matches the expected one
2892-
/// after roundtrip.
2893-
fn assert_dynamic_filter_snapshot_matches(
2874+
/// Assert that two dynamic filters are equal both structurally (Debug output)
2875+
/// and by identity (`expression_id`).
2876+
///
2877+
fn assert_dynamic_filters_equal(
28942878
expected: &Arc<dyn PhysicalExpr>,
28952879
actual: &Arc<dyn PhysicalExpr>,
28962880
) {
2897-
let expected_filter = expected
2898-
.downcast_ref::<DynamicFilterPhysicalExpr>()
2899-
.expect("Expected dynamic filter");
2900-
let actual_filter = actual
2901-
.downcast_ref::<DynamicFilterPhysicalExpr>()
2902-
.expect("Expected dynamic filter");
2903-
2904-
let actual_snapshot = fmt_dynamic_filter(actual_filter);
2905-
let expected_snapshot = fmt_dynamic_filter(expected_filter);
2906-
if expected_snapshot == actual_snapshot {
2881+
// TODO: Debug currently omits `expression_id` so the id has to be checked
2882+
// separately here. Once plan nodes like `SortExec` / `AggregateExec` /
2883+
// `HashJoinExec` serialize their own dynamic filter, Debug can include
2884+
// `expression_id`.
2885+
//
2886+
// See https://github.com/apache/datafusion/issues/20418
2887+
assert_eq!(expected.expression_id(), actual.expression_id());
2888+
2889+
// Structural.
2890+
let expected_dbg = format!("{expected:?}");
2891+
let actual_dbg = format!("{actual:?}");
2892+
if expected_dbg == actual_dbg {
29072893
return;
29082894
}
29092895

@@ -2913,12 +2899,7 @@ fn assert_dynamic_filter_snapshot_matches(
29132899
let rewritten = Arc::clone(expected)
29142900
.with_new_children(expected.children().iter().map(|c| Arc::clone(c)).collect())
29152901
.expect("with_new_children on a dynamic filter should not fail");
2916-
let rewritten_snapshot = fmt_dynamic_filter(
2917-
rewritten
2918-
.downcast_ref::<DynamicFilterPhysicalExpr>()
2919-
.expect("with_new_children returns a DynamicFilterPhysicalExpr"),
2920-
);
2921-
assert_eq!(rewritten_snapshot, actual_snapshot);
2902+
assert_eq!(format!("{rewritten:?}"), actual_dbg);
29222903
}
29232904

29242905
// Two clones of a dynamic filter expression should be deduped to the exact same expression.
@@ -2936,15 +2917,9 @@ fn test_dynamic_filter_roundtrip_dedupe() -> Result<()> {
29362917
)?;
29372918

29382919
// Assert the filters are not modified during roundtrip.
2939-
assert_dynamic_filter_snapshot_matches(
2940-
&filter_expr_1,
2941-
&filter_expr_1_after_roundtrip,
2942-
);
2943-
assert_dynamic_filter_snapshot_matches(
2944-
&filter_expr_2,
2945-
&filter_expr_2_after_roundtrip,
2946-
);
2947-
assert_dynamic_filter_snapshot_matches(
2920+
assert_dynamic_filters_equal(&filter_expr_1, &filter_expr_1_after_roundtrip);
2921+
assert_dynamic_filters_equal(&filter_expr_2, &filter_expr_2_after_roundtrip);
2922+
assert_dynamic_filters_equal(
29482923
&filter_expr_1_after_roundtrip,
29492924
&filter_expr_2_after_roundtrip,
29502925
);
@@ -2970,14 +2945,8 @@ fn test_dynamic_filter_plan_roundtrip_dedupe() -> Result<()> {
29702945
) = roundtrip_dynamic_filter_plan_pair()?;
29712946

29722947
// Assert the filters are not modified during roundtrip.
2973-
assert_dynamic_filter_snapshot_matches(
2974-
&filter_expr_1,
2975-
&filter_expr_1_after_roundtrip,
2976-
);
2977-
assert_dynamic_filter_snapshot_matches(
2978-
&filter_expr_2,
2979-
&filter_expr_2_after_roundtrip,
2980-
);
2948+
assert_dynamic_filters_equal(&filter_expr_1, &filter_expr_1_after_roundtrip);
2949+
assert_dynamic_filters_equal(&filter_expr_2, &filter_expr_2_after_roundtrip);
29812950

29822951
// Assert referential integrity.
29832952
assert_dynamic_filter_update_is_visible(

0 commit comments

Comments
 (0)