Skip to content

Commit 967cf0a

Browse files
committed
Fix sort merge interleave overflow
1 parent fd97799 commit 967cf0a

2 files changed

Lines changed: 73 additions & 36 deletions

File tree

datafusion/physical-plan/src/sorts/builder.rs

Lines changed: 69 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@
1818
use crate::spill::get_record_batch_memory_size;
1919
use arrow::compute::interleave;
2020
use arrow::datatypes::SchemaRef;
21+
use arrow::error::ArrowError;
2122
use arrow::record_batch::RecordBatch;
22-
use datafusion_common::Result;
23+
use datafusion_common::{DataFusionError, Result};
2324
use datafusion_execution::memory_pool::MemoryReservation;
2425
use std::sync::Arc;
2526

@@ -104,53 +105,87 @@ impl BatchBuilder {
104105
&self.schema
105106
}
106107

108+
/// Try to interleave all columns using the given index slice.
109+
fn try_interleave_columns(
110+
&self,
111+
indices: &[(usize, usize)],
112+
) -> Result<Vec<Arc<dyn arrow::array::Array>>> {
113+
(0..self.schema.fields.len())
114+
.map(|column_idx| {
115+
let arrays: Vec<_> = self
116+
.batches
117+
.iter()
118+
.map(|(_, batch)| batch.column(column_idx).as_ref())
119+
.collect();
120+
Ok(interleave(&arrays, indices)?)
121+
})
122+
.collect::<Result<Vec<_>>>()
123+
}
124+
107125
/// Drains the in_progress row indexes, and builds a new RecordBatch from them
108126
///
109-
/// Will then drop any batches for which all rows have been yielded to the output
127+
/// Will then drop any batches for which all rows have been yielded to the output.
128+
/// If an offset overflow occurs (e.g. string/list offsets exceed i32::MAX),
129+
/// retries with progressively fewer rows until it succeeds.
110130
///
111131
/// Returns `None` if no pending rows
112132
pub fn build_record_batch(&mut self) -> Result<Option<RecordBatch>> {
113133
if self.is_empty() {
114134
return Ok(None);
115135
}
116136

117-
let columns = (0..self.schema.fields.len())
118-
.map(|column_idx| {
119-
let arrays: Vec<_> = self
120-
.batches
121-
.iter()
122-
.map(|(_, batch)| batch.column(column_idx).as_ref())
123-
.collect();
124-
Ok(interleave(&arrays, &self.indices)?)
125-
})
126-
.collect::<Result<Vec<_>>>()?;
127-
128-
self.indices.clear();
129-
130-
// New cursors are only created once the previous cursor for the stream
131-
// is finished. This means all remaining rows from all but the last batch
132-
// for each stream have been yielded to the newly created record batch
133-
//
134-
// We can therefore drop all but the last batch for each stream
135-
let mut batch_idx = 0;
136-
let mut retained = 0;
137-
self.batches.retain(|(stream_idx, batch)| {
138-
let stream_cursor = &mut self.cursors[*stream_idx];
139-
let retain = stream_cursor.batch_idx == batch_idx;
140-
batch_idx += 1;
141-
142-
if retain {
143-
stream_cursor.batch_idx = retained;
144-
retained += 1;
145-
} else {
146-
self.reservation.shrink(get_record_batch_memory_size(batch));
137+
// Try with progressively fewer rows on offset overflow.
138+
let mut end = self.indices.len();
139+
let columns = loop {
140+
match self.try_interleave_columns(&self.indices[..end]) {
141+
Ok(columns) => break columns,
142+
Err(e) if is_offset_overflow(&e) && end > 1 => {
143+
end /= 2;
144+
}
145+
Err(e) => return Err(e),
147146
}
148-
retain
149-
});
147+
};
148+
149+
// Remove consumed indices, keeping any remaining for the next call.
150+
self.indices.drain(..end);
151+
152+
// Only clean up fully-consumed batches when all indices are drained,
153+
// because remaining indices may still reference earlier batches.
154+
if self.indices.is_empty() {
155+
// New cursors are only created once the previous cursor for the stream
156+
// is finished. This means all remaining rows from all but the last batch
157+
// for each stream have been yielded to the newly created record batch
158+
//
159+
// We can therefore drop all but the last batch for each stream
160+
let mut batch_idx = 0;
161+
let mut retained = 0;
162+
self.batches.retain(|(stream_idx, batch)| {
163+
let stream_cursor = &mut self.cursors[*stream_idx];
164+
let retain = stream_cursor.batch_idx == batch_idx;
165+
batch_idx += 1;
166+
167+
if retain {
168+
stream_cursor.batch_idx = retained;
169+
retained += 1;
170+
} else {
171+
self.reservation.shrink(get_record_batch_memory_size(batch));
172+
}
173+
retain
174+
});
175+
}
150176

151177
Ok(Some(RecordBatch::try_new(
152178
Arc::clone(&self.schema),
153179
columns,
154180
)?))
155181
}
156182
}
183+
184+
/// Returns `true` if the error is an Arrow offset overflow error.
185+
fn is_offset_overflow(e: &DataFusionError) -> bool {
186+
matches!(
187+
e,
188+
DataFusionError::ArrowError(err, _)
189+
if matches!(err.as_ref(), ArrowError::OffsetOverflowError(_))
190+
)
191+
}

datafusion/physical-plan/src/sorts/merge.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -288,9 +288,11 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
288288
}
289289
}
290290

291-
self.produced += self.in_progress.len();
291+
let before = self.in_progress.len();
292+
let result = self.in_progress.build_record_batch();
293+
self.produced += before - self.in_progress.len();
292294

293-
return Poll::Ready(self.in_progress.build_record_batch().transpose());
295+
return Poll::Ready(result.transpose());
294296
}
295297
}
296298

0 commit comments

Comments (0)