Skip to content

Commit 1bd7082

Browse files
alamb and xanderbailey
authored
[branch-52] Fix repartition from dropping data when spilling (#20672) (#20777)
- Part of #20681. Closes #20683 on branch-52. This PR backports #20672 from @xanderbailey to the `branch-52` line. Co-authored-by: Xander <zander181@googlemail.com>
1 parent 9797095 commit 1bd7082

1 file changed

Lines changed: 101 additions & 1 deletion

File tree

datafusion/physical-plan/src/spill/spill_pool.rs

Lines changed: 101 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@ struct SpillPoolShared {
6161
/// Writer's reference to the current file (shared by all cloned writers).
6262
/// Has its own lock to allow I/O without blocking queue access.
6363
current_write_file: Option<Arc<Mutex<ActiveSpillFileShared>>>,
64+
/// Number of active writer clones. Only when this reaches zero should
65+
/// `writer_dropped` be set to true. This prevents premature EOF signaling
66+
/// when one writer clone is dropped while others are still active.
67+
active_writer_count: usize,
6468
}
6569

6670
impl SpillPoolShared {
@@ -72,6 +76,7 @@ impl SpillPoolShared {
7276
waker: None,
7377
writer_dropped: false,
7478
current_write_file: None,
79+
active_writer_count: 1,
7580
}
7681
}
7782

@@ -97,7 +102,6 @@ impl SpillPoolShared {
97102
/// The writer automatically manages file rotation based on the `max_file_size_bytes`
98103
/// configured in [`channel`]. When the last writer clone is dropped, it finalizes the
99104
/// current file so readers can access all written data.
100-
#[derive(Clone)]
101105
pub struct SpillPoolWriter {
102106
/// Maximum size in bytes before rotating to a new file.
103107
/// Typically set from configuration `datafusion.execution.max_spill_file_size_bytes`.
@@ -106,6 +110,18 @@ pub struct SpillPoolWriter {
106110
shared: Arc<Mutex<SpillPoolShared>>,
107111
}
108112

113+
impl Clone for SpillPoolWriter {
114+
fn clone(&self) -> Self {
115+
// Increment the active writer count so that `writer_dropped` is only
116+
// set to true when the *last* clone is dropped.
117+
self.shared.lock().active_writer_count += 1;
118+
Self {
119+
max_file_size_bytes: self.max_file_size_bytes,
120+
shared: Arc::clone(&self.shared),
121+
}
122+
}
123+
}
124+
109125
impl SpillPoolWriter {
110126
/// Spills a batch to the pool, rotating files when necessary.
111127
///
@@ -233,6 +249,15 @@ impl Drop for SpillPoolWriter {
233249
fn drop(&mut self) {
234250
let mut shared = self.shared.lock();
235251

252+
shared.active_writer_count -= 1;
253+
let is_last_writer = shared.active_writer_count == 0;
254+
255+
if !is_last_writer {
256+
// Other writer clones are still active; do not finalize or
257+
// signal EOF to readers.
258+
return;
259+
}
260+
236261
// Finalize the current file when the last writer is dropped
237262
if let Some(current_file) = shared.current_write_file.take() {
238263
// Release shared lock before locking file
@@ -1343,6 +1368,81 @@ mod tests {
13431368
Ok(())
13441369
}
13451370

1371+
/// Verifies that the reader stays alive as long as any writer clone exists.
1372+
///
1373+
/// `SpillPoolWriter` is `Clone`, and in non-preserve-order repartitioning
1374+
/// mode multiple input partition tasks share clones of the same writer.
1375+
/// The reader must not see EOF until **all** clones have been dropped,
1376+
/// even if the queue is temporarily empty between writes from different
1377+
/// clones.
1378+
///
1379+
/// The test sequence is:
1380+
///
1381+
/// 1. writer1 writes a batch, then is dropped.
1382+
/// 2. The reader consumes that batch (queue is now empty).
1383+
/// 3. writer2 (still alive) writes a batch.
1384+
/// 4. The reader must see that batch.
1385+
/// 5. EOF is only signalled after writer2 is also dropped.
1386+
#[tokio::test]
1387+
async fn test_clone_drop_does_not_signal_eof_prematurely() -> Result<()> {
1388+
let (writer1, mut reader) = create_spill_channel(1024 * 1024);
1389+
let writer2 = writer1.clone();
1390+
1391+
// Synchronization: tell writer2 when it may proceed.
1392+
let (proceed_tx, proceed_rx) = tokio::sync::oneshot::channel::<()>();
1393+
1394+
// Spawn writer2 — it waits for the signal before writing.
1395+
let writer2_handle = SpawnedTask::spawn(async move {
1396+
proceed_rx.await.unwrap();
1397+
writer2.push_batch(&create_test_batch(10, 10)).unwrap();
1398+
// writer2 is dropped here (last clone → true EOF)
1399+
});
1400+
1401+
// Writer1 writes one batch, then drops.
1402+
writer1.push_batch(&create_test_batch(0, 10))?;
1403+
drop(writer1);
1404+
1405+
// Read writer1's batch.
1406+
let batch1 = reader.next().await.unwrap()?;
1407+
assert_eq!(batch1.num_rows(), 10);
1408+
let col = batch1
1409+
.column(0)
1410+
.as_any()
1411+
.downcast_ref::<Int32Array>()
1412+
.unwrap();
1413+
assert_eq!(col.value(0), 0);
1414+
1415+
// Signal writer2 to write its batch. It will execute when the
1416+
// current task yields (i.e. when reader.next() returns Pending).
1417+
proceed_tx.send(()).unwrap();
1418+
1419+
// The reader should wait (Pending) for writer2's data, not EOF.
1420+
let batch2 =
1421+
tokio::time::timeout(std::time::Duration::from_secs(5), reader.next())
1422+
.await
1423+
.expect("Reader timed out — should not hang");
1424+
1425+
assert!(
1426+
batch2.is_some(),
1427+
"Reader must not return EOF while a writer clone is still alive"
1428+
);
1429+
let batch2 = batch2.unwrap()?;
1430+
assert_eq!(batch2.num_rows(), 10);
1431+
let col = batch2
1432+
.column(0)
1433+
.as_any()
1434+
.downcast_ref::<Int32Array>()
1435+
.unwrap();
1436+
assert_eq!(col.value(0), 10);
1437+
1438+
writer2_handle.await.unwrap();
1439+
1440+
// All writers dropped — reader should see real EOF now.
1441+
assert!(reader.next().await.is_none());
1442+
1443+
Ok(())
1444+
}
1445+
13461446
#[tokio::test]
13471447
async fn test_disk_usage_decreases_as_files_consumed() -> Result<()> {
13481448
use datafusion_execution::runtime_env::RuntimeEnvBuilder;

0 commit comments

Comments
 (0)