Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
258 changes: 217 additions & 41 deletions datafusion/physical-plan/src/sorts/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@
// under the License.

use crate::spill::get_record_batch_memory_size;
use arrow::array::ArrayRef;
use arrow::compute::interleave;
use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_common::{DataFusionError, Result};
use datafusion_execution::memory_pool::MemoryReservation;
use log::warn;
use std::any::Any;
use std::panic::{AssertUnwindSafe, catch_unwind};
use std::sync::Arc;

#[derive(Debug, Copy, Clone, Default)]
Expand Down Expand Up @@ -126,49 +131,60 @@ impl BatchBuilder {
&self.schema
}

/// Drains the in_progress row indexes, and builds a new RecordBatch from them
///
/// Will then drop any batches for which all rows have been yielded to the output
///
/// Returns `None` if no pending rows
pub fn build_record_batch(&mut self) -> Result<Option<RecordBatch>> {
if self.is_empty() {
return Ok(None);
}

let columns = (0..self.schema.fields.len())
/// Try to interleave all columns using the given index slice.
fn try_interleave_columns(
&self,
indices: &[(usize, usize)],
) -> Result<Vec<ArrayRef>> {
(0..self.schema.fields.len())
.map(|column_idx| {
let arrays: Vec<_> = self
.batches
.iter()
.map(|(_, batch)| batch.column(column_idx).as_ref())
.collect();
Ok(interleave(&arrays, &self.indices)?)
recover_offset_overflow_from_panic(|| interleave(&arrays, indices))
})
.collect::<Result<Vec<_>>>()?;

self.indices.clear();

// New cursors are only created once the previous cursor for the stream
// is finished. This means all remaining rows from all but the last batch
// for each stream have been yielded to the newly created record batch
//
// We can therefore drop all but the last batch for each stream
let mut batch_idx = 0;
let mut retained = 0;
self.batches.retain(|(stream_idx, batch)| {
let stream_cursor = &mut self.cursors[*stream_idx];
let retain = stream_cursor.batch_idx == batch_idx;
batch_idx += 1;

if retain {
stream_cursor.batch_idx = retained;
retained += 1;
} else {
self.batches_mem_used -= get_record_batch_memory_size(batch);
}
retain
});
.collect::<Result<Vec<_>>>()
}

/// Builds a record batch from the first `rows_to_emit` buffered rows.
fn finish_record_batch(
&mut self,
rows_to_emit: usize,
columns: Vec<ArrayRef>,
) -> Result<RecordBatch> {
// Remove consumed indices, keeping any remaining for the next call.
self.indices.drain(..rows_to_emit);

// Only clean up fully-consumed batches when all indices are drained,
Copy link
Copy Markdown
Contributor

@kosiew kosiew Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice change. One thing that stood out to me here: now that build_record_batch() can emit a prefix and leave the remainder buffered, this branch seems to keep every fully-consumed input batch alive until self.indices is empty.

That seems functionally correct, but it also means overflow cases could retain quite a bit of memory across several follow-up polls - especially for FETCH-limited queries where we stop pulling new input and just drain leftovers.

Would it make sense to either release batches that are no longer referenced by the remaining indices, or at least leave a quick comment here calling out that this retention is intentional? I think that would help future readers understand the tradeoff.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, added a comment clarifying this is intentional — the retention is bounded and short-lived since leftover rows drain over subsequent polls. Doing a precise scan of remaining indices to release individual batches would add complexity to the hot path for marginal gain.

// because remaining indices may still reference earlier batches.
// In the overflow/partial-emit case this may retain some extra memory
// across a few drain polls, but avoids costly index scanning on the
// hot path. The retention is bounded and short-lived since leftover
// rows are drained over subsequent polls.
if self.indices.is_empty() {
// New cursors are only created once the previous cursor for the stream
// is finished. This means all remaining rows from all but the last batch
// for each stream have been yielded to the newly created record batch
//
// We can therefore drop all but the last batch for each stream
let mut batch_idx = 0;
let mut retained = 0;
self.batches.retain(|(stream_idx, batch)| {
let stream_cursor = &mut self.cursors[*stream_idx];
let retain = stream_cursor.batch_idx == batch_idx;
batch_idx += 1;

if retain {
stream_cursor.batch_idx = retained;
retained += 1;
} else {
self.batches_mem_used -= get_record_batch_memory_size(batch);
}
retain
});
}

// Release excess memory back to the pool, but never shrink below
// initial_reservation to maintain the anti-starvation guarantee
Expand All @@ -178,10 +194,27 @@ impl BatchBuilder {
self.reservation.shrink(self.reservation.size() - target);
}

Ok(Some(RecordBatch::try_new(
Arc::clone(&self.schema),
columns,
)?))
RecordBatch::try_new(Arc::clone(&self.schema), columns).map_err(Into::into)
}

/// Drains the in_progress row indexes, and builds a new RecordBatch from them
///
/// Will then drop any batches for which all rows have been yielded to the output.
/// If an offset overflow occurs (e.g. string/list offsets exceed i32::MAX),
/// retries with progressively fewer rows until it succeeds.
///
/// Returns `None` if no pending rows
pub fn build_record_batch(&mut self) -> Result<Option<RecordBatch>> {
if self.is_empty() {
return Ok(None);
}

let (rows_to_emit, columns) =
retry_interleave(self.indices.len(), self.indices.len(), |rows_to_emit| {
self.try_interleave_columns(&self.indices[..rows_to_emit])
})?;

Ok(Some(self.finish_record_batch(rows_to_emit, columns)?))
}
}

