@@ -22,7 +22,8 @@ use arrow::error::ArrowError;
2222use arrow:: record_batch:: RecordBatch ;
2323use datafusion_common:: { DataFusionError , Result } ;
2424use datafusion_execution:: memory_pool:: MemoryReservation ;
25- use std:: panic:: { catch_unwind, AssertUnwindSafe } ;
25+ use log:: warn;
26+ use std:: panic:: { AssertUnwindSafe , catch_unwind} ;
2627use std:: sync:: Arc ;
2728
2829#[ derive( Debug , Copy , Clone , Default ) ]
@@ -156,6 +157,11 @@ impl BatchBuilder {
156157 match self . try_interleave_columns ( & self . indices [ ..end] ) {
157158 Ok ( columns) => break columns,
158159 Err ( e) if is_offset_overflow ( & e) && end > 1 => {
160+ warn ! (
161+ "Interleave offset overflow with {} rows, retrying with {}" ,
162+ end,
163+ end / 2
164+ ) ;
159165 end /= 2 ;
160166 }
161167 Err ( e) => return Err ( e) ,
@@ -197,12 +203,12 @@ impl BatchBuilder {
197203 }
198204}
199205
200- /// Returns ` true` if the error is an Arrow offset overflow error .
206+ /// Returns true if the error is an Arrow offset overflow.
201207fn is_offset_overflow ( e : & DataFusionError ) -> bool {
202208 matches ! (
203209 e,
204- DataFusionError :: ArrowError ( err , _)
205- if matches!( err . as_ref( ) , ArrowError :: OffsetOverflowError ( _) )
210+ DataFusionError :: ArrowError ( boxed , _)
211+ if matches!( boxed . as_ref( ) , ArrowError :: OffsetOverflowError ( _) )
206212 )
207213}
208214
@@ -222,7 +228,9 @@ mod tests {
222228 use super :: * ;
223229 use arrow:: array:: StringArray ;
224230 use arrow:: datatypes:: { DataType , Field , Schema } ;
225- use datafusion_execution:: memory_pool:: { MemoryConsumer , MemoryPool , UnboundedMemoryPool } ;
231+ use datafusion_execution:: memory_pool:: {
232+ MemoryConsumer , MemoryPool , UnboundedMemoryPool ,
233+ } ;
226234
227235 /// Test that interleaving string columns whose combined byte length
228236 /// exceeds i32::MAX does not panic. Arrow's `interleave` panics with
@@ -234,11 +242,7 @@ mod tests {
234242 // Each string is ~768 MB. Three rows total → ~2.3 GB > i32::MAX.
235243 let big_str: String = "x" . repeat ( 768 * 1024 * 1024 ) ;
236244
237- let schema = Arc :: new ( Schema :: new ( vec ! [ Field :: new(
238- "s" ,
239- DataType :: Utf8 ,
240- false ,
241- ) ] ) ) ;
245+ let schema = Arc :: new ( Schema :: new ( vec ! [ Field :: new( "s" , DataType :: Utf8 , false ) ] ) ) ;
242246
243247 let pool: Arc < dyn MemoryPool > = Arc :: new ( UnboundedMemoryPool :: default ( ) ) ;
244248 let reservation = MemoryConsumer :: new ( "test" ) . register ( & pool) ;
@@ -253,8 +257,7 @@ mod tests {
253257 for stream_idx in 0 ..3 {
254258 let array = StringArray :: from ( vec ! [ big_str. as_str( ) ] ) ;
255259 let batch =
256- RecordBatch :: try_new ( Arc :: clone ( & schema) , vec ! [ Arc :: new( array) ] )
257- . unwrap ( ) ;
260+ RecordBatch :: try_new ( Arc :: clone ( & schema) , vec ! [ Arc :: new( array) ] ) . unwrap ( ) ;
258261 builder. push_batch ( stream_idx, batch) . unwrap ( ) ;
259262 builder. push_row ( stream_idx) ;
260263 }
0 commit comments