Skip to content

Commit 8875956

Browse files
adriangb and Claude authored
Handle canceled partitioned hash join dynamic filters lazily (#21666)
## Which issue does this PR close? - Closes #21625 ## Rationale for this change Partitioned hash join dynamic filters assumed every build-side partition would eventually report build data to the shared coordinator. That assumption breaks when an upstream partitioned operator legally short-circuits and drops a child hash-join partition before it is ever polled far enough to report. In the original reproducer, a parent `RightSemi` join completes early for partitions whose own build side is empty. That causes child partitioned hash-join streams to be dropped while still waiting to build/report their dynamic-filter contribution. Sibling partitions then wait forever for reports that will never arrive. ## What changes are included in this PR? - teach the shared partitioned dynamic-filter coordinator to distinguish terminal partition states: - reported build data - canceled before build data was known - mark unreported partitioned hash-join streams as canceled on `Drop` - treat canceled partitions as `true` in the synthesized partitioned filter so they do not block completion or incorrectly filter probe rows - preserve existing empty-partition behavior so known-empty partitions still contribute `false` - preserve the existing compact filter plan shapes when there are no canceled partitions, including the single-branch collapse used in hash-collision mode - add a regression test for the cancellation pattern that previously hung ## Are these changes tested? 
- `cargo fmt --all` - `cargo test -p datafusion-physical-plan test_partitioned_dynamic_filter_reports_empty_canceled_partitions -- --nocapture` - `cargo test -p datafusion --test core_integration physical_optimizer::filter_pushdown::test_hashjoin_dynamic_filter_pushdown_partitioned -- --nocapture` - `cargo test -p datafusion --test core_integration physical_optimizer::filter_pushdown::test_hashjoin_dynamic_filter_pushdown_partitioned --features force_hash_collisions -- --nocapture` - verified that `test_partitioned_dynamic_filter_reports_empty_canceled_partitions` times out on the pre-fix revision and passes on this branch `cargo clippy --all-targets --all-features -- -D warnings` still fails on an unrelated existing workspace lint in `datafusion/expr/src/logical_plan/plan.rs:3773` (`clippy::mutable_key_type`). ## Are there any user-facing changes? No. --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 8a45d02 commit 8875956

3 files changed

Lines changed: 695 additions & 302 deletions

File tree

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 196 additions & 29 deletions
Original file line number | Diff line number | Diff line change
@@ -1316,6 +1316,33 @@ impl ExecutionPlan for HashJoinExec {
13161316
.with_category(MetricCategory::Rows)
13171317
.counter(ARRAY_MAP_CREATED_COUNT_METRIC_NAME, partition);
13181318

1319+
// Initialize build_accumulator lazily with runtime partition counts (only if enabled)
1320+
// Use RepartitionExec's random state (seeds: 0,0,0,0) for partition routing
1321+
let repartition_random_state = REPARTITION_RANDOM_STATE;
1322+
let build_accumulator = enable_dynamic_filter_pushdown
1323+
.then(|| {
1324+
self.dynamic_filter.as_ref().map(|df| {
1325+
let filter = Arc::clone(&df.filter);
1326+
let on_right = self
1327+
.on
1328+
.iter()
1329+
.map(|(_, right_expr)| Arc::clone(right_expr))
1330+
.collect::<Vec<_>>();
1331+
Some(Arc::clone(df.build_accumulator.get_or_init(|| {
1332+
Arc::new(SharedBuildAccumulator::new_from_partition_mode(
1333+
self.mode,
1334+
self.left.as_ref(),
1335+
self.right.as_ref(),
1336+
filter,
1337+
on_right,
1338+
repartition_random_state,
1339+
))
1340+
})))
1341+
})
1342+
})
1343+
.flatten()
1344+
.flatten();
1345+
13191346
let left_fut = match self.mode {
13201347
PartitionMode::CollectLeft => self.left_fut.try_once(|| {
13211348
let left_stream = self.left.execute(0, Arc::clone(&context))?;
@@ -1343,7 +1370,6 @@ impl ExecutionPlan for HashJoinExec {
13431370
let reservation =
13441371
MemoryConsumer::new(format!("HashJoinInput[{partition}]"))
13451372
.register(context.memory_pool());
1346-
13471373
OnceFut::new(collect_left_input(
13481374
self.random_state.random_state().clone(),
13491375
left_stream,
@@ -1368,33 +1394,6 @@ impl ExecutionPlan for HashJoinExec {
13681394

13691395
let batch_size = context.session_config().batch_size();
13701396

1371-
// Initialize build_accumulator lazily with runtime partition counts (only if enabled)
1372-
// Use RepartitionExec's random state (seeds: 0,0,0,0) for partition routing
1373-
let repartition_random_state = REPARTITION_RANDOM_STATE;
1374-
let build_accumulator = enable_dynamic_filter_pushdown
1375-
.then(|| {
1376-
self.dynamic_filter.as_ref().map(|df| {
1377-
let filter = Arc::clone(&df.filter);
1378-
let on_right = self
1379-
.on
1380-
.iter()
1381-
.map(|(_, right_expr)| Arc::clone(right_expr))
1382-
.collect::<Vec<_>>();
1383-
Some(Arc::clone(df.build_accumulator.get_or_init(|| {
1384-
Arc::new(SharedBuildAccumulator::new_from_partition_mode(
1385-
self.mode,
1386-
self.left.as_ref(),
1387-
self.right.as_ref(),
1388-
filter,
1389-
on_right,
1390-
repartition_random_state,
1391-
))
1392-
})))
1393-
})
1394-
})
1395-
.flatten()
1396-
.flatten();
1397-
13981397
// we have the batches and the hash map with their keys. We can how create a stream
13991398
// over the right that uses this information to issue new batches.
14001399
let right_stream = self.right.execute(partition, context)?;
@@ -2347,6 +2346,22 @@ mod tests {
23472346
right: Arc<dyn ExecutionPlan>,
23482347
on: JoinOn,
23492348
join_type: JoinType,
2349+
) -> Result<(HashJoinExec, Arc<DynamicFilterPhysicalExpr>)> {
2350+
hash_join_with_dynamic_filter_and_mode(
2351+
left,
2352+
right,
2353+
on,
2354+
join_type,
2355+
PartitionMode::CollectLeft,
2356+
)
2357+
}
2358+
2359+
fn hash_join_with_dynamic_filter_and_mode(
2360+
left: Arc<dyn ExecutionPlan>,
2361+
right: Arc<dyn ExecutionPlan>,
2362+
on: JoinOn,
2363+
join_type: JoinType,
2364+
mode: PartitionMode,
23502365
) -> Result<(HashJoinExec, Arc<DynamicFilterPhysicalExpr>)> {
23512366
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
23522367
let mut join = HashJoinExec::try_new(
@@ -2356,7 +2371,7 @@ mod tests {
23562371
None,
23572372
&join_type,
23582373
None,
2359-
PartitionMode::CollectLeft,
2374+
mode,
23602375
NullEquality::NullEqualsNothing,
23612376
false,
23622377
)?;
@@ -5628,6 +5643,158 @@ mod tests {
56285643
Ok(())
56295644
}
56305645

5646+
#[tokio::test]
5647+
async fn test_partitioned_dynamic_filter_reports_empty_canceled_partitions()
5648+
-> Result<()> {
5649+
let mut session_config = SessionConfig::default();
5650+
session_config
5651+
.options_mut()
5652+
.optimizer
5653+
.enable_dynamic_filter_pushdown = true;
5654+
let task_ctx =
5655+
Arc::new(TaskContext::default().with_session_config(session_config));
5656+
5657+
let child_left_schema = Arc::new(Schema::new(vec![
5658+
Field::new("child_left_payload", DataType::Int32, false),
5659+
Field::new("child_key", DataType::Int32, false),
5660+
Field::new("child_left_extra", DataType::Int32, false),
5661+
]));
5662+
let child_right_schema = Arc::new(Schema::new(vec![
5663+
Field::new("child_right_payload", DataType::Int32, false),
5664+
Field::new("child_right_key", DataType::Int32, false),
5665+
Field::new("child_right_extra", DataType::Int32, false),
5666+
]));
5667+
let parent_left_schema = Arc::new(Schema::new(vec![
5668+
Field::new("parent_payload", DataType::Int32, false),
5669+
Field::new("parent_key", DataType::Int32, false),
5670+
Field::new("parent_extra", DataType::Int32, false),
5671+
]));
5672+
5673+
let child_left: Arc<dyn ExecutionPlan> = TestMemoryExec::try_new_exec(
5674+
&[
5675+
vec![build_table_i32(
5676+
("child_left_payload", &vec![10]),
5677+
("child_key", &vec![0]),
5678+
("child_left_extra", &vec![100]),
5679+
)],
5680+
vec![build_table_i32(
5681+
("child_left_payload", &vec![11]),
5682+
("child_key", &vec![1]),
5683+
("child_left_extra", &vec![101]),
5684+
)],
5685+
vec![build_table_i32(
5686+
("child_left_payload", &vec![12]),
5687+
("child_key", &vec![2]),
5688+
("child_left_extra", &vec![102]),
5689+
)],
5690+
vec![build_table_i32(
5691+
("child_left_payload", &vec![13]),
5692+
("child_key", &vec![3]),
5693+
("child_left_extra", &vec![103]),
5694+
)],
5695+
],
5696+
Arc::clone(&child_left_schema),
5697+
None,
5698+
)?;
5699+
let child_right: Arc<dyn ExecutionPlan> = TestMemoryExec::try_new_exec(
5700+
&[
5701+
vec![build_table_i32(
5702+
("child_right_payload", &vec![20]),
5703+
("child_right_key", &vec![0]),
5704+
("child_right_extra", &vec![200]),
5705+
)],
5706+
vec![build_table_i32(
5707+
("child_right_payload", &vec![21]),
5708+
("child_right_key", &vec![1]),
5709+
("child_right_extra", &vec![201]),
5710+
)],
5711+
vec![build_table_i32(
5712+
("child_right_payload", &vec![22]),
5713+
("child_right_key", &vec![2]),
5714+
("child_right_extra", &vec![202]),
5715+
)],
5716+
vec![build_table_i32(
5717+
("child_right_payload", &vec![23]),
5718+
("child_right_key", &vec![3]),
5719+
("child_right_extra", &vec![203]),
5720+
)],
5721+
],
5722+
Arc::clone(&child_right_schema),
5723+
None,
5724+
)?;
5725+
let parent_left: Arc<dyn ExecutionPlan> = TestMemoryExec::try_new_exec(
5726+
&[
5727+
vec![build_table_i32(
5728+
("parent_payload", &vec![30]),
5729+
("parent_key", &vec![0]),
5730+
("parent_extra", &vec![300]),
5731+
)],
5732+
vec![RecordBatch::new_empty(Arc::clone(&parent_left_schema))],
5733+
vec![build_table_i32(
5734+
("parent_payload", &vec![32]),
5735+
("parent_key", &vec![2]),
5736+
("parent_extra", &vec![302]),
5737+
)],
5738+
vec![RecordBatch::new_empty(Arc::clone(&parent_left_schema))],
5739+
],
5740+
Arc::clone(&parent_left_schema),
5741+
None,
5742+
)?;
5743+
5744+
let child_on = vec![(
5745+
Arc::new(Column::new_with_schema("child_key", &child_left_schema)?) as _,
5746+
Arc::new(Column::new_with_schema(
5747+
"child_right_key",
5748+
&child_right_schema,
5749+
)?) as _,
5750+
)];
5751+
let (child_join, _child_dynamic_filter) = hash_join_with_dynamic_filter_and_mode(
5752+
child_left,
5753+
child_right,
5754+
child_on,
5755+
JoinType::Inner,
5756+
PartitionMode::Partitioned,
5757+
)?;
5758+
let child_join: Arc<dyn ExecutionPlan> = Arc::new(child_join);
5759+
5760+
let parent_on = vec![(
5761+
Arc::new(Column::new_with_schema("parent_key", &parent_left_schema)?) as _,
5762+
Arc::new(Column::new_with_schema("child_key", &child_join.schema())?) as _,
5763+
)];
5764+
let parent_join = HashJoinExec::try_new(
5765+
parent_left,
5766+
child_join,
5767+
parent_on,
5768+
None,
5769+
&JoinType::RightSemi,
5770+
None,
5771+
PartitionMode::Partitioned,
5772+
NullEquality::NullEqualsNothing,
5773+
false,
5774+
)?;
5775+
5776+
let batches = tokio::time::timeout(
5777+
std::time::Duration::from_secs(5),
5778+
crate::execution_plan::collect(Arc::new(parent_join), task_ctx),
5779+
)
5780+
.await
5781+
.expect("partitioned right-semi join should not hang")?;
5782+
5783+
assert_batches_sorted_eq!(
5784+
[
5785+
"+--------------------+-----------+------------------+---------------------+-----------------+-------------------+",
5786+
"| child_left_payload | child_key | child_left_extra | child_right_payload | child_right_key | child_right_extra |",
5787+
"+--------------------+-----------+------------------+---------------------+-----------------+-------------------+",
5788+
"| 10 | 0 | 100 | 20 | 0 | 200 |",
5789+
"| 12 | 2 | 102 | 22 | 2 | 202 |",
5790+
"+--------------------+-----------+------------------+---------------------+-----------------+-------------------+",
5791+
],
5792+
&batches
5793+
);
5794+
5795+
Ok(())
5796+
}
5797+
56315798
#[tokio::test]
56325799
async fn test_hash_join_skips_probe_on_empty_build_after_partition_bounds_report()
56335800
-> Result<()> {

0 commit comments

Comments (0)