Expand All @@ -200,3 +233,146 @@ pub(crate) fn try_grow_reservation_to_at_least(
}
Ok(())
}

/// Returns true if the error is an Arrow offset overflow.
///
/// Only a directly-wrapped `ArrowError::OffsetOverflowError` matches;
/// context-wrapped errors are intentionally not unwrapped here.
fn is_offset_overflow(e: &DataFusionError) -> bool {
    if let DataFusionError::ArrowError(inner, _) = e {
        matches!(**inner, ArrowError::OffsetOverflowError(_))
    } else {
        false
    }
}

/// Builds the synthetic offset-overflow error used by the retry path.
fn offset_overflow_error() -> DataFusionError {
    // The payload value is irrelevant; only the variant is matched later.
    let overflow = ArrowError::OffsetOverflowError(0);
    DataFusionError::ArrowError(Box::new(overflow), None)
}

fn recover_offset_overflow_from_panic<T, F>(f: F) -> Result<T>
Copy link
Copy Markdown
Contributor

@kosiew kosiew Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The retry behavior looks good, but right now it seems like it’s only covered through synthetic helper failures.

Since the production path depends on matching Arrow’s panic payload pretty closely, I think it’d be great to add one higher-level regression test closer to BatchBuilder::build_record_batch() or SortPreservingMergeStream that exercises the retry/drain flow end-to-end through an injectable interleave hook.

That would make it a lot easier to catch future Arrow-side panic-message changes — or refactors in this file — before they slip through.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The panic-catching path is intentionally temporary — once apache/arrow-rs#9549 lands in DataFusion, interleave will return a proper OffsetOverflowError instead of panicking, and we can remove the catch_unwind + message matching entirely. At that point the existing retry_interleave unit tests will cover the full production path without any panic-payload coupling. Adding an expensive end-to-end test for a short-lived workaround does not seem worth it.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added some comments 0b0bd6c

where
F: FnOnce() -> std::result::Result<T, ArrowError>,
{
// Arrow's interleave can panic on i32 offset overflow with
// `.expect("overflow")` / `.expect("offset overflow")`.
// Catch only those specific panics so the caller can retry
// with fewer rows while unrelated defects still unwind.
//
// TODO: remove once arrow-rs#9549 lands — interleave will return
// OffsetOverflowError directly instead of panicking.
match catch_unwind(AssertUnwindSafe(f)) {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be avoided after the apache/arrow-rs#9549 gets into DF

Ok(result) => Ok(result?),
Err(panic_payload) => {
if is_arrow_offset_overflow_panic(panic_payload.as_ref()) {
Err(offset_overflow_error())
} else {
std::panic::resume_unwind(panic_payload);
}
}
}
}

