@@ -19,7 +19,7 @@ use crate::aggregates::group_values::multi_group_by::{
1919 GroupColumn , Nulls , nulls_equal_to,
2020} ;
2121use crate :: aggregates:: group_values:: null_builder:: MaybeNullBufferBuilder ;
22- use arrow:: array:: { Array , ArrayRef , AsArray , ByteView , GenericByteViewArray , make_view } ;
22+ use arrow:: array:: { Array , ArrayRef , AsArray , ByteView , GenericByteViewArray } ;
2323use arrow:: buffer:: { Buffer , ScalarBuffer } ;
2424use arrow:: datatypes:: ByteViewType ;
2525use datafusion_common:: Result ;
@@ -115,7 +115,47 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
115115
116116 // Not null row case
117117 self . nulls . append ( false ) ;
118- self . do_append_val_inner ( arr, row) ;
118+ let input_view = unsafe { * arr. views ( ) . get_unchecked ( row) } ;
119+ let view = Self :: copy_from_view (
120+ input_view,
121+ arr,
122+ row,
123+ & mut self . in_progress ,
124+ & mut self . completed ,
125+ self . max_block_size ,
126+ ) ;
127+ self . views . push ( view) ;
128+ }
129+
130+ /// Copy a non-null value from `arr[row]` into our buffers, reusing the
131+ /// input view. For inline values (len <= 12) the view is returned as-is.
132+ /// For non-inline values the data is copied and the view is updated with
133+ /// our buffer_index/offset.
134+ fn copy_from_view (
135+ input_view : u128 ,
136+ arr : & GenericByteViewArray < B > ,
137+ row : usize ,
138+ in_progress : & mut Vec < u8 > ,
139+ completed : & mut Vec < Buffer > ,
140+ max_block_size : usize ,
141+ ) -> u128 {
142+ let len = input_view as u32 ;
143+ if len <= 12 {
144+ return input_view;
145+ }
146+ let value: & [ u8 ] = unsafe { arr. value_unchecked ( row) . as_ref ( ) } ;
147+ let require_cap = in_progress. len ( ) + value. len ( ) ;
148+ if require_cap > max_block_size {
149+ let flushed_block = replace ( in_progress, Vec :: with_capacity ( max_block_size) ) ;
150+ completed. push ( Buffer :: from_vec ( flushed_block) ) ;
151+ }
152+ let buffer_index = completed. len ( ) as u32 ;
153+ let offset = in_progress. len ( ) as u32 ;
154+ in_progress. extend_from_slice ( value) ;
155+ ByteView :: from ( input_view)
156+ . with_buffer_index ( buffer_index)
157+ . with_offset ( offset)
158+ . as_u128 ( )
119159 }
120160
121161 // Don't inline to keep the code small and give LLVM the best chance of
@@ -157,18 +197,57 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
157197 Nulls :: Some
158198 } ;
159199
200+ let input_views = arr. views ( ) ;
201+
160202 match all_null_or_non_null {
161203 Nulls :: Some => {
162- for & row in rows {
163- self . append_val_inner ( array, row) ;
164- }
204+ let Self {
205+ views,
206+ in_progress,
207+ completed,
208+ nulls,
209+ max_block_size,
210+ ..
211+ } = self ;
212+ views. extend ( rows. iter ( ) . map ( |& row| {
213+ if arr. is_null ( row) {
214+ nulls. append ( true ) ;
215+ 0u128
216+ } else {
217+ nulls. append ( false ) ;
218+ let input_view = unsafe { * input_views. get_unchecked ( row) } ;
219+ Self :: copy_from_view (
220+ input_view,
221+ arr,
222+ row,
223+ in_progress,
224+ completed,
225+ * max_block_size,
226+ )
227+ }
228+ } ) ) ;
165229 }
166230
167231 Nulls :: None => {
168232 self . nulls . append_n ( rows. len ( ) , false ) ;
169- for & row in rows {
170- self . do_append_val_inner ( arr, row) ;
171- }
233+ let Self {
234+ views,
235+ in_progress,
236+ completed,
237+ max_block_size,
238+ ..
239+ } = self ;
240+ views. extend ( rows. iter ( ) . map ( |& row| {
241+ let input_view = unsafe { * input_views. get_unchecked ( row) } ;
242+ Self :: copy_from_view (
243+ input_view,
244+ arr,
245+ row,
246+ in_progress,
247+ completed,
248+ * max_block_size,
249+ )
250+ } ) ) ;
172251 }
173252
174253 Nulls :: All => {
@@ -179,46 +258,6 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
179258 }
180259 }
181260
182- fn do_append_val_inner ( & mut self , array : & GenericByteViewArray < B > , row : usize )
183- where
184- B : ByteViewType ,
185- {
186- let value: & [ u8 ] = array. value ( row) . as_ref ( ) ;
187-
188- let value_len = value. len ( ) ;
189- let view = if value_len <= 12 {
190- make_view ( value, 0 , 0 )
191- } else {
192- // Ensure big enough block to hold the value firstly
193- self . ensure_in_progress_big_enough ( value_len) ;
194-
195- // Append value
196- let buffer_index = self . completed . len ( ) ;
197- let offset = self . in_progress . len ( ) ;
198- self . in_progress . extend_from_slice ( value) ;
199-
200- make_view ( value, buffer_index as u32 , offset as u32 )
201- } ;
202-
203- // Append view
204- self . views . push ( view) ;
205- }
206-
207- fn ensure_in_progress_big_enough ( & mut self , value_len : usize ) {
208- debug_assert ! ( value_len > 12 ) ;
209- let require_cap = self . in_progress . len ( ) + value_len;
210-
211- // If current block isn't big enough, flush it and create a new in progress block
212- if require_cap > self . max_block_size {
213- let flushed_block = replace (
214- & mut self . in_progress ,
215- Vec :: with_capacity ( self . max_block_size ) ,
216- ) ;
217- let buffer = Buffer :: from_vec ( flushed_block) ;
218- self . completed . push ( buffer) ;
219- }
220- }
221-
222261 /// Compare the value at `lhs_row` in this builder with
223262 /// the value at `rhs_row` in input `array`
224263 ///
0 commit comments