Skip to content

Commit 03f4947

Browse files
committed
Prepare hash join dynamic filter fix for review
1 parent 65483f8 commit 03f4947

3 files changed

Lines changed: 22 additions & 139 deletions

File tree

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5726,7 +5726,10 @@ mod tests {
57265726
async fn test_partitioned_dynamic_filter_reports_empty_canceled_partitions()
57275727
-> Result<()> {
57285728
let mut session_config = SessionConfig::default();
5729-
session_config.options_mut().optimizer.enable_dynamic_filter_pushdown = true;
5729+
session_config
5730+
.options_mut()
5731+
.optimizer
5732+
.enable_dynamic_filter_pushdown = true;
57305733
let task_ctx =
57315734
Arc::new(TaskContext::default().with_session_config(session_config));
57325735

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -983,6 +983,7 @@ impl ExecutionPlan for RepartitionExec {
983983
spill_metrics,
984984
input.schema(),
985985
);
986+
986987
// Get existing ordering to use for merging
987988
let sort_exprs = self.sort_exprs().cloned();
988989

@@ -1627,7 +1628,10 @@ impl PerPartitionStream {
16271628
// Poll the memory channel for next message
16281629
let value = match self.receiver.recv().poll_unpin(cx) {
16291630
Poll::Ready(v) => v,
1630-
Poll::Pending => return Poll::Pending,
1631+
Poll::Pending => {
1632+
// Nothing from channel, wait
1633+
return Poll::Pending;
1634+
}
16311635
};
16321636

16331637
match value {
@@ -1660,7 +1664,10 @@ impl PerPartitionStream {
16601664
// Continue to poll for more data from other partitions
16611665
continue;
16621666
}
1663-
None => return Poll::Ready(None),
1667+
None => {
1668+
// Channel closed unexpectedly
1669+
return Poll::Ready(None);
1670+
}
16641671
}
16651672
}
16661673
StreamState::ReadingSpilled => {
@@ -1673,8 +1680,15 @@ impl PerPartitionStream {
16731680
Poll::Ready(Some(Err(e))) => {
16741681
return Poll::Ready(Some(Err(e)));
16751682
}
1676-
Poll::Ready(None) => self.state = StreamState::ReadingMemory,
1677-
Poll::Pending => return Poll::Pending,
1683+
Poll::Ready(None) => {
1684+
// Spill stream ended, keep draining the memory channel
1685+
self.state = StreamState::ReadingMemory;
1686+
}
1687+
Poll::Pending => {
1688+
// Spilled batch not ready yet, must wait
1689+
// This preserves ordering by blocking until spill data arrives
1690+
return Poll::Pending;
1691+
}
16781692
}
16791693
}
16801694
}

report.md

Lines changed: 0 additions & 134 deletions
This file was deleted.

0 commit comments

Comments
 (0)