Skip to content

Commit 07ddfd7

Browse files
authored
Fix sort merge interleave overflow (#20922)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #. ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> When SortPreservingMergeStream merges batches containing large string/binary columns whose combined offsets exceed i32::MAX, Arrow's interleave panics with .expect("overflow"). This PR catches that panic and retries with progressively fewer rows, producing smaller output batches that fit within i32 offset limits. ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> Yes UT ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent 627faba commit 07ddfd7

File tree

3 files changed

+393
-44
lines changed

3 files changed

+393
-44
lines changed

datafusion/physical-plan/src/sorts/builder.rs

Lines changed: 217 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,16 @@
1616
// under the License.
1717

1818
use crate::spill::get_record_batch_memory_size;
19+
use arrow::array::ArrayRef;
1920
use arrow::compute::interleave;
2021
use arrow::datatypes::SchemaRef;
22+
use arrow::error::ArrowError;
2123
use arrow::record_batch::RecordBatch;
22-
use datafusion_common::Result;
24+
use datafusion_common::{DataFusionError, Result};
2325
use datafusion_execution::memory_pool::MemoryReservation;
26+
use log::warn;
27+
use std::any::Any;
28+
use std::panic::{AssertUnwindSafe, catch_unwind};
2429
use std::sync::Arc;
2530

2631
#[derive(Debug, Copy, Clone, Default)]
@@ -126,49 +131,60 @@ impl BatchBuilder {
126131
&self.schema
127132
}
128133

129-
/// Drains the in_progress row indexes, and builds a new RecordBatch from them
130-
///
131-
/// Will then drop any batches for which all rows have been yielded to the output
132-
///
133-
/// Returns `None` if no pending rows
134-
pub fn build_record_batch(&mut self) -> Result<Option<RecordBatch>> {
135-
if self.is_empty() {
136-
return Ok(None);
137-
}
138-
139-
let columns = (0..self.schema.fields.len())
134+
/// Try to interleave all columns using the given index slice.
135+
fn try_interleave_columns(
136+
&self,
137+
indices: &[(usize, usize)],
138+
) -> Result<Vec<ArrayRef>> {
139+
(0..self.schema.fields.len())
140140
.map(|column_idx| {
141141
let arrays: Vec<_> = self
142142
.batches
143143
.iter()
144144
.map(|(_, batch)| batch.column(column_idx).as_ref())
145145
.collect();
146-
Ok(interleave(&arrays, &self.indices)?)
146+
recover_offset_overflow_from_panic(|| interleave(&arrays, indices))
147147
})
148-
.collect::<Result<Vec<_>>>()?;
149-
150-
self.indices.clear();
151-
152-
// New cursors are only created once the previous cursor for the stream
153-
// is finished. This means all remaining rows from all but the last batch
154-
// for each stream have been yielded to the newly created record batch
155-
//
156-
// We can therefore drop all but the last batch for each stream
157-
let mut batch_idx = 0;
158-
let mut retained = 0;
159-
self.batches.retain(|(stream_idx, batch)| {
160-
let stream_cursor = &mut self.cursors[*stream_idx];
161-
let retain = stream_cursor.batch_idx == batch_idx;
162-
batch_idx += 1;
163-
164-
if retain {
165-
stream_cursor.batch_idx = retained;
166-
retained += 1;
167-
} else {
168-
self.batches_mem_used -= get_record_batch_memory_size(batch);
169-
}
170-
retain
171-
});
148+
.collect::<Result<Vec<_>>>()
149+
}
150+
151+
/// Builds a record batch from the first `rows_to_emit` buffered rows.
152+
fn finish_record_batch(
153+
&mut self,
154+
rows_to_emit: usize,
155+
columns: Vec<ArrayRef>,
156+
) -> Result<RecordBatch> {
157+
// Remove consumed indices, keeping any remaining for the next call.
158+
self.indices.drain(..rows_to_emit);
159+
160+
// Only clean up fully-consumed batches when all indices are drained,
161+
// because remaining indices may still reference earlier batches.
162+
// In the overflow/partial-emit case this may retain some extra memory
163+
// across a few drain polls, but avoids costly index scanning on the
164+
// hot path. The retention is bounded and short-lived since leftover
165+
// rows are drained over subsequent polls.
166+
if self.indices.is_empty() {
167+
// New cursors are only created once the previous cursor for the stream
168+
// is finished. This means all remaining rows from all but the last batch
169+
// for each stream have been yielded to the newly created record batch
170+
//
171+
// We can therefore drop all but the last batch for each stream
172+
let mut batch_idx = 0;
173+
let mut retained = 0;
174+
self.batches.retain(|(stream_idx, batch)| {
175+
let stream_cursor = &mut self.cursors[*stream_idx];
176+
let retain = stream_cursor.batch_idx == batch_idx;
177+
batch_idx += 1;
178+
179+
if retain {
180+
stream_cursor.batch_idx = retained;
181+
retained += 1;
182+
} else {
183+
self.batches_mem_used -= get_record_batch_memory_size(batch);
184+
}
185+
retain
186+
});
187+
}
172188

173189
// Release excess memory back to the pool, but never shrink below
174190
// initial_reservation to maintain the anti-starvation guarantee
@@ -178,10 +194,27 @@ impl BatchBuilder {
178194
self.reservation.shrink(self.reservation.size() - target);
179195
}
180196

181-
Ok(Some(RecordBatch::try_new(
182-
Arc::clone(&self.schema),
183-
columns,
184-
)?))
197+
RecordBatch::try_new(Arc::clone(&self.schema), columns).map_err(Into::into)
198+
}
199+
200+
/// Drains the in_progress row indexes, and builds a new RecordBatch from them
201+
///
202+
/// Will then drop any batches for which all rows have been yielded to the output.
203+
/// If an offset overflow occurs (e.g. string/list offsets exceed i32::MAX),
204+
/// retries with progressively fewer rows until it succeeds.
205+
///
206+
/// Returns `None` if no pending rows
207+
pub fn build_record_batch(&mut self) -> Result<Option<RecordBatch>> {
208+
if self.is_empty() {
209+
return Ok(None);
210+
}
211+
212+
let (rows_to_emit, columns) =
213+
retry_interleave(self.indices.len(), self.indices.len(), |rows_to_emit| {
214+
self.try_interleave_columns(&self.indices[..rows_to_emit])
215+
})?;
216+
217+
Ok(Some(self.finish_record_batch(rows_to_emit, columns)?))
185218
}
186219
}
187220

@@ -200,3 +233,146 @@ pub(crate) fn try_grow_reservation_to_at_least(
200233
}
201234
Ok(())
202235
}
236+
237+
/// Returns true if the error is an Arrow offset overflow.
238+
fn is_offset_overflow(e: &DataFusionError) -> bool {
239+
matches!(
240+
e,
241+
DataFusionError::ArrowError(boxed, _)
242+
if matches!(boxed.as_ref(), ArrowError::OffsetOverflowError(_))
243+
)
244+
}
245+
246+
fn offset_overflow_error() -> DataFusionError {
247+
DataFusionError::ArrowError(Box::new(ArrowError::OffsetOverflowError(0)), None)
248+
}
249+
250+
fn recover_offset_overflow_from_panic<T, F>(f: F) -> Result<T>
251+
where
252+
F: FnOnce() -> std::result::Result<T, ArrowError>,
253+
{
254+
// Arrow's interleave can panic on i32 offset overflow with
255+
// `.expect("overflow")` / `.expect("offset overflow")`.
256+
// Catch only those specific panics so the caller can retry
257+
// with fewer rows while unrelated defects still unwind.
258+
//
259+
// TODO: remove once arrow-rs#9549 lands — interleave will return
260+
// OffsetOverflowError directly instead of panicking.
261+
match catch_unwind(AssertUnwindSafe(f)) {
262+
Ok(result) => Ok(result?),
263+
Err(panic_payload) => {
264+
if is_arrow_offset_overflow_panic(panic_payload.as_ref()) {
265+
Err(offset_overflow_error())
266+
} else {
267+
std::panic::resume_unwind(panic_payload);
268+
}
269+
}
270+
}
271+
}
272+
273+
fn retry_interleave<T, F>(
274+
mut rows_to_emit: usize,
275+
total_rows: usize,
276+
mut interleave: F,
277+
) -> Result<(usize, T)>
278+
where
279+
F: FnMut(usize) -> Result<T>,
280+
{
281+
loop {
282+
match interleave(rows_to_emit) {
283+
Ok(value) => return Ok((rows_to_emit, value)),
284+
Err(e) if is_offset_overflow(&e) => {
285+
rows_to_emit /= 2;
286+
if rows_to_emit == 0 {
287+
return Err(e);
288+
}
289+
warn!(
290+
"Interleave offset overflow with {total_rows} rows, retrying with {rows_to_emit}"
291+
);
292+
}
293+
Err(e) => return Err(e),
294+
}
295+
}
296+
}
297+
298+
/// Extracts the human-readable message from a panic payload, handling the
/// two payload types `panic!` produces: `&str` and `String`.
fn panic_message(payload: &(dyn Any + Send)) -> Option<&str> {
    payload
        .downcast_ref::<&str>()
        .copied()
        .or_else(|| payload.downcast_ref::<String>().map(String::as_str))
}
307+
308+
/// Returns true if a caught panic payload matches the Arrow offset overflows
309+
/// raised by interleave's offset builders.
310+
fn is_arrow_offset_overflow_panic(payload: &(dyn Any + Send)) -> bool {
311+
matches!(panic_message(payload), Some("overflow" | "offset overflow"))
312+
}
313+
314+
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::error::ArrowError;

    /// Each failed attempt halves the row count; the attempt sequence and
    /// the final successful row count are both checked.
    #[test]
    fn test_retry_interleave_halves_rows_until_success() {
        let mut attempts = Vec::new();

        let (rows_to_emit, result) = retry_interleave(4, 4, |rows_to_emit| {
            attempts.push(rows_to_emit);
            if rows_to_emit > 1 {
                Err(offset_overflow_error())
            } else {
                Ok("ok")
            }
        })
        .unwrap();

        assert_eq!(rows_to_emit, 1);
        assert_eq!(result, "ok");
        assert_eq!(attempts, vec![4, 2, 1]);
    }

    /// A panic carrying Arrow's "offset overflow" message is converted into
    /// a regular offset-overflow error instead of unwinding.
    #[test]
    fn test_recover_offset_overflow_from_panic() {
        let error = recover_offset_overflow_from_panic(
            || -> std::result::Result<(), ArrowError> { panic!("offset overflow") },
        )
        .unwrap_err();

        assert!(is_offset_overflow(&error));
    }

    /// Unrelated panics (here "capacity overflow") must keep unwinding;
    /// the outer catch_unwind observes the re-raised panic.
    #[test]
    fn test_recover_offset_overflow_from_panic_rethrows_unrelated_panics() {
        let panic_payload = catch_unwind(AssertUnwindSafe(|| {
            let _ = recover_offset_overflow_from_panic(
                || -> std::result::Result<(), ArrowError> { panic!("capacity overflow") },
            );
        }));

        assert!(panic_payload.is_err());
    }

    /// Only the exact messages "overflow" / "offset overflow" (as either
    /// &str or String payloads) are classified as Arrow offset overflows.
    #[test]
    fn test_is_arrow_offset_overflow_panic() {
        let overflow = Box::new("overflow") as Box<dyn Any + Send>;
        assert!(is_arrow_offset_overflow_panic(overflow.as_ref()));

        let offset_overflow =
            Box::new(String::from("offset overflow")) as Box<dyn Any + Send>;
        assert!(is_arrow_offset_overflow_panic(offset_overflow.as_ref()));

        let capacity_overflow = Box::new("capacity overflow") as Box<dyn Any + Send>;
        assert!(!is_arrow_offset_overflow_panic(capacity_overflow.as_ref()));

        let arithmetic_overflow =
            Box::new(String::from("attempt to multiply with overflow"))
                as Box<dyn Any + Send>;
        assert!(!is_arrow_offset_overflow_panic(
            arithmetic_overflow.as_ref()
        ));
    }
}

0 commit comments

Comments
 (0)