@@ -173,36 +173,23 @@ impl BatchBuilder {
173173 return Ok ( None ) ;
174174 }
175175
176- let columns = ( 0 ..self . schema . fields . len ( ) )
177- . map ( |column_idx| {
178- let arrays: Vec < _ > = self
179- . batches
180- . iter ( )
181- . map ( |( _, batch) | batch. column ( column_idx) . as_ref ( ) )
182- . collect ( ) ;
183- Ok ( interleave ( & arrays, & self . indices ) ?)
184- } )
185- . collect :: < Result < Vec < _ > > > ( ) ?;
186-
187- self . indices . clear ( ) ;
188-
189- // New cursors are only created once the previous cursor for the stream
190- // is finished. This means all remaining rows from all but the last batch
191- // for each stream have been yielded to the newly created record batch
192- //
193- // We can therefore drop all but the last batch for each stream
194- let mut batch_idx = 0 ;
195- let mut retained = 0 ;
196- self . batches . retain ( |( stream_idx, batch) | {
197- let stream_cursor = & mut self . cursors [ * stream_idx] ;
198- let retain = stream_cursor. batch_idx == batch_idx;
199- batch_idx += 1 ;
200-
201- if retain {
202- stream_cursor. batch_idx = retained;
203- retained += 1 ;
204- } else {
205- self . batches_mem_used -= get_record_batch_memory_size ( batch) ;
176+ // Try interleaving all indices. On offset overflow, halve and retry.
177+ let mut end = self . indices . len ( ) ;
178+ let columns = loop {
179+ match self . try_interleave_columns ( & self . indices [ ..end] ) {
180+ Ok ( cols) => break cols,
181+ Err ( e) if is_offset_overflow ( & e) => {
182+ end /= 2 ;
183+ if end == 0 {
184+ return Err ( e) ;
185+ }
186+ warn ! (
187+ "Interleave offset overflow with {} rows, retrying with {}" ,
188+ self . indices. len( ) ,
189+ end
190+ ) ;
191+ }
192+ Err ( e) => return Err ( e) ,
206193 }
207194 } ;
208195
@@ -228,7 +215,7 @@ impl BatchBuilder {
228215 stream_cursor. batch_idx = retained;
229216 retained += 1 ;
230217 } else {
231- self . reservation . shrink ( get_record_batch_memory_size ( batch) ) ;
218+ self . batches_mem_used -= get_record_batch_memory_size ( batch) ;
232219 }
233220 retain
234221 } ) ;
@@ -249,8 +236,6 @@ impl BatchBuilder {
249236 }
250237}
251238
252- <<<<<<< HEAD
253- <<<<<<< HEAD
254239/// Try to grow `reservation` so it covers at least `needed` bytes.
255240///
256241/// When a reservation has been pre-loaded with bytes (e.g. via
@@ -265,18 +250,15 @@ pub(crate) fn try_grow_reservation_to_at_least(
265250 reservation. try_grow ( needed - reservation. size ( ) ) ?;
266251 }
267252 Ok ( ( ) )
268- =======
269- /// Returns `true` if the error is an Arrow offset overflow error.
270- =======
253+ }
254+
271255/// Returns `true` if `e` is a `DataFusionError::ArrowError` whose boxed inner
/// error is an `ArrowError::OffsetOverflowError`, and `false` for every other
/// error.
///
/// Only the outermost error is inspected: the match is made directly on `e`,
/// so an overflow nested inside any other `DataFusionError` variant is not
/// detected here.
272- >>>>>>> a53942d48 ( add log)
273256fn is_offset_overflow ( e : & DataFusionError ) -> bool {
274257 matches ! (
275258 e,
276259 DataFusionError :: ArrowError ( boxed, _)
277260 if matches!( boxed. as_ref( ) , ArrowError :: OffsetOverflowError ( _) )
278261 )
279- >>>>>>> 967 cf0a65 ( Fix sort merge interleave overflow)
280262}
281263
282264/// Returns true if a caught panic payload looks like an Arrow offset overflow.
0 commit comments