Commit b806117

Fix flaky SpillPool channel test by synchronizing reader and writer tasks (#19110)
## Which issue does this PR close?

* Closes #19058.

## Rationale for this change

The `spill_pool` channel test `test_reader_catches_up_to_writer` was flaky due to non-deterministic coordination between the reader and writer tasks. The test used time-based sleeps and polling on shared state to infer when the reader had started and when it had processed a batch. Under varying scheduler timing, this could cause the reader to miss events or observe them in a different order, leading to intermittent failures where the recorded event sequence did not match expectations (for example, observing `3` instead of `5` reads).

Since this test verifies the correctness and wakeup behavior of the spill channel used by the spill pool, flakiness here undermines confidence in the spill mechanism and can cause spurious CI failures. This PR makes the test coordination explicit and deterministic using `oneshot` channels, and also improves the usage example for the spill channel to show how to run writer and reader concurrently in a robust way.

## What changes are included in this PR?

1. **Example: concurrent writer and reader usage**
   * Update the `spill_pool::channel` usage example to:
     * Spawn writer and reader tasks concurrently instead of only spawning the writer.
     * Use `writer.push_batch(&batch)?` so the example returns a `Result` and propagates errors correctly.
     * Explicitly `drop(writer)` at the end of the writer task to finalize the spill file and wake the reader.
     * Use `tokio::join!` to await both tasks and map join errors into `DataFusionError::Execution`.
     * Assert that the reader sees all expected batches (`batches_read == 5`).
   * The updated example better demonstrates the intended concurrent usage pattern of the spill channel and ensures the reader is correctly woken when the writer finishes.
2. **Test: make `test_reader_catches_up_to_writer` deterministic**
   * Introduce two `oneshot` channels:
     * `reader_waiting_tx/rx` signals when the reader has started and is pending on its first `next()` call.
     * `first_read_done_tx/rx` signals when the reader has completed processing the first batch.
   * In the reader task:
     * Record `ReadStart` and send on `reader_waiting_tx` before awaiting `reader.next()`.
     * After successfully reading and recording the first batch, send on `first_read_done_tx`.
     * Then read the second batch as before.
   * In the test body:
     * Wait on `reader_waiting_rx` instead of sleeping for a fixed duration, ensuring the reader is actually pending before writing the first batch.
     * After the first write, wait on `first_read_done_rx` before issuing the second write.
   * This establishes a precise and documented sequence of events:
     1. Reader starts and pends on the first `next()`.
     2. First write occurs, waking the reader.
     3. Reader processes the first batch and signals completion.
     4. Second write occurs.
   * With this explicit synchronization, the event ordering in the test is stable and no longer depends on scheduler timing or arbitrary sleeps, eliminating the flakiness.

## Are these changes tested?

Yes.

```shell
for i in {1..200}; do
  echo "Run #$i started"
  cargo test -p datafusion-physical-plan --profile ci --doc -q || break
  echo "Run #$i completed"
done
```

* The modified test `test_reader_catches_up_to_writer` continues to run as part of the existing `spill_pool` test suite, but now uses explicit synchronization instead of timing-based assumptions.
* The test has been exercised repeatedly to confirm that:
  * The expected read/write event sequence is stable across runs.
  * The intermittent assertion failures (e.g., mismatched read counts such as `3` vs `5`) no longer occur.
* The updated example code compiles and type-checks by returning `datafusion_common::Result` from both spawned tasks and from the combined `tokio::join!` result.

## Are there any user-facing changes?

There are no behavior changes to the public API or spill pool semantics.

* The spill channel and spill pool behavior remains the same at runtime.
* Only the documentation/example and the internal test harness have been updated.
* No configuration flags or public methods were added, removed, or changed, so there are no breaking changes or documentation requirements beyond what is already updated inline.

## LLM-generated code disclosure

This PR includes LLM-generated code and comments. All LLM-generated content has been manually reviewed and tested.
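The two-channel handshake used by the test can be sketched in isolation. The following is an illustrative stand-in, not code from this PR: it uses std threads and `std::sync::mpsc` channels in place of tokio tasks and `tokio::sync::oneshot`, and the names (`run_coordinated`, `data_tx`, the event strings) are hypothetical. The shape of the coordination is the same: the writer waits for an explicit signal instead of sleeping.

```rust
use std::sync::mpsc;
use std::thread;

// Two "signal" channels enforce the order:
// ReadStart -> first write -> first read done -> second write.
fn run_coordinated() -> Vec<&'static str> {
    let (reader_waiting_tx, reader_waiting_rx) = mpsc::channel::<()>();
    let (first_read_done_tx, first_read_done_rx) = mpsc::channel::<()>();
    let (data_tx, data_rx) = mpsc::channel::<i32>();

    let reader = thread::spawn(move || {
        let mut events = vec!["ReadStart"];
        // Signal that the reader has started and is about to block
        // on its first receive.
        reader_waiting_tx.send(()).unwrap();
        let _first = data_rx.recv().unwrap();
        events.push("Read1");
        // Signal that the first item has been processed.
        first_read_done_tx.send(()).unwrap();
        let _second = data_rx.recv().unwrap();
        events.push("Read2");
        events
    });

    // Writer side: wait for the reader's signal instead of sleeping.
    reader_waiting_rx.recv().unwrap();
    data_tx.send(1).unwrap();
    // Only issue the second write after the first read is confirmed done.
    first_read_done_rx.recv().unwrap();
    data_tx.send(2).unwrap();

    reader.join().unwrap()
}

fn main() {
    // The recorded event order is deterministic across runs.
    assert_eq!(run_coordinated(), ["ReadStart", "Read1", "Read2"]);
}
```

Because every step waits on an explicit acknowledgment rather than a timer, the recorded sequence cannot vary with scheduler timing, which is the same property the oneshot channels give the tokio test.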
1 parent 3a41cc6 commit b806117

1 file changed

Lines changed: 40 additions & 27 deletions

datafusion/physical-plan/src/spill/spill_pool.rs

```diff
@@ -384,28 +384,33 @@ impl Drop for SpillPoolWriter {
 /// // Create channel with 1MB file size limit
 /// let (writer, mut reader) = spill_pool::channel(1024 * 1024, spill_manager);
 ///
-/// // Spawn writer task to produce batches
-/// let write_handle = tokio::spawn(async move {
+/// // Spawn writer and reader concurrently; writer wakes reader via wakers
+/// let writer_task = tokio::spawn(async move {
 ///     for i in 0..5 {
 ///         let array: ArrayRef = Arc::new(Int32Array::from(vec![i; 100]));
 ///         let batch = RecordBatch::try_new(schema.clone(), vec![array]).unwrap();
-///         writer.push_batch(&batch).unwrap();
+///         writer.push_batch(&batch)?;
 ///     }
-///     // Writer dropped here, finalizing current file
+///     // Explicitly drop writer to finalize the spill file and wake the reader
+///     drop(writer);
+///     datafusion_common::Result::<()>::Ok(())
 /// });
 ///
-/// // Reader consumes batches in FIFO order (can run concurrently with writer)
-/// let mut batches_read = 0;
-/// while let Some(result) = reader.next().await {
-///     let batch = result?;
-///     batches_read += 1;
-///     // Process batch...
-///     if batches_read == 5 {
-///         break; // Got all expected batches
+/// let reader_task = tokio::spawn(async move {
+///     let mut batches_read = 0;
+///     while let Some(result) = reader.next().await {
+///         let _batch = result?;
+///         batches_read += 1;
 ///     }
-/// }
+///     datafusion_common::Result::<usize>::Ok(batches_read)
+/// });
+///
+/// let (writer_res, reader_res) = tokio::join!(writer_task, reader_task);
+/// writer_res
+///     .map_err(|e| datafusion_common::DataFusionError::Execution(e.to_string()))??;
+/// let batches_read = reader_res
+///     .map_err(|e| datafusion_common::DataFusionError::Execution(e.to_string()))??;
 ///
-/// write_handle.await.unwrap();
 /// assert_eq!(batches_read, 5);
 /// # Ok(())
 /// # }
@@ -1173,6 +1178,9 @@ mod tests {
     async fn test_reader_catches_up_to_writer() -> Result<()> {
         let (writer, mut reader) = create_spill_channel(1024 * 1024);
 
+        let (reader_waiting_tx, reader_waiting_rx) = tokio::sync::oneshot::channel();
+        let (first_read_done_tx, first_read_done_rx) = tokio::sync::oneshot::channel();
+
         #[derive(Clone, Copy, Debug, PartialEq, Eq)]
         enum ReadWriteEvent {
             ReadStart,
@@ -1185,36 +1193,41 @@
         let reader_events = Arc::clone(&events);
         let reader_handle = SpawnedTask::spawn(async move {
             reader_events.lock().push(ReadWriteEvent::ReadStart);
+            reader_waiting_tx
+                .send(())
+                .expect("reader_waiting channel closed unexpectedly");
             let result = reader.next().await.unwrap().unwrap();
             reader_events
                 .lock()
                 .push(ReadWriteEvent::Read(result.num_rows()));
+            first_read_done_tx
+                .send(())
+                .expect("first_read_done channel closed unexpectedly");
             let result = reader.next().await.unwrap().unwrap();
             reader_events
                 .lock()
                 .push(ReadWriteEvent::Read(result.num_rows()));
         });
 
-        // Give reader time to start pending
-        tokio::time::sleep(std::time::Duration::from_millis(5)).await;
+        // Wait until the reader is pending on the first batch
+        reader_waiting_rx
+            .await
+            .expect("reader should signal when waiting");
 
         // Now write a batch (should wake the reader)
         let batch = create_test_batch(0, 5);
         events.lock().push(ReadWriteEvent::Write(batch.num_rows()));
         writer.push_batch(&batch)?;
 
-        // Wait for the reader to process
-        let processed = async {
-            loop {
-                if events.lock().len() >= 3 {
-                    break;
-                }
-                tokio::time::sleep(std::time::Duration::from_micros(500)).await;
-            }
-        };
-        tokio::time::timeout(std::time::Duration::from_secs(1), processed)
+        // Wait for the reader to finish the first read before allowing the
+        // second write. This ensures deterministic ordering of events:
+        // 1. The reader starts and pends on the first `next()`
+        // 2. The first write wakes the reader
+        // 3. The reader processes the first batch and signals completion
+        // 4. The second write is issued, ensuring consistent event ordering
+        first_read_done_rx
             .await
-            .unwrap();
+            .expect("reader should signal when first read completes");
 
         // Write another batch
         let batch = create_test_batch(5, 10);
```