Skip to content

Commit eca2e00

Browse files
2010YOUY01martin-g
andauthored
refactor: Simplify NLJ re-scans with ReplayableStreamSource (#21742)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #. ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> ### Background #21448 introduced memory-limited execution for NLJ (thanks to @viirya). The idea is: 1. Load the build (left) side of the NLJ until the memory limit is reached. 2. Probe the right side and complete the join for the current buffered build-side chunk. 3. Load the next chunk of the build side and repeat the right-side scan until all data is processed. To support repeated probing of the right side, input batches are spilled to disk during the first pass. In subsequent passes, input is read directly from the spill. This avoids re-evaluating potentially expensive pipelines (e.g., Parquet decoding + filtering), making repeated probes both memory-efficient and fast. ### This PR This PR extracts the spill-backed replayable stream into a separate module to simplify the NLJ implementation. Although the lines of code increase, the new module provides a clearer interface and I think it's easier to maintain. Additionally, this utility may be useful elsewhere. I have seen a similar pattern in SedonaDB for memory-limited spatial joins. ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> - Introducing `ReplayableStreamSource` for the above purpose - Refactor NLJ logic with `ReplayableStreamSource` ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 5. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> UTs, also covered by existing memory-limited NLJ test ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. --> No --------- Co-authored-by: Martin Grigorov <martin-g@users.noreply.github.com>
1 parent 766dff1 commit eca2e00

3 files changed

Lines changed: 482 additions & 83 deletions

File tree

datafusion/physical-plan/src/joins/nested_loop_join.rs

Lines changed: 36 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ use log::debug;
7979
use parking_lot::Mutex;
8080

8181
use crate::metrics::SpillMetrics;
82-
use crate::spill::in_progress_spill_file::InProgressSpillFile;
82+
use crate::spill::replayable_spill_input::ReplayableStreamSource;
8383
use crate::spill::spill_manager::SpillManager;
8484

8585
#[expect(rustdoc::private_intra_doc_links)]
@@ -931,16 +931,8 @@ pub(crate) struct SpillStateActive {
931931
reservation: MemoryReservation,
932932
/// Accumulated left batches for the current chunk
933933
pending_batches: Vec<RecordBatch>,
934-
/// SpillManager for right-side spilling
935-
right_spill_manager: SpillManager,
936-
/// In-progress spill file for writing right batches during first pass
937-
right_spill_in_progress: Option<InProgressSpillFile>,
938-
/// Completed right-side spill file (available after first pass)
939-
right_spill_file: Option<RefCountedTempFile>,
940-
/// Max right batch memory size (for read_spill_as_stream)
941-
right_max_batch_memory: usize,
942-
/// Whether this is the first right-side pass (need to spill while reading)
943-
is_first_right_pass: bool,
934+
/// Right input that spills on the first pass and replays from spill later.
935+
right_input: ReplayableStreamSource,
944936
}
945937

946938
pub(crate) struct NestedLoopJoinStream {
@@ -960,7 +952,8 @@ pub(crate) struct NestedLoopJoinStream {
960952
/// type of the join
961953
pub(crate) join_type: JoinType,
962954
/// the probe-side(right) table data of the nested loop join
963-
pub(crate) right_data: SendableRecordBatchStream,
955+
/// `Option` is used because memory-limited path requires resetting it.
956+
pub(crate) right_data: Option<SendableRecordBatchStream>,
964957
/// the build-side table data of the nested loop join
965958
pub(crate) left_data: OnceFut<JoinLeftData>,
966959
/// Projection to construct the output schema from the left and right tables.
@@ -1258,7 +1251,7 @@ impl NestedLoopJoinStream {
12581251
output_schema: Arc::clone(&schema),
12591252
join_filter: filter,
12601253
join_type,
1261-
right_data,
1254+
right_data: Some(right_data),
12621255
column_indices,
12631256
left_data,
12641257
metrics,
@@ -1356,7 +1349,15 @@ impl NestedLoopJoinStream {
13561349
.register(context.memory_pool());
13571350

13581351
// Create SpillManager for right-side spilling
1359-
let right_schema = self.right_data.schema();
1352+
let right_schema = self
1353+
.right_data
1354+
.as_ref()
1355+
.expect("right_data must be present before fallback")
1356+
.schema();
1357+
let right_data = self
1358+
.right_data
1359+
.take()
1360+
.expect("right_data must be present before fallback");
13601361
let right_spill_manager = SpillManager::new(
13611362
context.runtime_env(),
13621363
self.metrics.spill_metrics.clone(),
@@ -1370,11 +1371,11 @@ impl NestedLoopJoinStream {
13701371
left_schema: None,
13711372
reservation,
13721373
pending_batches: Vec::new(),
1373-
right_spill_manager,
1374-
right_spill_in_progress: None,
1375-
right_spill_file: None,
1376-
right_max_batch_memory: 0,
1377-
is_first_right_pass: true,
1374+
right_input: ReplayableStreamSource::new(
1375+
right_data,
1376+
right_spill_manager,
1377+
"NestedLoopJoin right spill",
1378+
),
13781379
}));
13791380

13801381
// State stays BufferingLeft — next poll will enter
@@ -1571,33 +1572,12 @@ impl NestedLoopJoinStream {
15711572

15721573
self.buffered_left_data = Some(Arc::new(left_data));
15731574

1574-
// Set up right-side stream for this pass
1575-
if !active.is_first_right_pass {
1576-
if let Some(file) = active.right_spill_file.as_ref() {
1577-
match active.right_spill_manager.read_spill_as_stream(
1578-
file.clone(),
1579-
Some(active.right_max_batch_memory),
1580-
) {
1581-
Ok(stream) => {
1582-
self.right_data = stream;
1583-
}
1584-
Err(e) => {
1585-
return ControlFlow::Break(Poll::Ready(Some(Err(e))));
1586-
}
1587-
}
1575+
match active.right_input.open_pass() {
1576+
Ok(stream) => {
1577+
self.right_data = Some(stream);
15881578
}
1589-
} else {
1590-
// First pass: create InProgressSpillFile for right side
1591-
match active
1592-
.right_spill_manager
1593-
.create_in_progress_file("NestedLoopJoin right spill")
1594-
{
1595-
Ok(file) => {
1596-
active.right_spill_in_progress = Some(file);
1597-
}
1598-
Err(e) => {
1599-
return ControlFlow::Break(Poll::Ready(Some(Err(e))));
1600-
}
1579+
Err(e) => {
1580+
return ControlFlow::Break(Poll::Ready(Some(Err(e))));
16011581
}
16021582
}
16031583

@@ -1613,7 +1593,12 @@ impl NestedLoopJoinStream {
16131593
&mut self,
16141594
cx: &mut std::task::Context<'_>,
16151595
) -> ControlFlow<Poll<Option<Result<RecordBatch>>>> {
1616-
match self.right_data.poll_next_unpin(cx) {
1596+
match self
1597+
.right_data
1598+
.as_mut()
1599+
.expect("right_data must be present while fetching right")
1600+
.poll_next_unpin(cx)
1601+
{
16171602
Poll::Ready(result) => match result {
16181603
Some(Ok(right_batch)) => {
16191604
// Update metrics
@@ -1626,19 +1611,6 @@ impl NestedLoopJoinStream {
16261611
return ControlFlow::Continue(());
16271612
}
16281613

1629-
// In memory-limited mode, spill right batch to disk on first pass
1630-
if let SpillState::Active(ref mut active) = self.spill_state
1631-
&& active.is_first_right_pass
1632-
&& let Some(ref mut spill_file) = active.right_spill_in_progress
1633-
{
1634-
if let Err(e) = spill_file.append_batch(&right_batch) {
1635-
return ControlFlow::Break(Poll::Ready(Some(Err(e))));
1636-
}
1637-
active.right_max_batch_memory = active
1638-
.right_max_batch_memory
1639-
.max(right_batch.get_array_memory_size());
1640-
}
1641-
16421614
self.current_right_batch = Some(right_batch);
16431615

16441616
// Prepare right bitmap
@@ -1654,29 +1626,6 @@ impl NestedLoopJoinStream {
16541626
}
16551627
Some(Err(e)) => ControlFlow::Break(Poll::Ready(Some(Err(e)))),
16561628
None => {
1657-
// Right stream exhausted.
1658-
// In memory-limited mode, finalize the spill file after first pass.
1659-
if let SpillState::Active(ref mut active) = self.spill_state
1660-
&& active.is_first_right_pass
1661-
{
1662-
if let Some(mut spill_in_progress) =
1663-
active.right_spill_in_progress.take()
1664-
{
1665-
match spill_in_progress.finish() {
1666-
Ok(Some(file)) => {
1667-
active.right_spill_file = Some(file);
1668-
}
1669-
Ok(None) => {
1670-
// No data was spilled (right side was empty)
1671-
}
1672-
Err(e) => {
1673-
return ControlFlow::Break(Poll::Ready(Some(Err(e))));
1674-
}
1675-
}
1676-
}
1677-
active.is_first_right_pass = false;
1678-
}
1679-
16801629
self.state = NLJState::EmitLeftUnmatched;
16811630
ControlFlow::Continue(())
16821631
}
@@ -2257,7 +2206,11 @@ impl NestedLoopJoinStream {
22572206
}
22582207
let bitmap_sliced = BooleanArray::new(bitmap_sliced.finish(), None);
22592208

2260-
let right_schema = self.right_data.schema();
2209+
let right_schema = self
2210+
.right_data
2211+
.as_ref()
2212+
.expect("right_data must be present when building unmatched batch")
2213+
.schema();
22612214
build_unmatched_batch(
22622215
&self.output_schema,
22632216
&left_batch_sliced,

datafusion/physical-plan/src/spill/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
//! Defines the spilling functions
1919
2020
pub(crate) mod in_progress_spill_file;
21+
pub(crate) mod replayable_spill_input;
2122
pub(crate) mod spill_manager;
2223
pub mod spill_pool;
2324

0 commit comments

Comments
 (0)