Handle canceled partitioned hash join dynamic filters lazily #21666
adriangb merged 18 commits into apache:main from codex/hash-join-empty-partition-reporting
Conversation
run benchmarks

🤖 Benchmark running (GKE): comparing codex/hash-join-empty-partition-reporting (d17d5e4) to 5c653be (merge-base) using tpcds
🤖 Benchmark running (GKE): comparing codex/hash-join-empty-partition-reporting (d17d5e4) to 5c653be (merge-base) using clickbench_partitioned
🤖 Benchmark running (GKE): comparing codex/hash-join-empty-partition-reporting (d17d5e4) to 5c653be (merge-base) using tpch
🤖 Benchmark completed (GKE): clickbench_partitioned — base (merge-base) vs branch
🤖 Benchmark completed (GKE): tpcds — base (merge-base) vs branch

run benchmark tpcds

🤖 Benchmark running (GKE): comparing codex/hash-join-empty-partition-reporting (d17d5e4) to 5c653be (merge-base) using tpcds
🤖 Benchmark completed (GKE): tpcds — base (merge-base) vs branch
query 72 took a bit of a hit here.

TPC-DS Query 72:

```sql
select i_item_desc
      ,w_warehouse_name
      ,d1.d_week_seq
      ,sum(case when p_promo_sk is null then 1 else 0 end) no_promo
      ,sum(case when p_promo_sk is not null then 1 else 0 end) promo
      ,count(*) total_cnt
from catalog_sales
join inventory on (cs_item_sk = inv_item_sk)
join warehouse on (w_warehouse_sk = inv_warehouse_sk)
join item on (i_item_sk = cs_item_sk)
join customer_demographics on (cs_bill_cdemo_sk = cd_demo_sk)
join household_demographics on (cs_bill_hdemo_sk = hd_demo_sk)
join date_dim d1 on (cs_sold_date_sk = d1.d_date_sk)
join date_dim d2 on (inv_date_sk = d2.d_date_sk)
join date_dim d3 on (cs_ship_date_sk = d3.d_date_sk)
left outer join promotion on (cs_promo_sk = p_promo_sk)
left outer join catalog_returns on (cr_item_sk = cs_item_sk and cr_order_number = cs_order_number)
where d1.d_week_seq = d2.d_week_seq
  and inv_quantity_on_hand < cs_quantity
  and d3.d_date > (d1.d_date + INTERVAL '5 days')
  and hd_buy_potential = '1001-5000'
  and d1.d_year = 2001
  and cd_marital_status = 'M'
group by i_item_desc, w_warehouse_name, d1.d_week_seq
order by total_cnt desc, i_item_desc, w_warehouse_name, d_week_seq
limit 100;
```

Yes. And I think this was another bandaid. But it's closer to the root cause than previous attempts. This has to do with cancellation when multiple joins are involved. TL;DR: I think what is happening is that when you have multiple joins you end up with a tree of operators. One of the joins higher up in the tree hits the new optimization and aborts work, dropping tasks that would have polled downstream joins. But now the downstream join is stuck waiting for all of its partition tasks to finish even though they never will. I think we were all operating under the assumption that the issue was within a single join operator, but really it's an issue any time an upstream operator cancels on a join. I think the real solution is to track when a join build partition task gets dropped and report that to the dynamic filter builder so that it doesn't wait for that partition to report.

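The "report on drop" idea above can be sketched with a std-only RAII guard. All names here are hypothetical stand-ins for the DataFusion types (this is not the actual implementation): the coordinator counts each partition as done when it either reports build data or is dropped, so siblings never wait on a partition that will never report.

```rust
use std::sync::{Arc, Condvar, Mutex};

// Hypothetical stand-ins for the coordinator/stream types.
#[derive(Clone, Copy, PartialEq, Debug)]
enum PartitionStatus {
    Pending,
    Reported,
    Canceled,
}

struct Coordinator {
    statuses: Mutex<Vec<PartitionStatus>>,
    cv: Condvar,
}

impl Coordinator {
    fn new(partitions: usize) -> Arc<Self> {
        Arc::new(Self {
            statuses: Mutex::new(vec![PartitionStatus::Pending; partitions]),
            cv: Condvar::new(),
        })
    }

    fn report(&self, id: usize, status: PartitionStatus) {
        let mut statuses = self.statuses.lock().unwrap();
        // Guard on Pending: a late cancel after a successful report is a no-op.
        if statuses[id] == PartitionStatus::Pending {
            statuses[id] = status;
            self.cv.notify_all();
        }
    }

    // Block until no partition is Pending; return how many were canceled.
    fn wait_all(&self) -> usize {
        let statuses = self.statuses.lock().unwrap();
        let statuses = self
            .cv
            .wait_while(statuses, |s| s.iter().any(|p| *p == PartitionStatus::Pending))
            .unwrap();
        statuses.iter().filter(|p| **p == PartitionStatus::Canceled).count()
    }
}

// Dropping a partition's stream without reporting counts as cancellation.
struct PartitionGuard {
    coord: Arc<Coordinator>,
    id: usize,
    reported: bool,
}

impl PartitionGuard {
    fn report_build_data(&mut self) {
        self.coord.report(self.id, PartitionStatus::Reported);
        self.reported = true;
    }
}

impl Drop for PartitionGuard {
    fn drop(&mut self) {
        if !self.reported {
            self.coord.report(self.id, PartitionStatus::Canceled);
        }
    }
}
```

With this shape, an upstream operator that drops a child join's partition task still unblocks the coordinator, because the guard's `Drop` converts the missing report into an explicit cancellation.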
run benchmark tpcds

🤖 Benchmark running (GKE): comparing codex/hash-join-empty-partition-reporting (0584854) to 5c653be (merge-base) using tpcds
Pull request overview
Fixes a hang in partitioned hash join dynamic filter coordination by allowing partitions that are dropped early to be treated as "canceled" so they do not block filter finalization (DataFusion issue #21625).
Changes:
- Add cancellation tracking for partitioned build-side reports and treat canceled partitions as `true` in the synthesized partitioned dynamic filter.
- Mark partitioned `HashJoinStream` partitions as canceled on `Drop` when they never reported build data.
- Add a regression test covering the early-completing `RightSemi` parent join scenario that previously hung.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| datafusion/physical-plan/src/joins/hash_join/stream.rs | Track whether build info was reported and report partition cancellation to the coordinator on Drop. |
| datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs | Replace barrier-based coordination with explicit partition status + notify-based completion; synthesize filters that handle canceled partitions. |
| datafusion/physical-plan/src/joins/hash_join/exec.rs | Refactor dynamic-filter accumulator initialization and add a regression test for the cancellation/hang scenario. |
```rust
enum CompletionState {
    Pending,
    Finalizing,
    Ready(std::result::Result<(), String>),
}
```
CompletionState::Ready stores errors as Result<(), String>, which forces later callers to lose the original DataFusionError variant/backtrace/context. Consider storing Result<(), Arc<DataFusionError>> (or datafusion_common::SharedResult<()>) in CompletionState instead, so you can propagate DataFusionError::Shared(...) to all waiters without stringifying.
Suggested change:

```diff
-    Ready(std::result::Result<(), String>),
+    Ready(std::result::Result<(), Arc<datafusion_common::DataFusionError>>),
```
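A minimal sketch of why the `Arc`-wrapped error is preferable, using a hypothetical `QueryError` in place of `DataFusionError` (names are illustrative, not the real API): every waiter gets a cheap clone that still carries the original variant, which stringifying would destroy.

```rust
use std::sync::Arc;

// `QueryError` is a hypothetical stand-in for DataFusionError.
#[allow(dead_code)]
#[derive(Debug)]
enum QueryError {
    ResourcesExhausted(String),
    External(String),
}

// Terminal completion state shared with all waiting partitions.
#[allow(dead_code)]
enum CompletionState {
    Pending,
    Ready(Result<(), Arc<QueryError>>),
}

// Each waiter receives a cheap Arc clone that still carries the error variant.
fn propagate(state: &CompletionState) -> Result<(), Arc<QueryError>> {
    match state {
        CompletionState::Ready(Ok(())) => Ok(()),
        CompletionState::Ready(Err(e)) => Err(Arc::clone(e)),
        CompletionState::Pending => unreachable!("only called after finalization"),
    }
}
```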
```rust
/// # Returns
/// * `Result<()>` - Ok if successful, Err if filter update failed or mode mismatch
pub(crate) async fn report_build_data(&self, data: PartitionBuildData) -> Result<()> {
    // Store data in the accumulator
    let finalize_input = {
```
The report_build_data doc comment still says "have reported (barrier wait)", but this method no longer uses tokio::sync::Barrier (it uses Notify/CompletionState). Please update the docs to match the current synchronization mechanism so the comment doesn’t mislead future changes.
🤖 Benchmark completed (GKE): tpcds — base (merge-base) vs branch

run benchmarks

🤖 Benchmark running (GKE): comparing codex/hash-join-empty-partition-reporting (7011c5d) to 5c653be (merge-base) using tpch
🤖 Benchmark running (GKE): comparing codex/hash-join-empty-partition-reporting (7011c5d) to 5c653be (merge-base) using tpcds
🤖 Benchmark running (GKE): comparing codex/hash-join-empty-partition-reporting (7011c5d) to 5c653be (merge-base) using clickbench_partitioned
🤖 Benchmark completed (GKE): clickbench_partitioned — base (merge-base) vs branch
🤖 Benchmark completed (GKE): tpcds — base (merge-base) vs branch

Okay I think this addresses the root cause with no performance regression or behavior changes.

@RatulDawar could you let me know what you think of this solution?

I'm not actually a committer @adriangb ... legal at work never got back to approving me :( In any case I'm not knowledgeable enough about this area of the code to be much help I'm afraid.

Looks great now!

Great, all we need is review from a committer now 😄
gabotechs left a comment
Looks good! Left some comments, mainly questions for my understanding, as I'm not very familiar with this code.
```rust
// Initialize build_accumulator lazily with runtime partition counts (only if enabled)
// Use RepartitionExec's random state (seeds: 0,0,0,0) for partition routing
let repartition_random_state = REPARTITION_RANDOM_STATE;
let on_right_exprs = self
    .on
    .iter()
    .map(|(_, right_expr)| Arc::clone(right_expr))
    .collect::<Vec<_>>();
let build_accumulator = enable_dynamic_filter_pushdown
    .then(|| {
        self.dynamic_filter.as_ref().map(|df| {
            let filter = Arc::clone(&df.filter);
            Some(Arc::clone(df.build_accumulator.get_or_init(|| {
                Arc::new(SharedBuildAccumulator::new_from_partition_mode(
```
🤔 I'm trying to deduce what this change does, but it looks like it's not related to the fix and is just a refactor? Is that right, or did I miss something?
```rust
self.build_waiter = Some(OnceFut::new(async move {
    acc.report_build_data(build_data).await
}));
self.build_reported = true;
```
I'm also trying to understand what exactly changed here, and it looks like the only logical change is that there's now a `self.build_reported = true` flag set, but everything else is pretty much a refactor. Does that sound right?
```rust
loop {
    let notified = {
        let guard = self.inner.lock();
        match &guard.completion {
            CompletionState::Ready(Ok(())) => return Ok(()),
            CompletionState::Ready(Err(err)) => {
                return Err(DataFusionError::Shared(Arc::clone(err)));
            }
            CompletionState::Pending | CompletionState::Finalizing => {
                self.completion_notify.notified()
            }
        }
    };
    notified.await;
}
```

```rust
// Partitioned: CASE expression routing to per-partition filters
AccumulatedBuildData::Partitioned { partitions } => {
    // Collect all partition data (should all be Some at this point)
    let partition_data: Vec<_> =
        partitions.iter().filter_map(|p| p.as_ref()).collect();

    if !partition_data.is_empty() {
        // Build a CASE expression that combines range checks AND membership checks
        // CASE (hash_repartition(join_keys) % num_partitions)
        //   WHEN 0 THEN (col >= min_0 AND col <= max_0 AND ...) AND membership_check_0
        //   WHEN 1 THEN (col >= min_1 AND col <= max_1 AND ...) AND membership_check_1
        //   ...
        //   ELSE false
        // END
        let num_partitions = partition_data.len();

        // Create base expression: hash_repartition(join_keys) % num_partitions
        let routing_hash_expr = Arc::new(HashExpr::new(
            self.on_right.clone(),
            self.repartition_random_state.clone(),
            "hash_repartition".to_string(),
        )) as Arc<dyn PhysicalExpr>;

        let modulo_expr = Arc::new(BinaryExpr::new(
            routing_hash_expr,
            Operator::Modulo,
            lit(ScalarValue::UInt64(Some(num_partitions as u64))),
        )) as Arc<dyn PhysicalExpr>;

        // Create WHEN branches for each partition
        let when_then_branches: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> =
            partitions
                .iter()
                .enumerate()
                .filter_map(|(partition_id, partition_opt)| {
                    partition_opt.as_ref().and_then(|partition| {
                        // Skip empty partitions - they would always return false anyway
                        match &partition.pushdown {
                            PushdownStrategy::Empty => None,
                            _ => Some((partition_id, partition)),
                        }
                    })
                })
                .map(|(partition_id, partition)| -> Result<_> {
                    // WHEN partition_id
                    let when_expr =
                        lit(ScalarValue::UInt64(Some(partition_id as u64)));

                    // THEN: Combine bounds check AND membership predicate

                    // 1. Create membership predicate (InList for small build sides,
                    //    hash lookup otherwise)
                    let membership_expr = create_membership_predicate(
                        &self.on_right,
                        partition.pushdown.clone(),
                        &HASH_JOIN_SEED,
                        self.probe_schema.as_ref(),
                    )?;

                    // 2. Create bounds check expression for this partition (if bounds available)
                    let bounds_expr =
                        create_bounds_predicate(&self.on_right, &partition.bounds);

                    // 3. Combine membership and bounds expressions
                    let then_expr = match (membership_expr, bounds_expr) {
                        (Some(membership), Some(bounds)) => {
                            // Both available: combine with AND
                            Arc::new(BinaryExpr::new(bounds, Operator::And, membership))
                                as Arc<dyn PhysicalExpr>
                        }
                        (Some(membership), None) => {
                            // Membership available but no bounds (e.g., unsupported data types)
                            membership
                        }
                        (None, Some(bounds)) => {
                            // Bounds available but no membership.
                            // This should be unreachable in practice: we can always push
                            // down a reference to the hash table.
                            // But it seems safer to handle it defensively.
                            bounds
                        }
                        (None, None) => {
                            // No filter for this partition - should not happen due to
                            // filter_map above but handle defensively by returning a
                            // "true" literal
                            lit(true)
                        }
                    };

                    Ok((when_expr, then_expr))
                })
                .collect::<Result<Vec<_>>>()?;

        // Optimize for single partition: skip CASE expression entirely
        let filter_expr = if when_then_branches.is_empty() {
            // All partitions are empty: no rows can match
            lit(false)
        } else if when_then_branches.len() == 1 {
            // Single partition: just use the condition directly
            // since hash % 1 == 0 always, the WHEN 0 branch will always match
            Arc::clone(&when_then_branches[0].1)
        } else {
            // Multiple partitions: create CASE expression
            Arc::new(CaseExpr::try_new(
                Some(modulo_expr),
                when_then_branches,
                Some(lit(false)), // ELSE false
            )?) as Arc<dyn PhysicalExpr>
        };
    }
}
```
I don't quite understand the difference between the previous barrier and the current loop + `Notify` approach. At first sight, it looks like a convoluted way of re-implementing the barrier, and the factor that seems key to solving the deadlock is calling `report_canceled_partition` on `HashJoinStream` drop.
Is it not possible to just gracefully handle the cancellation by doing something with the `Barrier` on drop?
Yes this is a refactor from #21666 (comment)
```rust
let mut guard = self.inner.lock();
guard.completion = CompletionState::Ready(result);
drop(guard);
self.completion_notify.notify_waiters();
```
A `Mutex` with shared state + a `Notify` for broadcasting update signals sounds essentially like a reimplementation of `tokio::sync::watch`.
I don't have the full context, so the current solution might actually be the best one; I just want to double-check that other synchronization primitives were considered before choosing this one.
```rust
#[tokio::test]
async fn test_partitioned_dynamic_filter_reports_empty_canceled_partitions()
```
Double checked with main and indeed this test reproduces the error 👍
Replaces the manual PartitionBuildData construction + report_build_data call + build_reported flag set in collect_build_side with a single transition_after_build_collected method, making it impossible to forget to report build data when transitioning state. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Per review feedback, the hoisting of on_right_exprs above the SharedBuildAccumulator init was an unrelated cleanup (same Arc-clone and allocation cost) that distracted from the cancellation fix. Restore the two separate constructions to match main. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
force-pushed from 5ad96d9 to 9023c19
Per review feedback, replace the hand-rolled Mutex<CompletionState> + Notify pair with a watch::Sender<Option<SharedResult<()>>> for the terminal completion broadcast. The mutex still guards the incremental per-partition accumulation (which needs partial writes) and the finalizer-election bool, but wait_for_completion collapses to a single rx.wait_for(|v| v.is_some()).await instead of a manual re-check loop. Net -8 lines and drops the three-state CompletionState enum. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This reverts commit 4e95802.
Clarify the role of the Notify primitive so the coordination pattern is obvious without reading the wait/finish methods: it wakes parked partitions once the elected finalizer has published CompletionState::Ready. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Thanks for the ping.
I dug into the lifecycle around reporting and cancellation, and there is a subtle timing issue that could reintroduce a hang. I left details inline. I also added a couple of suggestions that could help keep things easier to reason about going forward.
```rust
self.build_waiter = Some(OnceFut::new(async move {
    acc.report_build_data(build_data).await
}));
self.build_reported = true;
```
I think there is still a race here. `build_reported` gets flipped as soon as the `OnceFut` is created, but the future is lazy and only runs once `wait_for_partition_bounds_report()` actually polls it.
If a parent drops this stream after `transition_after_build_collected()` returns, but before the waiter is ever polled, `Drop` will skip `report_canceled_partition()` even though nothing was delivered to the coordinator. That feels like it recreates the original hang, just in a narrower timing window.
Would it make sense to only mark this as reported after the waiter completes successfully? Alternatively, maybe replace the bool with a small lifecycle state so `Drop` can still cancel something that was only scheduled but never observed.
```rust
let bounds_expr =
    create_bounds_predicate(&self.on_right, &partition_data.bounds);

if let Some(filter_expr) = match (membership_expr, bounds_expr) {
```
I noticed the “membership + bounds gives final predicate” logic now exists in both the CollectLeft and Partitioned finalize paths, with slightly different control flow.
It might be worth pulling that into a small helper so both branches stay aligned as the dynamic filter logic evolves. That would make future changes a bit safer.
```rust
/// Optional future to signal when build information has been reported by all partitions
/// and the dynamic filter has been updated
build_waiter: Option<OnceFut<()>>,
/// Tracks whether this partition has already reported build information to the coordinator.
```
Once the blocking issue above is sorted out, this field might benefit from a stronger type than just `bool`.
Something like `NotReported`, `ReportScheduled`, and `ReportDelivered` would make the drop semantics much clearer and help avoid another mix-up between "scheduled" and "actually seen by the coordinator".
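A sketch of what that lifecycle type could look like (the names come from this thread's suggestion, not from the actual code): only `ReportDelivered` exempts the partition from cancellation on drop.

```rust
// Suggested lifecycle states; only ReportDelivered means the coordinator saw the report.
#[derive(Clone, Copy, PartialEq, Debug)]
enum BuildReportState {
    NotReported,     // nothing scheduled yet
    ReportScheduled, // lazy future created, possibly never polled
    ReportDelivered, // coordinator acknowledged the report
}

// Hypothetical holder for the per-stream state consulted in Drop.
struct StreamDropState {
    report: BuildReportState,
}

impl StreamDropState {
    // Drop must cancel unless the coordinator actually observed the report;
    // a merely scheduled report still needs cancellation.
    fn must_cancel_on_drop(&self) -> bool {
        self.report != BuildReportState::ReportDelivered
    }
}
```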
…lper

Per PR apache#21666 review feedback:
- Replace `build_reported: bool` with a `BuildReportState` enum (NotReported / ReportScheduled / ReportDelivered). `ReportDelivered` is only set after the build waiter's `get_shared` returns `Ok`, so `Drop` still cancels a partition whose report was merely scheduled but never observed by the coordinator. `store_canceled_partition` guards on `PartitionStatus::Pending`, so a late cancel arriving after the coordinator already saw the report is a harmless no-op.
- Extract `combine_membership_and_bounds` so the CollectLeft and Partitioned arms of `SharedBuildAccumulator::build_filter` share a single (membership, bounds) → predicate combinator. Partitioned keeps its `lit(true)` fallback via `unwrap_or_else`, preserving the existing CASE-branch shape optimizations.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The `BuildReportState::ReportScheduled` → `Drop` → `report_canceled_partition` path is correct only because `store_canceled_partition` no-ops when the partition is already `Reported`. Previously this invariant was implicit; add unit tests so a future refactor that, say, unconditionally overwrites the partition status can't silently reintroduce the hang. Also pin that `report_canceled_partition` is idempotent — a stray double-drop must not double-count `completed_partitions`. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
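The invariant that commit pins down can be illustrated with a small sketch (hypothetical names mirroring `store_canceled_partition`): only the Pending → terminal transition counts toward completion, so late or duplicate cancels can neither overwrite a delivered report nor double-count `completed_partitions`.

```rust
#[derive(Clone, Copy, PartialEq, Debug)]
enum PartitionStatus {
    Pending,
    Reported,
    Canceled,
}

// Hypothetical accumulator mirroring the guarded transitions described above.
struct Accumulator {
    statuses: Vec<PartitionStatus>,
    completed_partitions: usize,
}

impl Accumulator {
    fn store_report(&mut self, id: usize) {
        if self.statuses[id] == PartitionStatus::Pending {
            self.statuses[id] = PartitionStatus::Reported;
            self.completed_partitions += 1;
        }
    }

    // Guarded on Pending: a late cancel after a report, or a double cancel,
    // neither overwrites the status nor double-counts completion.
    fn store_canceled_partition(&mut self, id: usize) {
        if self.statuses[id] == PartitionStatus::Pending {
            self.statuses[id] = PartitionStatus::Canceled;
            self.completed_partitions += 1;
        }
    }
}
```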
kosiew left a comment
Looks good to me, except a comment clean-up for `Barrier`.
This should be updated since this PR replaced the Barrier-based coordination.
The synchronization-strategy comment still described a barrier even though this PR replaced it with a per-partition report counter, a CompletionState lifecycle finalized exactly once, and a Notify that wakes parked waiters. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

Thanks for the review @kosiew! @gabotechs I did try to use an existing primitive, but I couldn't quite get anything to fit the shape we need. If you or anyone else can improve it, a followup is welcome, but I didn't want to hold up unblocking benchmarks.
Which issue does this PR close?
- Closes #21625.

Rationale for this change
Partitioned hash join dynamic filters assumed every build-side partition would eventually report build data to the shared coordinator. That assumption breaks when an upstream partitioned operator legally short-circuits and drops a child hash-join partition before it is ever polled far enough to report.
In the original reproducer, a parent `RightSemi` join completes early for partitions whose own build side is empty. That causes child partitioned hash-join streams to be dropped while still waiting to build/report their dynamic-filter contribution. Sibling partitions then wait forever for reports that will never arrive.

What changes are included in this PR?
- Report cancellation to the shared coordinator when a partitioned hash-join stream is dropped (`Drop`) before it ever reported build data.
- Treat canceled partitions as `true` in the synthesized partitioned filter so they do not block completion or incorrectly filter probe rows (rather than defaulting to `false`).

Are these changes tested?
- `cargo fmt --all`
- `cargo test -p datafusion-physical-plan test_partitioned_dynamic_filter_reports_empty_canceled_partitions -- --nocapture`
- `cargo test -p datafusion --test core_integration physical_optimizer::filter_pushdown::test_hashjoin_dynamic_filter_pushdown_partitioned -- --nocapture`
- `cargo test -p datafusion --test core_integration physical_optimizer::filter_pushdown::test_hashjoin_dynamic_filter_pushdown_partitioned --features force_hash_collisions -- --nocapture`
- `test_partitioned_dynamic_filter_reports_empty_canceled_partitions` times out on the pre-fix revision and passes on this branch
- `cargo clippy --all-targets --all-features -- -D warnings` still fails on an unrelated existing workspace lint in `datafusion/expr/src/logical_plan/plan.rs:3773` (`clippy::mutable_key_type`)

Are there any user-facing changes?
No.