-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Fix sort merge interleave overflow #20922
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
3da2ff8
61455f6
714a5a9
57d5120
5ab4676
f62bdee
19fec33
0b0bd6c
390a52a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,11 +16,16 @@ | |
| // under the License. | ||
|
|
||
| use crate::spill::get_record_batch_memory_size; | ||
| use arrow::array::ArrayRef; | ||
| use arrow::compute::interleave; | ||
| use arrow::datatypes::SchemaRef; | ||
| use arrow::error::ArrowError; | ||
| use arrow::record_batch::RecordBatch; | ||
| use datafusion_common::Result; | ||
| use datafusion_common::{DataFusionError, Result}; | ||
| use datafusion_execution::memory_pool::MemoryReservation; | ||
| use log::warn; | ||
| use std::any::Any; | ||
| use std::panic::{AssertUnwindSafe, catch_unwind}; | ||
| use std::sync::Arc; | ||
|
|
||
| #[derive(Debug, Copy, Clone, Default)] | ||
|
|
@@ -126,49 +131,60 @@ impl BatchBuilder { | |
| &self.schema | ||
| } | ||
|
|
||
| /// Drains the in_progress row indexes, and builds a new RecordBatch from them | ||
| /// | ||
| /// Will then drop any batches for which all rows have been yielded to the output | ||
| /// | ||
| /// Returns `None` if no pending rows | ||
| pub fn build_record_batch(&mut self) -> Result<Option<RecordBatch>> { | ||
| if self.is_empty() { | ||
| return Ok(None); | ||
| } | ||
|
|
||
| let columns = (0..self.schema.fields.len()) | ||
| /// Try to interleave all columns using the given index slice. | ||
| fn try_interleave_columns( | ||
| &self, | ||
| indices: &[(usize, usize)], | ||
| ) -> Result<Vec<ArrayRef>> { | ||
| (0..self.schema.fields.len()) | ||
| .map(|column_idx| { | ||
| let arrays: Vec<_> = self | ||
| .batches | ||
| .iter() | ||
| .map(|(_, batch)| batch.column(column_idx).as_ref()) | ||
| .collect(); | ||
| Ok(interleave(&arrays, &self.indices)?) | ||
| recover_offset_overflow_from_panic(|| interleave(&arrays, indices)) | ||
| }) | ||
| .collect::<Result<Vec<_>>>()?; | ||
|
|
||
| self.indices.clear(); | ||
|
|
||
| // New cursors are only created once the previous cursor for the stream | ||
| // is finished. This means all remaining rows from all but the last batch | ||
| // for each stream have been yielded to the newly created record batch | ||
| // | ||
| // We can therefore drop all but the last batch for each stream | ||
| let mut batch_idx = 0; | ||
| let mut retained = 0; | ||
| self.batches.retain(|(stream_idx, batch)| { | ||
| let stream_cursor = &mut self.cursors[*stream_idx]; | ||
| let retain = stream_cursor.batch_idx == batch_idx; | ||
| batch_idx += 1; | ||
|
|
||
| if retain { | ||
| stream_cursor.batch_idx = retained; | ||
| retained += 1; | ||
| } else { | ||
| self.batches_mem_used -= get_record_batch_memory_size(batch); | ||
| } | ||
| retain | ||
| }); | ||
| .collect::<Result<Vec<_>>>() | ||
| } | ||
|
|
||
| /// Builds a record batch from the first `rows_to_emit` buffered rows. | ||
| fn finish_record_batch( | ||
| &mut self, | ||
| rows_to_emit: usize, | ||
| columns: Vec<ArrayRef>, | ||
| ) -> Result<RecordBatch> { | ||
| // Remove consumed indices, keeping any remaining for the next call. | ||
| self.indices.drain(..rows_to_emit); | ||
|
|
||
| // Only clean up fully-consumed batches when all indices are drained, | ||
| // because remaining indices may still reference earlier batches. | ||
| // In the overflow/partial-emit case this may retain some extra memory | ||
| // across a few drain polls, but avoids costly index scanning on the | ||
| // hot path. The retention is bounded and short-lived since leftover | ||
| // rows are drained over subsequent polls. | ||
| if self.indices.is_empty() { | ||
| // New cursors are only created once the previous cursor for the stream | ||
| // is finished. This means all remaining rows from all but the last batch | ||
| // for each stream have been yielded to the newly created record batch | ||
| // | ||
| // We can therefore drop all but the last batch for each stream | ||
| let mut batch_idx = 0; | ||
| let mut retained = 0; | ||
| self.batches.retain(|(stream_idx, batch)| { | ||
| let stream_cursor = &mut self.cursors[*stream_idx]; | ||
| let retain = stream_cursor.batch_idx == batch_idx; | ||
| batch_idx += 1; | ||
|
|
||
| if retain { | ||
| stream_cursor.batch_idx = retained; | ||
| retained += 1; | ||
| } else { | ||
| self.batches_mem_used -= get_record_batch_memory_size(batch); | ||
| } | ||
| retain | ||
| }); | ||
| } | ||
|
|
||
| // Release excess memory back to the pool, but never shrink below | ||
| // initial_reservation to maintain the anti-starvation guarantee | ||
|
|
@@ -178,10 +194,27 @@ impl BatchBuilder { | |
| self.reservation.shrink(self.reservation.size() - target); | ||
| } | ||
|
|
||
| Ok(Some(RecordBatch::try_new( | ||
| Arc::clone(&self.schema), | ||
| columns, | ||
| )?)) | ||
| RecordBatch::try_new(Arc::clone(&self.schema), columns).map_err(Into::into) | ||
| } | ||
|
|
||
| /// Drains the in_progress row indexes, and builds a new RecordBatch from them | ||
| /// | ||
| /// Will then drop any batches for which all rows have been yielded to the output. | ||
| /// If an offset overflow occurs (e.g. string/list offsets exceed i32::MAX), | ||
| /// retries with progressively fewer rows until it succeeds. | ||
| /// | ||
| /// Returns `None` if no pending rows | ||
| pub fn build_record_batch(&mut self) -> Result<Option<RecordBatch>> { | ||
| if self.is_empty() { | ||
| return Ok(None); | ||
| } | ||
|
|
||
| let (rows_to_emit, columns) = | ||
| retry_interleave(self.indices.len(), self.indices.len(), |rows_to_emit| { | ||
| self.try_interleave_columns(&self.indices[..rows_to_emit]) | ||
| })?; | ||
|
|
||
| Ok(Some(self.finish_record_batch(rows_to_emit, columns)?)) | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -200,3 +233,146 @@ pub(crate) fn try_grow_reservation_to_at_least( | |
| } | ||
| Ok(()) | ||
| } | ||
|
|
||
| /// Returns true if the error is an Arrow offset overflow. | ||
| fn is_offset_overflow(e: &DataFusionError) -> bool { | ||
| matches!( | ||
| e, | ||
| DataFusionError::ArrowError(boxed, _) | ||
| if matches!(boxed.as_ref(), ArrowError::OffsetOverflowError(_)) | ||
| ) | ||
| } | ||
|
|
||
| fn offset_overflow_error() -> DataFusionError { | ||
| DataFusionError::ArrowError(Box::new(ArrowError::OffsetOverflowError(0)), None) | ||
| } | ||
|
|
||
| fn recover_offset_overflow_from_panic<T, F>(f: F) -> Result<T> | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The retry behavior looks good, but right now it seems like it’s only covered through synthetic helper failures. Since the production path depends on matching Arrow’s panic payload pretty closely, I think it’d be great to add one higher-level regression test closer to the real `interleave` path. That would make it a lot easier to catch future Arrow-side panic-message changes — or refactors in this file — before they slip through.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The panic-catching path is intentionally temporary — once apache/arrow-rs#9549 lands in DataFusion, `interleave` will return `OffsetOverflowError` directly instead of panicking, and this workaround can be removed.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Added some comments 0b0bd6c |
||
| where | ||
| F: FnOnce() -> std::result::Result<T, ArrowError>, | ||
| { | ||
| // Arrow's interleave can panic on i32 offset overflow with | ||
| // `.expect("overflow")` / `.expect("offset overflow")`. | ||
| // Catch only those specific panics so the caller can retry | ||
| // with fewer rows while unrelated defects still unwind. | ||
| // | ||
| // TODO: remove once arrow-rs#9549 lands — interleave will return | ||
| // OffsetOverflowError directly instead of panicking. | ||
| match catch_unwind(AssertUnwindSafe(f)) { | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This can be avoided after apache/arrow-rs#9549 gets into DataFusion. |
||
| Ok(result) => Ok(result?), | ||
| Err(panic_payload) => { | ||
| if is_arrow_offset_overflow_panic(panic_payload.as_ref()) { | ||
| Err(offset_overflow_error()) | ||
| } else { | ||
| std::panic::resume_unwind(panic_payload); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| fn retry_interleave<T, F>( | ||
| mut rows_to_emit: usize, | ||
| total_rows: usize, | ||
| mut interleave: F, | ||
| ) -> Result<(usize, T)> | ||
| where | ||
| F: FnMut(usize) -> Result<T>, | ||
| { | ||
| loop { | ||
| match interleave(rows_to_emit) { | ||
| Ok(value) => return Ok((rows_to_emit, value)), | ||
| Err(e) if is_offset_overflow(&e) => { | ||
| rows_to_emit /= 2; | ||
| if rows_to_emit == 0 { | ||
| return Err(e); | ||
| } | ||
| warn!( | ||
| "Interleave offset overflow with {total_rows} rows, retrying with {rows_to_emit}" | ||
| ); | ||
| } | ||
| Err(e) => return Err(e), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| fn panic_message(payload: &(dyn Any + Send)) -> Option<&str> { | ||
| if let Some(msg) = payload.downcast_ref::<&str>() { | ||
| return Some(msg); | ||
| } | ||
| if let Some(msg) = payload.downcast_ref::<String>() { | ||
| return Some(msg.as_str()); | ||
| } | ||
| None | ||
| } | ||
|
|
||
| /// Returns true if a caught panic payload matches the Arrow offset overflows | ||
| /// raised by interleave's offset builders. | ||
| fn is_arrow_offset_overflow_panic(payload: &(dyn Any + Send)) -> bool { | ||
| matches!(panic_message(payload), Some("overflow" | "offset overflow")) | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use super::*; | ||
| use arrow::error::ArrowError; | ||
|
|
||
| #[test] | ||
| fn test_retry_interleave_halves_rows_until_success() { | ||
| let mut attempts = Vec::new(); | ||
|
|
||
| let (rows_to_emit, result) = retry_interleave(4, 4, |rows_to_emit| { | ||
| attempts.push(rows_to_emit); | ||
| if rows_to_emit > 1 { | ||
| Err(offset_overflow_error()) | ||
| } else { | ||
| Ok("ok") | ||
| } | ||
| }) | ||
| .unwrap(); | ||
|
|
||
| assert_eq!(rows_to_emit, 1); | ||
| assert_eq!(result, "ok"); | ||
| assert_eq!(attempts, vec![4, 2, 1]); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_recover_offset_overflow_from_panic() { | ||
| let error = recover_offset_overflow_from_panic( | ||
| || -> std::result::Result<(), ArrowError> { panic!("offset overflow") }, | ||
| ) | ||
| .unwrap_err(); | ||
|
|
||
| assert!(is_offset_overflow(&error)); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_recover_offset_overflow_from_panic_rethrows_unrelated_panics() { | ||
| let panic_payload = catch_unwind(AssertUnwindSafe(|| { | ||
| let _ = recover_offset_overflow_from_panic( | ||
| || -> std::result::Result<(), ArrowError> { panic!("capacity overflow") }, | ||
| ); | ||
| })); | ||
|
|
||
| assert!(panic_payload.is_err()); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_is_arrow_offset_overflow_panic() { | ||
| let overflow = Box::new("overflow") as Box<dyn Any + Send>; | ||
| assert!(is_arrow_offset_overflow_panic(overflow.as_ref())); | ||
|
|
||
| let offset_overflow = | ||
| Box::new(String::from("offset overflow")) as Box<dyn Any + Send>; | ||
| assert!(is_arrow_offset_overflow_panic(offset_overflow.as_ref())); | ||
|
|
||
| let capacity_overflow = Box::new("capacity overflow") as Box<dyn Any + Send>; | ||
| assert!(!is_arrow_offset_overflow_panic(capacity_overflow.as_ref())); | ||
|
|
||
| let arithmetic_overflow = | ||
| Box::new(String::from("attempt to multiply with overflow")) | ||
| as Box<dyn Any + Send>; | ||
| assert!(!is_arrow_offset_overflow_panic( | ||
| arithmetic_overflow.as_ref() | ||
| )); | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice change. One thing that stood out to me here: now that
`build_record_batch()` can emit a prefix and leave the remainder buffered, this branch seems to keep every fully-consumed input batch alive until `self.indices` is empty. That seems functionally correct, but it also means overflow cases could retain quite a bit of memory across several follow-up polls — especially for
`FETCH`-limited queries where we stop pulling new input and just drain leftovers. Would it make sense to either release batches that are no longer referenced by the remaining indices, or at least leave a quick comment here calling out that this retention is intentional? I think that would help future readers understand the tradeoff.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, added a comment clarifying this is intentional — the retention is bounded and short-lived since leftover rows drain over subsequent polls. Doing a precise scan of remaining indices to release individual batches would add complexity to the hot path for marginal gain.