
Commit ef10249

Merge branch 'main' into feat_migrate_ffi_to_stabby
2 parents 08b4117 + 07ddfd7 commit ef10249

3 files changed

Lines changed: 393 additions & 44 deletions

File tree

datafusion/physical-plan/src/sorts/builder.rs

Lines changed: 217 additions & 41 deletions
@@ -16,11 +16,16 @@
 // under the License.
 
 use crate::spill::get_record_batch_memory_size;
+use arrow::array::ArrayRef;
 use arrow::compute::interleave;
 use arrow::datatypes::SchemaRef;
+use arrow::error::ArrowError;
 use arrow::record_batch::RecordBatch;
-use datafusion_common::Result;
+use datafusion_common::{DataFusionError, Result};
 use datafusion_execution::memory_pool::MemoryReservation;
+use log::warn;
+use std::any::Any;
+use std::panic::{AssertUnwindSafe, catch_unwind};
 use std::sync::Arc;
 
 #[derive(Debug, Copy, Clone, Default)]
@@ -126,49 +131,60 @@ impl BatchBuilder {
         &self.schema
     }
 
-    /// Drains the in_progress row indexes, and builds a new RecordBatch from them
-    ///
-    /// Will then drop any batches for which all rows have been yielded to the output
-    ///
-    /// Returns `None` if no pending rows
-    pub fn build_record_batch(&mut self) -> Result<Option<RecordBatch>> {
-        if self.is_empty() {
-            return Ok(None);
-        }
-
-        let columns = (0..self.schema.fields.len())
+    /// Try to interleave all columns using the given index slice.
+    fn try_interleave_columns(
+        &self,
+        indices: &[(usize, usize)],
+    ) -> Result<Vec<ArrayRef>> {
+        (0..self.schema.fields.len())
             .map(|column_idx| {
                 let arrays: Vec<_> = self
                     .batches
                     .iter()
                     .map(|(_, batch)| batch.column(column_idx).as_ref())
                     .collect();
-                Ok(interleave(&arrays, &self.indices)?)
+                recover_offset_overflow_from_panic(|| interleave(&arrays, indices))
             })
-            .collect::<Result<Vec<_>>>()?;
-
-        self.indices.clear();
-
-        // New cursors are only created once the previous cursor for the stream
-        // is finished. This means all remaining rows from all but the last batch
-        // for each stream have been yielded to the newly created record batch
-        //
-        // We can therefore drop all but the last batch for each stream
-        let mut batch_idx = 0;
-        let mut retained = 0;
-        self.batches.retain(|(stream_idx, batch)| {
-            let stream_cursor = &mut self.cursors[*stream_idx];
-            let retain = stream_cursor.batch_idx == batch_idx;
-            batch_idx += 1;
-
-            if retain {
-                stream_cursor.batch_idx = retained;
-                retained += 1;
-            } else {
-                self.batches_mem_used -= get_record_batch_memory_size(batch);
-            }
-            retain
-        });
+            .collect::<Result<Vec<_>>>()
+    }
+
+    /// Builds a record batch from the first `rows_to_emit` buffered rows.
+    fn finish_record_batch(
+        &mut self,
+        rows_to_emit: usize,
+        columns: Vec<ArrayRef>,
+    ) -> Result<RecordBatch> {
+        // Remove consumed indices, keeping any remaining for the next call.
+        self.indices.drain(..rows_to_emit);
+
+        // Only clean up fully-consumed batches when all indices are drained,
+        // because remaining indices may still reference earlier batches.
+        // In the overflow/partial-emit case this may retain some extra memory
+        // across a few drain polls, but avoids costly index scanning on the
+        // hot path. The retention is bounded and short-lived since leftover
+        // rows are drained over subsequent polls.
+        if self.indices.is_empty() {
+            // New cursors are only created once the previous cursor for the stream
+            // is finished. This means all remaining rows from all but the last batch
+            // for each stream have been yielded to the newly created record batch
+            //
+            // We can therefore drop all but the last batch for each stream
+            let mut batch_idx = 0;
+            let mut retained = 0;
+            self.batches.retain(|(stream_idx, batch)| {
+                let stream_cursor = &mut self.cursors[*stream_idx];
+                let retain = stream_cursor.batch_idx == batch_idx;
+                batch_idx += 1;
+
+                if retain {
+                    stream_cursor.batch_idx = retained;
+                    retained += 1;
+                } else {
+                    self.batches_mem_used -= get_record_batch_memory_size(batch);
+                }
+                retain
+            });
+        }
 
         // Release excess memory back to the pool, but never shrink below
        // initial_reservation to maintain the anti-starvation guarantee
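
The key behavioral change in this hunk is that finish_record_batch only drains the indices it actually emitted, leaving any leftovers buffered for the next poll. Below is a minimal standalone illustration of that partial-drain behavior (plain Rust with made-up index values, not part of the patch):

fn main() {
    // (batch_idx, row_idx) pairs, analogous to BatchBuilder::indices.
    let mut indices = vec![(0, 0), (0, 1), (1, 0), (1, 1)];

    // Emit only the first two rows; the rest stay buffered. Because the
    // leftover indices may still point into earlier batches, batch cleanup
    // has to wait until the buffer is fully drained.
    let emitted: Vec<_> = indices.drain(..2).collect();
    assert_eq!(emitted, vec![(0, 0), (0, 1)]);
    assert_eq!(indices, vec![(1, 0), (1, 1)]);
}
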
@@ -178,10 +194,27 @@ impl BatchBuilder {
             self.reservation.shrink(self.reservation.size() - target);
         }
 
-        Ok(Some(RecordBatch::try_new(
-            Arc::clone(&self.schema),
-            columns,
-        )?))
+        RecordBatch::try_new(Arc::clone(&self.schema), columns).map_err(Into::into)
+    }
+
+    /// Drains the in_progress row indexes, and builds a new RecordBatch from them
+    ///
+    /// Will then drop any batches for which all rows have been yielded to the output.
+    /// If an offset overflow occurs (e.g. string/list offsets exceed i32::MAX),
+    /// retries with progressively fewer rows until it succeeds.
+    ///
+    /// Returns `None` if no pending rows
+    pub fn build_record_batch(&mut self) -> Result<Option<RecordBatch>> {
+        if self.is_empty() {
+            return Ok(None);
+        }
+
+        let (rows_to_emit, columns) =
+            retry_interleave(self.indices.len(), self.indices.len(), |rows_to_emit| {
+                self.try_interleave_columns(&self.indices[..rows_to_emit])
+            })?;
+
+        Ok(Some(self.finish_record_batch(rows_to_emit, columns)?))
     }
 }
 
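This hunk rewires build_record_batch to retry the interleave with progressively fewer rows when an offset overflow is reported. Below is a minimal, self-contained sketch of that halving strategy (plain Rust; the error type and the 3-row "capacity" are hypothetical, and unlike retry_interleave this sketch retries on any error):

// Standalone sketch: retry an operation with half as many rows each time it
// fails, mirroring the retry_interleave helper added in this commit.
fn retry_halving<T>(
    mut rows: usize,
    mut op: impl FnMut(usize) -> Result<T, String>,
) -> Result<(usize, T), String> {
    loop {
        match op(rows) {
            Ok(value) => return Ok((rows, value)),
            Err(e) => {
                rows /= 2;
                if rows == 0 {
                    return Err(e);
                }
            }
        }
    }
}

fn main() {
    // Pretend any request above 3 rows overflows.
    let (rows, _) = retry_halving(10, |n| {
        if n > 3 {
            Err("offset overflow".to_string())
        } else {
            Ok(())
        }
    })
    .unwrap();
    assert_eq!(rows, 2); // attempts: 10 -> 5 -> 2
}
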
@@ -200,3 +233,146 @@ pub(crate) fn try_grow_reservation_to_at_least(
     }
     Ok(())
 }
+
+/// Returns true if the error is an Arrow offset overflow.
+fn is_offset_overflow(e: &DataFusionError) -> bool {
+    matches!(
+        e,
+        DataFusionError::ArrowError(boxed, _)
+            if matches!(boxed.as_ref(), ArrowError::OffsetOverflowError(_))
+    )
+}
+
+fn offset_overflow_error() -> DataFusionError {
+    DataFusionError::ArrowError(Box::new(ArrowError::OffsetOverflowError(0)), None)
+}
+
+fn recover_offset_overflow_from_panic<T, F>(f: F) -> Result<T>
+where
+    F: FnOnce() -> std::result::Result<T, ArrowError>,
+{
+    // Arrow's interleave can panic on i32 offset overflow with
+    // `.expect("overflow")` / `.expect("offset overflow")`.
+    // Catch only those specific panics so the caller can retry
+    // with fewer rows while unrelated defects still unwind.
+    //
+    // TODO: remove once arrow-rs#9549 lands — interleave will return
+    // OffsetOverflowError directly instead of panicking.
+    match catch_unwind(AssertUnwindSafe(f)) {
+        Ok(result) => Ok(result?),
+        Err(panic_payload) => {
+            if is_arrow_offset_overflow_panic(panic_payload.as_ref()) {
+                Err(offset_overflow_error())
+            } else {
+                std::panic::resume_unwind(panic_payload);
+            }
+        }
+    }
+}
+
+fn retry_interleave<T, F>(
+    mut rows_to_emit: usize,
+    total_rows: usize,
+    mut interleave: F,
+) -> Result<(usize, T)>
+where
+    F: FnMut(usize) -> Result<T>,
+{
+    loop {
+        match interleave(rows_to_emit) {
+            Ok(value) => return Ok((rows_to_emit, value)),
+            Err(e) if is_offset_overflow(&e) => {
+                rows_to_emit /= 2;
+                if rows_to_emit == 0 {
+                    return Err(e);
+                }
+                warn!(
+                    "Interleave offset overflow with {total_rows} rows, retrying with {rows_to_emit}"
+                );
+            }
+            Err(e) => return Err(e),
+        }
+    }
+}
+
+fn panic_message(payload: &(dyn Any + Send)) -> Option<&str> {
+    if let Some(msg) = payload.downcast_ref::<&str>() {
+        return Some(msg);
+    }
+    if let Some(msg) = payload.downcast_ref::<String>() {
+        return Some(msg.as_str());
+    }
+    None
+}
+
+/// Returns true if a caught panic payload matches the Arrow offset overflows
+/// raised by interleave's offset builders.
+fn is_arrow_offset_overflow_panic(payload: &(dyn Any + Send)) -> bool {
+    matches!(panic_message(payload), Some("overflow" | "offset overflow"))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::error::ArrowError;
+
+    #[test]
+    fn test_retry_interleave_halves_rows_until_success() {
+        let mut attempts = Vec::new();
+
+        let (rows_to_emit, result) = retry_interleave(4, 4, |rows_to_emit| {
+            attempts.push(rows_to_emit);
+            if rows_to_emit > 1 {
+                Err(offset_overflow_error())
+            } else {
+                Ok("ok")
+            }
+        })
+        .unwrap();
+
+        assert_eq!(rows_to_emit, 1);
+        assert_eq!(result, "ok");
+        assert_eq!(attempts, vec![4, 2, 1]);
+    }
+
+    #[test]
+    fn test_recover_offset_overflow_from_panic() {
+        let error = recover_offset_overflow_from_panic(
+            || -> std::result::Result<(), ArrowError> { panic!("offset overflow") },
+        )
+        .unwrap_err();
+
+        assert!(is_offset_overflow(&error));
+    }
+
+    #[test]
+    fn test_recover_offset_overflow_from_panic_rethrows_unrelated_panics() {
+        let panic_payload = catch_unwind(AssertUnwindSafe(|| {
+            let _ = recover_offset_overflow_from_panic(
+                || -> std::result::Result<(), ArrowError> { panic!("capacity overflow") },
+            );
+        }));

+
+        assert!(panic_payload.is_err());
+    }
+
+    #[test]
+    fn test_is_arrow_offset_overflow_panic() {
+        let overflow = Box::new("overflow") as Box<dyn Any + Send>;
+        assert!(is_arrow_offset_overflow_panic(overflow.as_ref()));
+
+        let offset_overflow =
+            Box::new(String::from("offset overflow")) as Box<dyn Any + Send>;
+        assert!(is_arrow_offset_overflow_panic(offset_overflow.as_ref()));
+
+        let capacity_overflow = Box::new("capacity overflow") as Box<dyn Any + Send>;
+        assert!(!is_arrow_offset_overflow_panic(capacity_overflow.as_ref()));
+
+        let arithmetic_overflow =
+            Box::new(String::from("attempt to multiply with overflow"))
+                as Box<dyn Any + Send>;
+        assert!(!is_arrow_offset_overflow_panic(
+            arithmetic_overflow.as_ref()
+        ));
+    }
+}
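
Until the arrow-rs change referenced in the TODO ships, the overflow surfaces as a panic, so the helpers above classify the panic message and convert it into an error. Below is a self-contained sketch of that catch_unwind pattern using only the standard library (the message strings are the ones the patch assumes Arrow emits; the error type here is a plain String, not the patch's DataFusionError):

use std::any::Any;
use std::panic::{catch_unwind, resume_unwind, AssertUnwindSafe};

// Extract the message from a panic payload, if it is a string.
fn panic_msg(payload: &(dyn Any + Send)) -> Option<&str> {
    payload
        .downcast_ref::<&str>()
        .copied()
        .or_else(|| payload.downcast_ref::<String>().map(String::as_str))
}

// Run `f`, converting only "overflow"/"offset overflow" panics into an Err;
// any other panic is re-raised unchanged.
fn run_catching_overflow<T>(f: impl FnOnce() -> T) -> Result<T, String> {
    match catch_unwind(AssertUnwindSafe(f)) {
        Ok(value) => Ok(value),
        Err(payload) => {
            if matches!(panic_msg(payload.as_ref()), Some("overflow" | "offset overflow")) {
                Err("offset overflow".to_string())
            } else {
                resume_unwind(payload)
            }
        }
    }
}

fn main() {
    let err = run_catching_overflow(|| -> () { panic!("offset overflow") });
    assert!(err.is_err());
}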
