Skip to content

Commit 6aa5a7e

Browse files
authored
refactor: Share left-side spill file across partitions on OOM fallback (#21699)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #. ## Rationale for this change To reduce the redundant re-execution of the left side during OOM fallback. ## What changes are included in this PR? Previously when OnceFut failed with OOM, each partition independently re-executed the left child to get its own stream. This was redundant since all partitions need the same left data. Now the first partition to initiate fallback spills the entire left side to disk via a shared OnceAsync<LeftSpillData>. Other partitions wait on the same future and read from the shared spill file, avoiding redundant re-execution of the left child. Co-authored-by: Claude Code ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> Unit test ## Are there any user-facing changes? No <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent 1fbbba5 commit 6aa5a7e

2 files changed

Lines changed: 135 additions & 32 deletions

File tree

datafusion/physical-plan/src/joins/nested_loop_join.rs

Lines changed: 131 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,13 @@ pub struct NestedLoopJoinExec {
207207
/// Each output stream waits on the `OnceAsync` to signal the completion of
208208
/// the build(left) side data, and buffer them all for later joining.
209209
build_side_data: OnceAsync<JoinLeftData>,
210+
/// Shared left-side spill data for OOM fallback.
211+
///
212+
/// When `build_side_data` fails with OOM, the first partition to
213+
/// initiate fallback spills the entire left side to disk. Other
214+
/// partitions share the same spill file via this `OnceAsync`,
215+
/// avoiding redundant re-execution of the left child.
216+
left_spill_data: Arc<OnceAsync<LeftSpillData>>,
210217
/// Information of index and left / right placement of columns
211218
column_indices: Vec<ColumnIndex>,
212219
/// Projection to apply to the output of the join
@@ -290,6 +297,7 @@ impl NestedLoopJoinExecBuilder {
290297
join_type,
291298
join_schema,
292299
build_side_data: Default::default(),
300+
left_spill_data: Arc::new(OnceAsync::default()),
293301
column_indices,
294302
projection,
295303
metrics: Default::default(),
@@ -492,6 +500,7 @@ impl NestedLoopJoinExec {
492500
right,
493501
metrics: ExecutionPlanMetricsSet::new(),
494502
build_side_data: Default::default(),
503+
left_spill_data: Arc::new(OnceAsync::default()),
495504
cache: Arc::clone(&self.cache),
496505
filter: self.filter.clone(),
497506
join_type: self.join_type,
@@ -655,6 +664,7 @@ impl ExecutionPlan for NestedLoopJoinExec {
655664
SpillState::Pending {
656665
left_plan: Arc::clone(&self.left),
657666
task_context: Arc::clone(&context),
667+
left_spill_data: Arc::clone(&self.left_spill_data),
658668
}
659669
} else {
660670
SpillState::Disabled
@@ -863,6 +873,20 @@ enum NLJState {
863873
EmitLeftUnmatched,
864874
Done,
865875
}
876+
/// Shared data for the left-side spill fallback.
877+
///
878+
/// When the in-memory `OnceFut` path fails with OOM, the first partition
879+
/// spills the entire left side to disk. This struct holds the spill file
880+
/// reference so other partitions can read from the same file.
881+
pub(crate) struct LeftSpillData {
882+
/// SpillManager used to read the spill file (has the left schema)
883+
spill_manager: SpillManager,
884+
/// The spill file containing all left-side batches
885+
spill_file: RefCountedTempFile,
886+
/// Left-side schema
887+
schema: SchemaRef,
888+
}
889+
866890
/// Tracks the state of the memory-limited spill fallback for NLJ.
867891
///
868892
/// The NLJ always starts with the standard OnceFut path. If the in-memory
@@ -882,6 +906,9 @@ pub(crate) enum SpillState {
882906
left_plan: Arc<dyn ExecutionPlan>,
883907
/// TaskContext for re-execution and SpillManager creation
884908
task_context: Arc<TaskContext>,
909+
/// Shared OnceAsync for left-side spill data. The first partition
910+
/// to initiate fallback spills the left side; others share the file.
911+
left_spill_data: Arc<OnceAsync<LeftSpillData>>,
885912
},
886913

887914
/// Fallback has been triggered. Left data is being loaded in chunks
@@ -892,16 +919,20 @@ pub(crate) enum SpillState {
892919
/// State for active memory-limited spill execution.
893920
/// Boxed inside [`SpillState::Active`] to reduce enum size.
894921
pub(crate) struct SpillStateActive {
895-
/// Left input stream for incremental buffering
896-
left_stream: SendableRecordBatchStream,
922+
/// Shared future for left-side spill data. All partitions wait on
923+
/// the same future — the first to poll triggers the actual spill.
924+
left_spill_fut: OnceFut<LeftSpillData>,
925+
/// Left input stream for incremental chunk reading (from spill file).
926+
/// None until `left_spill_fut` resolves.
927+
left_stream: Option<SendableRecordBatchStream>,
928+
/// Left-side schema (set once `left_spill_fut` resolves)
929+
left_schema: Option<SchemaRef>,
897930
/// Memory reservation for left-side buffering
898931
reservation: MemoryReservation,
899932
/// Accumulated left batches for the current chunk
900933
pending_batches: Vec<RecordBatch>,
901-
/// Left-side schema (for concat_batches)
902-
left_schema: SchemaRef,
903934
/// SpillManager for right-side spilling
904-
spill_manager: SpillManager,
935+
right_spill_manager: SpillManager,
905936
/// In-progress spill file for writing right batches during first pass
906937
right_spill_in_progress: Option<InProgressSpillFile>,
907938
/// Completed right-side spill file (available after first pass)
@@ -1263,29 +1294,61 @@ impl NestedLoopJoinStream {
12631294

12641295
/// Switch from the standard OnceFut path to memory-limited mode.
12651296
///
1266-
/// Re-executes the left child to get a fresh stream, creates a
1267-
/// SpillManager for right-side spilling, and transitions the spill
1268-
/// state from `Pending` to `Active`. The next call to
1269-
/// `handle_buffering_left` will dispatch to
1270-
/// `handle_buffering_left_memory_limited`.
1297+
/// Uses the shared `left_spill_data` OnceAsync so that only the first
1298+
/// partition to reach this point re-executes the left child and spills
1299+
/// it to disk. Other partitions share the same spill file.
12711300
fn initiate_fallback(&mut self) -> Result<()> {
12721301
// Take ownership of Pending state
1273-
let (left_plan, context) =
1302+
let (left_plan, context, left_spill_data) =
12741303
match std::mem::replace(&mut self.spill_state, SpillState::Disabled) {
12751304
SpillState::Pending {
12761305
left_plan,
12771306
task_context,
1278-
} => (left_plan, task_context),
1307+
left_spill_data,
1308+
} => (left_plan, task_context, left_spill_data),
12791309
_ => {
12801310
return internal_err!(
12811311
"initiate_fallback called in non-Pending spill state"
12821312
);
12831313
}
12841314
};
12851315

1286-
// Re-execute left child to get a fresh stream
1287-
let left_stream = left_plan.execute(0, Arc::clone(&context))?;
1288-
let left_schema = left_stream.schema();
1316+
// Use OnceAsync to ensure only the first partition spills the left
1317+
// side. Other partitions will get the same OnceFut that resolves
1318+
// to the shared spill file.
1319+
let left_spill_fut = left_spill_data.try_once(|| {
1320+
let plan = Arc::clone(&left_plan);
1321+
let ctx = Arc::clone(&context);
1322+
let spill_metrics = self.metrics.spill_metrics.clone();
1323+
Ok(async move {
1324+
let mut stream = plan.execute(0, Arc::clone(&ctx))?;
1325+
let schema = stream.schema();
1326+
let left_spill_manager = SpillManager::new(
1327+
ctx.runtime_env(),
1328+
spill_metrics,
1329+
Arc::clone(&schema),
1330+
)
1331+
.with_compression_type(ctx.session_config().spill_compression());
1332+
1333+
let result = left_spill_manager
1334+
.spill_record_batch_stream_and_return_max_batch_memory(
1335+
&mut stream,
1336+
"NestedLoopJoin left spill",
1337+
)
1338+
.await?;
1339+
1340+
match result {
1341+
Some((file, _max_batch_memory)) => Ok(LeftSpillData {
1342+
spill_manager: left_spill_manager,
1343+
spill_file: file,
1344+
schema,
1345+
}),
1346+
None => {
1347+
internal_err!("Left side produced no data to spill")
1348+
}
1349+
}
1350+
})
1351+
})?;
12891352

12901353
// Create reservation with can_spill for fair memory allocation
12911354
let reservation = MemoryConsumer::new("NestedLoopJoinLoad[fallback]".to_string())
@@ -1294,19 +1357,20 @@ impl NestedLoopJoinStream {
12941357

12951358
// Create SpillManager for right-side spilling
12961359
let right_schema = self.right_data.schema();
1297-
let spill_manager = SpillManager::new(
1360+
let right_spill_manager = SpillManager::new(
12981361
context.runtime_env(),
12991362
self.metrics.spill_metrics.clone(),
13001363
right_schema,
13011364
)
13021365
.with_compression_type(context.session_config().spill_compression());
13031366

13041367
self.spill_state = SpillState::Active(Box::new(SpillStateActive {
1305-
left_stream,
1368+
left_spill_fut,
1369+
left_stream: None,
1370+
left_schema: None,
13061371
reservation,
13071372
pending_batches: Vec::new(),
1308-
left_schema,
1309-
spill_manager,
1373+
right_spill_manager,
13101374
right_spill_in_progress: None,
13111375
right_spill_file: None,
13121376
right_max_batch_memory: 0,
@@ -1378,11 +1442,44 @@ impl NestedLoopJoinStream {
13781442
);
13791443
};
13801444

1445+
// On first entry (or after re-entry for a new chunk pass when
1446+
// left_stream was consumed), wait for the shared left spill
1447+
// future to resolve and then open a stream from the spill file.
1448+
if active.left_stream.is_none() {
1449+
match active.left_spill_fut.get_shared(cx) {
1450+
Poll::Ready(Ok(spill_data)) => {
1451+
match spill_data
1452+
.spill_manager
1453+
.read_spill_as_stream(spill_data.spill_file.clone(), None)
1454+
{
1455+
Ok(stream) => {
1456+
active.left_schema = Some(Arc::clone(&spill_data.schema));
1457+
active.left_stream = Some(stream);
1458+
}
1459+
Err(e) => {
1460+
return ControlFlow::Break(Poll::Ready(Some(Err(e))));
1461+
}
1462+
}
1463+
}
1464+
Poll::Ready(Err(e)) => {
1465+
return ControlFlow::Break(Poll::Ready(Some(Err(e))));
1466+
}
1467+
Poll::Pending => {
1468+
return ControlFlow::Break(Poll::Pending);
1469+
}
1470+
}
1471+
}
1472+
1473+
let left_stream = active
1474+
.left_stream
1475+
.as_mut()
1476+
.expect("left_stream must be set after spill future resolves");
1477+
13811478
// Poll left stream for more batches.
13821479
// Note: pending_batches may already contain a batch from the
13831480
// previous chunk iteration (the batch that triggered the memory limit).
13841481
loop {
1385-
match active.left_stream.poll_next_unpin(cx) {
1482+
match left_stream.poll_next_unpin(cx) {
13861483
Poll::Ready(Some(Ok(batch))) => {
13871484
if batch.num_rows() == 0 {
13881485
continue;
@@ -1431,13 +1528,18 @@ impl NestedLoopJoinStream {
14311528
return ControlFlow::Continue(());
14321529
}
14331530

1434-
let merged_batch =
1435-
match concat_batches(&active.left_schema, &active.pending_batches) {
1436-
Ok(batch) => batch,
1437-
Err(e) => {
1438-
return ControlFlow::Break(Poll::Ready(Some(Err(e.into()))));
1439-
}
1440-
};
1531+
let merged_batch = match concat_batches(
1532+
active
1533+
.left_schema
1534+
.as_ref()
1535+
.expect("left_schema must be set"),
1536+
&active.pending_batches,
1537+
) {
1538+
Ok(batch) => batch,
1539+
Err(e) => {
1540+
return ControlFlow::Break(Poll::Ready(Some(Err(e.into()))));
1541+
}
1542+
};
14411543
active.pending_batches.clear();
14421544

14431545
// Build visited bitmap if needed for this join type
@@ -1472,7 +1574,7 @@ impl NestedLoopJoinStream {
14721574
// Set up right-side stream for this pass
14731575
if !active.is_first_right_pass {
14741576
if let Some(file) = active.right_spill_file.as_ref() {
1475-
match active.spill_manager.read_spill_as_stream(
1577+
match active.right_spill_manager.read_spill_as_stream(
14761578
file.clone(),
14771579
Some(active.right_max_batch_memory),
14781580
) {
@@ -1487,7 +1589,7 @@ impl NestedLoopJoinStream {
14871589
} else {
14881590
// First pass: create InProgressSpillFile for right side
14891591
match active
1490-
.spill_manager
1592+
.right_spill_manager
14911593
.create_in_progress_file("NestedLoopJoin right spill")
14921594
{
14931595
Ok(file) => {

datafusion/sqllogictest/test_files/nested_loop_join_spill.slt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@ INNER JOIN generate_series(1, 1) AS t2(v2)
3939
100000 1 100000
4040

4141
# --- Verify spill metrics via EXPLAIN ANALYZE ---
42-
# The NestedLoopJoinExec line should show spill_count=1, confirming
43-
# the memory-limited fallback path was taken and right side was spilled.
42+
# The NestedLoopJoinExec line should show spill_count=2, confirming
43+
# the memory-limited fallback path was taken (left side spilled once,
44+
# right side spilled once).
4445
query TT
4546
EXPLAIN ANALYZE SELECT count(*)
4647
FROM generate_series(1, 100000) AS t1(v1)
@@ -50,7 +51,7 @@ INNER JOIN generate_series(1, 1) AS t2(v2)
5051
Plan with Metrics
5152
01)ProjectionExec: expr=[count(Int64(1))@0 as count(*)], metrics=[<slt:ignore>]
5253
02)--AggregateExec: mode=Single, gby=[], aggr=[count(Int64(1))], metrics=[<slt:ignore>]
53-
03)----NestedLoopJoinExec: join_type=Inner, filter=v1@0 + v2@1 > 0, projection=[], metrics=[output_rows=100.0 K, <slt:ignore> spill_count=1, <slt:ignore>]
54+
03)----NestedLoopJoinExec: join_type=Inner, filter=v1@0 + v2@1 > 0, projection=[], metrics=[output_rows=100.0 K, <slt:ignore> spill_count=2, <slt:ignore>]
5455
04)------ProjectionExec: expr=[value@0 as v1], metrics=[<slt:ignore>]
5556
05)--------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192], metrics=[<slt:ignore>]
5657
06)------ProjectionExec: expr=[value@0 as v2], metrics=[<slt:ignore>]

0 commit comments

Comments (0)