/// Runs `interleave` with `rows_to_emit` rows, halving the row count on each
/// Arrow offset overflow until it succeeds or no rows remain.
///
/// Returns the row count that succeeded together with the produced value.
/// Non-overflow errors are propagated immediately without retrying.
fn retry_interleave<T, F>(
    mut rows_to_emit: usize,
    total_rows: usize,
    mut interleave: F,
) -> Result<(usize, T)>
where
    F: FnMut(usize) -> Result<T>,
{
    loop {
        // Attempt the interleave at the current row count.
        let err = match interleave(rows_to_emit) {
            Ok(value) => return Ok((rows_to_emit, value)),
            Err(e) => e,
        };

        // Anything other than an offset overflow is not retryable.
        if !is_offset_overflow(&err) {
            return Err(err);
        }

        rows_to_emit /= 2;
        if rows_to_emit == 0 {
            // Even a single row overflowed; give up and surface the error.
            return Err(err);
        }
        warn!(
            "Interleave offset overflow with {total_rows} rows, retrying with {rows_to_emit}"
        );
    }
}

/// Extracts the message string from a panic payload, if it is a `&str`
/// or `String` (the two payload types produced by `panic!`).
fn panic_message(payload: &(dyn Any + Send)) -> Option<&str> {
    payload
        .downcast_ref::<&str>()
        .copied()
        .or_else(|| payload.downcast_ref::<String>().map(String::as_str))
}

/// Returns true if a caught panic payload matches the Arrow offset overflows
/// raised by interleave's offset builders.
fn is_arrow_offset_overflow_panic(payload: &(dyn Any + Send)) -> bool {
    // Only the two exact messages Arrow uses qualify; anything else
    // (e.g. "capacity overflow") must keep unwinding.
    match panic_message(payload) {
        Some("overflow") | Some("offset overflow") => true,
        _ => false,
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::error::ArrowError;

    #[test]
    fn test_retry_interleave_halves_rows_until_success() {
        // Record every row count the closure is asked to interleave.
        let mut observed = Vec::new();

        let outcome = retry_interleave(4, 4, |rows| {
            observed.push(rows);
            // Succeed only once the row count has shrunk to a single row.
            match rows {
                n if n > 1 => Err(offset_overflow_error()),
                _ => Ok("ok"),
            }
        });

        let (emitted, value) = outcome.unwrap();
        assert_eq!(emitted, 1);
        assert_eq!(value, "ok");
        // Each failed attempt halves the row count: 4 -> 2 -> 1.
        assert_eq!(observed, vec![4, 2, 1]);
    }

    #[test]
    fn test_recover_offset_overflow_from_panic() {
        // A panic carrying Arrow's overflow message must surface as a
        // recoverable offset-overflow error rather than unwinding.
        let result = recover_offset_overflow_from_panic(
            || -> std::result::Result<(), ArrowError> { panic!("offset overflow") },
        );

        assert!(is_offset_overflow(&result.unwrap_err()));
    }

    #[test]
    fn test_recover_offset_overflow_from_panic_rethrows_unrelated_panics() {
        // Panics that are not offset overflows must keep unwinding.
        let outcome = catch_unwind(AssertUnwindSafe(|| {
            let _ = recover_offset_overflow_from_panic(
                || -> std::result::Result<(), ArrowError> { panic!("capacity overflow") },
            );
        }));

        assert!(outcome.is_err());
    }

    #[test]
    fn test_is_arrow_offset_overflow_panic() {
        // Both &str and String payloads with the known messages match;
        // similar-looking but different messages do not.
        let cases: [(Box<dyn Any + Send>, bool); 4] = [
            (Box::new("overflow"), true),
            (Box::new(String::from("offset overflow")), true),
            (Box::new("capacity overflow"), false),
            (
                Box::new(String::from("attempt to multiply with overflow")),
                false,
            ),
        ];

        for (payload, expected) in cases {
            assert_eq!(is_arrow_offset_overflow_panic(payload.as_ref()), expected);
        }
    }
}
Loading
Loading