Skip to content

Commit 8dcb444

Browse files
xudong963 and claude committed
Cherry-pick: Fix sort merge interleave overflow (apache#20922)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 74772cb commit 8dcb444

4 files changed

Lines changed: 389 additions & 47 deletions

File tree

datafusion/physical-plan/src/sorts/builder.rs

Lines changed: 210 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,16 @@
1616
// under the License.
1717

1818
use crate::spill::get_record_batch_memory_size;
19+
use arrow::array::ArrayRef;
1920
use arrow::compute::interleave;
2021
use arrow::datatypes::SchemaRef;
22+
use arrow::error::ArrowError;
2123
use arrow::record_batch::RecordBatch;
22-
use datafusion_common::Result;
24+
use datafusion_common::{DataFusionError, Result};
2325
use datafusion_execution::memory_pool::MemoryReservation;
26+
use log::warn;
27+
use std::any::Any;
28+
use std::panic::{catch_unwind, AssertUnwindSafe};
2429
use std::sync::Arc;
2530

2631
#[derive(Debug, Copy, Clone, Default)]
@@ -126,49 +131,56 @@ impl BatchBuilder {
126131
&self.schema
127132
}
128133

129-
/// Drains the in_progress row indexes, and builds a new RecordBatch from them
130-
///
131-
/// Will then drop any batches for which all rows have been yielded to the output
132-
///
133-
/// Returns `None` if no pending rows
134-
pub fn build_record_batch(&mut self) -> Result<Option<RecordBatch>> {
135-
if self.is_empty() {
136-
return Ok(None);
137-
}
138-
139-
let columns = (0..self.schema.fields.len())
134+
/// Try to interleave all columns using the given index slice.
135+
fn try_interleave_columns(
136+
&self,
137+
indices: &[(usize, usize)],
138+
) -> Result<Vec<ArrayRef>> {
139+
(0..self.schema.fields.len())
140140
.map(|column_idx| {
141141
let arrays: Vec<_> = self
142142
.batches
143143
.iter()
144144
.map(|(_, batch)| batch.column(column_idx).as_ref())
145145
.collect();
146-
Ok(interleave(&arrays, &self.indices)?)
146+
recover_offset_overflow_from_panic(|| interleave(&arrays, indices))
147147
})
148-
.collect::<Result<Vec<_>>>()?;
149-
150-
self.indices.clear();
151-
152-
// New cursors are only created once the previous cursor for the stream
153-
// is finished. This means all remaining rows from all but the last batch
154-
// for each stream have been yielded to the newly created record batch
155-
//
156-
// We can therefore drop all but the last batch for each stream
157-
let mut batch_idx = 0;
158-
let mut retained = 0;
159-
self.batches.retain(|(stream_idx, batch)| {
160-
let stream_cursor = &mut self.cursors[*stream_idx];
161-
let retain = stream_cursor.batch_idx == batch_idx;
162-
batch_idx += 1;
163-
164-
if retain {
165-
stream_cursor.batch_idx = retained;
166-
retained += 1;
167-
} else {
168-
self.batches_mem_used -= get_record_batch_memory_size(batch);
169-
}
170-
retain
171-
});
148+
.collect::<Result<Vec<_>>>()
149+
}
150+
151+
/// Builds a record batch from the first `rows_to_emit` buffered rows.
152+
fn finish_record_batch(
153+
&mut self,
154+
rows_to_emit: usize,
155+
columns: Vec<ArrayRef>,
156+
) -> Result<RecordBatch> {
157+
// Remove consumed indices, keeping any remaining for the next call.
158+
self.indices.drain(..rows_to_emit);
159+
160+
// Only clean up fully-consumed batches when all indices are drained,
161+
// because remaining indices may still reference earlier batches.
162+
if self.indices.is_empty() {
163+
// New cursors are only created once the previous cursor for the stream
164+
// is finished. This means all remaining rows from all but the last batch
165+
// for each stream have been yielded to the newly created record batch
166+
//
167+
// We can therefore drop all but the last batch for each stream
168+
let mut batch_idx = 0;
169+
let mut retained = 0;
170+
self.batches.retain(|(stream_idx, batch)| {
171+
let stream_cursor = &mut self.cursors[*stream_idx];
172+
let retain = stream_cursor.batch_idx == batch_idx;
173+
batch_idx += 1;
174+
175+
if retain {
176+
stream_cursor.batch_idx = retained;
177+
retained += 1;
178+
} else {
179+
self.batches_mem_used -= get_record_batch_memory_size(batch);
180+
}
181+
retain
182+
});
183+
}
172184

173185
// Release excess memory back to the pool, but never shrink below
174186
// initial_reservation to maintain the anti-starvation guarantee
@@ -178,10 +190,27 @@ impl BatchBuilder {
178190
self.reservation.shrink(self.reservation.size() - target);
179191
}
180192

181-
Ok(Some(RecordBatch::try_new(
182-
Arc::clone(&self.schema),
183-
columns,
184-
)?))
193+
RecordBatch::try_new(Arc::clone(&self.schema), columns).map_err(Into::into)
194+
}
195+
196+
/// Drains the in_progress row indexes, and builds a new RecordBatch from them
197+
///
198+
/// Will then drop any batches for which all rows have been yielded to the output.
199+
/// If an offset overflow occurs (e.g. string/list offsets exceed i32::MAX),
200+
/// retries with progressively fewer rows until it succeeds.
201+
///
202+
/// Returns `None` if no pending rows
203+
pub fn build_record_batch(&mut self) -> Result<Option<RecordBatch>> {
204+
if self.is_empty() {
205+
return Ok(None);
206+
}
207+
208+
let (rows_to_emit, columns) =
209+
retry_interleave(self.indices.len(), self.indices.len(), |rows_to_emit| {
210+
self.try_interleave_columns(&self.indices[..rows_to_emit])
211+
})?;
212+
213+
Ok(Some(self.finish_record_batch(rows_to_emit, columns)?))
185214
}
186215
}
187216

@@ -200,3 +229,143 @@ pub(crate) fn try_grow_reservation_to_at_least(
200229
}
201230
Ok(())
202231
}
232+
233+
/// Returns true if the error is an Arrow offset overflow.
234+
fn is_offset_overflow(e: &DataFusionError) -> bool {
235+
matches!(
236+
e,
237+
DataFusionError::ArrowError(boxed, _)
238+
if matches!(boxed.as_ref(), ArrowError::OffsetOverflowError(_))
239+
)
240+
}
241+
242+
fn offset_overflow_error() -> DataFusionError {
243+
DataFusionError::ArrowError(Box::new(ArrowError::OffsetOverflowError(0)), None)
244+
}
245+
246+
fn recover_offset_overflow_from_panic<T, F>(f: F) -> Result<T>
247+
where
248+
F: FnOnce() -> std::result::Result<T, ArrowError>,
249+
{
250+
// Arrow's interleave can panic on i32 offset overflow with
251+
// `.expect("overflow")` / `.expect("offset overflow")`.
252+
// Catch only those specific panics so the caller can retry
253+
// with fewer rows while unrelated defects still unwind.
254+
match catch_unwind(AssertUnwindSafe(f)) {
255+
Ok(result) => Ok(result?),
256+
Err(panic_payload) => {
257+
if is_arrow_offset_overflow_panic(panic_payload.as_ref()) {
258+
Err(offset_overflow_error())
259+
} else {
260+
std::panic::resume_unwind(panic_payload);
261+
}
262+
}
263+
}
264+
}
265+
266+
fn retry_interleave<T, F>(
267+
mut rows_to_emit: usize,
268+
total_rows: usize,
269+
mut interleave: F,
270+
) -> Result<(usize, T)>
271+
where
272+
F: FnMut(usize) -> Result<T>,
273+
{
274+
loop {
275+
match interleave(rows_to_emit) {
276+
Ok(value) => return Ok((rows_to_emit, value)),
277+
Err(e) if is_offset_overflow(&e) => {
278+
rows_to_emit /= 2;
279+
if rows_to_emit == 0 {
280+
return Err(e);
281+
}
282+
warn!(
283+
"Interleave offset overflow with {total_rows} rows, retrying with {rows_to_emit}"
284+
);
285+
}
286+
Err(e) => return Err(e),
287+
}
288+
}
289+
}
290+
291+
/// Extracts the textual message from a panic payload, if it carries one.
///
/// Panic payloads are `&str` for `panic!("literal")` and `String` for
/// formatted panics; anything else yields `None`.
fn panic_message(payload: &(dyn Any + Send)) -> Option<&str> {
    payload
        .downcast_ref::<&str>()
        .copied()
        .or_else(|| payload.downcast_ref::<String>().map(String::as_str))
}
300+
301+
/// Returns true if a caught panic payload matches the Arrow offset overflows
/// raised by interleave's offset builders.
///
/// The message is pulled straight off the payload (`&str` or `String`) and
/// compared exactly against the two messages arrow uses, so unrelated panics
/// such as "capacity overflow" are not matched.
fn is_arrow_offset_overflow_panic(payload: &(dyn Any + Send)) -> bool {
    let message = payload
        .downcast_ref::<&str>()
        .copied()
        .or_else(|| payload.downcast_ref::<String>().map(String::as_str));
    matches!(message, Some("overflow" | "offset overflow"))
}
306+
307+
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::error::ArrowError;

    /// Overflow at 4 and 2 rows, success at 1: attempts halve each time.
    #[test]
    fn test_retry_interleave_halves_rows_until_success() {
        let mut seen = Vec::new();

        let (rows, value) = retry_interleave(4, 4, |n| {
            seen.push(n);
            if n > 1 {
                Err(offset_overflow_error())
            } else {
                Ok("ok")
            }
        })
        .unwrap();

        assert_eq!(rows, 1);
        assert_eq!(value, "ok");
        assert_eq!(seen, vec![4, 2, 1]);
    }

    /// An "offset overflow" panic is converted into a recoverable error.
    #[test]
    fn test_recover_offset_overflow_from_panic() {
        let error = recover_offset_overflow_from_panic(
            || -> std::result::Result<(), ArrowError> { panic!("offset overflow") },
        )
        .unwrap_err();

        assert!(is_offset_overflow(&error));
    }

    /// Panics with unrelated messages keep unwinding.
    #[test]
    fn test_recover_offset_overflow_from_panic_rethrows_unrelated_panics() {
        let outcome = catch_unwind(AssertUnwindSafe(|| {
            let _ = recover_offset_overflow_from_panic(
                || -> std::result::Result<(), ArrowError> { panic!("capacity overflow") },
            );
        }));

        assert!(outcome.is_err());
    }

    /// Only the two exact arrow messages count as offset-overflow panics.
    #[test]
    fn test_is_arrow_offset_overflow_panic() {
        let overflow = Box::new("overflow") as Box<dyn Any + Send>;
        assert!(is_arrow_offset_overflow_panic(overflow.as_ref()));

        let offset_overflow =
            Box::new(String::from("offset overflow")) as Box<dyn Any + Send>;
        assert!(is_arrow_offset_overflow_panic(offset_overflow.as_ref()));

        let capacity_overflow = Box::new("capacity overflow") as Box<dyn Any + Send>;
        assert!(!is_arrow_offset_overflow_panic(capacity_overflow.as_ref()));

        let arithmetic_overflow =
            Box::new(String::from("attempt to multiply with overflow"))
                as Box<dyn Any + Send>;
        assert!(!is_arrow_offset_overflow_panic(arithmetic_overflow.as_ref()));
    }
}

0 commit comments

Comments (0)