Skip to content

Commit 3da2ff8

Browse files
committed
Fix sort merge interleave overflow
1 parent d138c36 commit 3da2ff8

2 files changed

Lines changed: 65 additions & 6 deletions

File tree

datafusion/physical-plan/src/sorts/builder.rs

Lines changed: 61 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@
1818
use crate::spill::get_record_batch_memory_size;
1919
use arrow::compute::interleave;
2020
use arrow::datatypes::SchemaRef;
21+
use arrow::error::ArrowError;
2122
use arrow::record_batch::RecordBatch;
22-
use datafusion_common::Result;
23+
use datafusion_common::{DataFusionError, Result};
2324
use datafusion_execution::memory_pool::MemoryReservation;
2425
use std::sync::Arc;
2526

@@ -126,9 +127,28 @@ impl BatchBuilder {
126127
&self.schema
127128
}
128129

130+
/// Try to interleave all columns using the given index slice.
131+
fn try_interleave_columns(
132+
&self,
133+
indices: &[(usize, usize)],
134+
) -> Result<Vec<Arc<dyn arrow::array::Array>>> {
135+
(0..self.schema.fields.len())
136+
.map(|column_idx| {
137+
let arrays: Vec<_> = self
138+
.batches
139+
.iter()
140+
.map(|(_, batch)| batch.column(column_idx).as_ref())
141+
.collect();
142+
Ok(interleave(&arrays, indices)?)
143+
})
144+
.collect::<Result<Vec<_>>>()
145+
}
146+
129147
/// Drains the in_progress row indexes, and builds a new RecordBatch from them
130148
///
131-
/// Will then drop any batches for which all rows have been yielded to the output
149+
/// Will then drop any batches for which all rows have been yielded to the output.
150+
/// If an offset overflow occurs (e.g. string/list offsets exceed i32::MAX),
151+
/// retries with progressively fewer rows until it succeeds.
132152
///
133153
/// Returns `None` if no pending rows
134154
pub fn build_record_batch(&mut self) -> Result<Option<RecordBatch>> {
@@ -167,8 +187,35 @@ impl BatchBuilder {
167187
} else {
168188
self.batches_mem_used -= get_record_batch_memory_size(batch);
169189
}
170-
retain
171-
});
190+
};
191+
192+
// Remove consumed indices, keeping any remaining for the next call.
193+
self.indices.drain(..end);
194+
195+
// Only clean up fully-consumed batches when all indices are drained,
196+
// because remaining indices may still reference earlier batches.
197+
if self.indices.is_empty() {
198+
// New cursors are only created once the previous cursor for the stream
199+
// is finished. This means all remaining rows from all but the last batch
200+
// for each stream have been yielded to the newly created record batch
201+
//
202+
// We can therefore drop all but the last batch for each stream
203+
let mut batch_idx = 0;
204+
let mut retained = 0;
205+
self.batches.retain(|(stream_idx, batch)| {
206+
let stream_cursor = &mut self.cursors[*stream_idx];
207+
let retain = stream_cursor.batch_idx == batch_idx;
208+
batch_idx += 1;
209+
210+
if retain {
211+
stream_cursor.batch_idx = retained;
212+
retained += 1;
213+
} else {
214+
self.reservation.shrink(get_record_batch_memory_size(batch));
215+
}
216+
retain
217+
});
218+
}
172219

173220
// Release excess memory back to the pool, but never shrink below
174221
// initial_reservation to maintain the anti-starvation guarantee
@@ -185,6 +232,7 @@ impl BatchBuilder {
185232
}
186233
}
187234

235+
<<<<<<< HEAD
188236
/// Try to grow `reservation` so it covers at least `needed` bytes.
189237
///
190238
/// When a reservation has been pre-loaded with bytes (e.g. via
@@ -199,4 +247,13 @@ pub(crate) fn try_grow_reservation_to_at_least(
199247
reservation.try_grow(needed - reservation.size())?;
200248
}
201249
Ok(())
250+
=======
251+
/// Returns `true` if the error is an Arrow offset overflow error.
252+
fn is_offset_overflow(e: &DataFusionError) -> bool {
253+
matches!(
254+
e,
255+
DataFusionError::ArrowError(err, _)
256+
if matches!(err.as_ref(), ArrowError::OffsetOverflowError(_))
257+
)
258+
>>>>>>> 967cf0a65 (Fix sort merge interleave overflow)
202259
}

datafusion/physical-plan/src/sorts/merge.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -288,9 +288,11 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
288288
}
289289
}
290290

291-
self.produced += self.in_progress.len();
291+
let before = self.in_progress.len();
292+
let result = self.in_progress.build_record_batch();
293+
self.produced += before - self.in_progress.len();
292294

293-
return Poll::Ready(self.in_progress.build_record_batch().transpose());
295+
return Poll::Ready(result.transpose());
294296
}
295297
}
296298

0 commit comments

Comments
 (0)