@@ -71,6 +71,8 @@ impl PyTableProvider {
7171 PyTable :: new ( table_provider)
7272 }
7373}
74+ const MAX_TABLE_BYTES_TO_DISPLAY : usize = 2 * 1024 * 1024 ; // 2 MB
75+ const MIN_TABLE_ROWS_TO_DISPLAY : usize = 20 ;
7476
7577/// A PyDataFrame is a representation of a logical plan and an API to compose statements.
7678/// Use it to build a plan and `.collect()` to execute the plan and collect the result.
@@ -79,16 +81,12 @@ impl PyTableProvider {
7981#[ derive( Clone ) ]
8082pub struct PyDataFrame {
8183 df : Arc < DataFrame > ,
82- display_config : Arc < PyDataframeDisplayConfig > ,
8384}
8485
8586impl PyDataFrame {
8687 /// creates a new PyDataFrame
87- pub fn new ( df : DataFrame , display_config : PyDataframeDisplayConfig ) -> Self {
88- Self {
89- df : Arc :: new ( df) ,
90- display_config : Arc :: new ( display_config) ,
91- }
88+ pub fn new ( df : DataFrame ) -> Self {
89+ Self { df : Arc :: new ( df) }
9290 }
9391}
9492
@@ -118,12 +116,7 @@ impl PyDataFrame {
118116 fn __repr__ ( & self , py : Python ) -> PyDataFusionResult < String > {
119117 let ( batches, has_more) = wait_for_future (
120118 py,
121- collect_record_batches_to_display (
122- self . df . as_ref ( ) . clone ( ) ,
123- 10 ,
124- 10 ,
125- self . display_config . max_table_bytes ,
126- ) ,
119+ collect_record_batches_to_display ( self . df . as_ref ( ) . clone ( ) , 10 , 10 ) ,
127120 ) ?;
128121 if batches. is_empty ( ) {
129122 // This should not be reached, but do it for safety since we index into the vector below
@@ -146,9 +139,8 @@ impl PyDataFrame {
146139 py,
147140 collect_record_batches_to_display (
148141 self . df . as_ref ( ) . clone ( ) ,
149- self . display_config . min_table_rows ,
142+ MIN_TABLE_ROWS_TO_DISPLAY ,
150143 usize:: MAX ,
151- self . display_config . max_table_bytes ,
152144 ) ,
153145 ) ?;
154146 if batches. is_empty ( ) {
@@ -189,7 +181,7 @@ impl PyDataFrame {
189181 fn describe ( & self , py : Python ) -> PyDataFusionResult < Self > {
190182 let df = self . df . as_ref ( ) . clone ( ) ;
191183 let stat_df = wait_for_future ( py, df. describe ( ) ) ?;
192- Ok ( Self :: new ( stat_df, ( * self . display_config ) . clone ( ) ) )
184+ Ok ( Self :: new ( stat_df) )
193185 }
194186
195187 /// Returns the schema from the logical plan
@@ -219,31 +211,31 @@ impl PyDataFrame {
219211 fn select_columns ( & self , args : Vec < PyBackedStr > ) -> PyDataFusionResult < Self > {
220212 let args = args. iter ( ) . map ( |s| s. as_ref ( ) ) . collect :: < Vec < & str > > ( ) ;
221213 let df = self . df . as_ref ( ) . clone ( ) . select_columns ( & args) ?;
222- Ok ( Self :: new ( df, ( * self . display_config ) . clone ( ) ) )
214+ Ok ( Self :: new ( df) )
223215 }
224216
225217 #[ pyo3( signature = ( * args) ) ]
226218 fn select ( & self , args : Vec < PyExpr > ) -> PyDataFusionResult < Self > {
227219 let expr = args. into_iter ( ) . map ( |e| e. into ( ) ) . collect ( ) ;
228220 let df = self . df . as_ref ( ) . clone ( ) . select ( expr) ?;
229- Ok ( Self :: new ( df, ( * self . display_config ) . clone ( ) ) )
221+ Ok ( Self :: new ( df) )
230222 }
231223
232224 #[ pyo3( signature = ( * args) ) ]
233225 fn drop ( & self , args : Vec < PyBackedStr > ) -> PyDataFusionResult < Self > {
234226 let cols = args. iter ( ) . map ( |s| s. as_ref ( ) ) . collect :: < Vec < & str > > ( ) ;
235227 let df = self . df . as_ref ( ) . clone ( ) . drop_columns ( & cols) ?;
236- Ok ( Self :: new ( df, ( * self . display_config ) . clone ( ) ) )
228+ Ok ( Self :: new ( df) )
237229 }
238230
239231 fn filter ( & self , predicate : PyExpr ) -> PyDataFusionResult < Self > {
240232 let df = self . df . as_ref ( ) . clone ( ) . filter ( predicate. into ( ) ) ?;
241- Ok ( Self :: new ( df, ( * self . display_config ) . clone ( ) ) )
233+ Ok ( Self :: new ( df) )
242234 }
243235
244236 fn with_column ( & self , name : & str , expr : PyExpr ) -> PyDataFusionResult < Self > {
245237 let df = self . df . as_ref ( ) . clone ( ) . with_column ( name, expr. into ( ) ) ?;
246- Ok ( Self :: new ( df, ( * self . display_config ) . clone ( ) ) )
238+ Ok ( Self :: new ( df) )
247239 }
248240
249241 fn with_columns ( & self , exprs : Vec < PyExpr > ) -> PyDataFusionResult < Self > {
@@ -253,7 +245,7 @@ impl PyDataFrame {
253245 let name = format ! ( "{}" , expr. schema_name( ) ) ;
254246 df = df. with_column ( name. as_str ( ) , expr) ?
255247 }
256- Ok ( Self :: new ( df, ( * self . display_config ) . clone ( ) ) )
248+ Ok ( Self :: new ( df) )
257249 }
258250
259251 /// Rename one column by applying a new projection. This is a no-op if the column to be
@@ -264,27 +256,27 @@ impl PyDataFrame {
264256 . as_ref ( )
265257 . clone ( )
266258 . with_column_renamed ( old_name, new_name) ?;
267- Ok ( Self :: new ( df, ( * self . display_config ) . clone ( ) ) )
259+ Ok ( Self :: new ( df) )
268260 }
269261
270262 fn aggregate ( & self , group_by : Vec < PyExpr > , aggs : Vec < PyExpr > ) -> PyDataFusionResult < Self > {
271263 let group_by = group_by. into_iter ( ) . map ( |e| e. into ( ) ) . collect ( ) ;
272264 let aggs = aggs. into_iter ( ) . map ( |e| e. into ( ) ) . collect ( ) ;
273265 let df = self . df . as_ref ( ) . clone ( ) . aggregate ( group_by, aggs) ?;
274- Ok ( Self :: new ( df, ( * self . display_config ) . clone ( ) ) )
266+ Ok ( Self :: new ( df) )
275267 }
276268
277269 #[ pyo3( signature = ( * exprs) ) ]
278270 fn sort ( & self , exprs : Vec < PySortExpr > ) -> PyDataFusionResult < Self > {
279271 let exprs = to_sort_expressions ( exprs) ;
280272 let df = self . df . as_ref ( ) . clone ( ) . sort ( exprs) ?;
281- Ok ( Self :: new ( df, ( * self . display_config ) . clone ( ) ) )
273+ Ok ( Self :: new ( df) )
282274 }
283275
284276 #[ pyo3( signature = ( count, offset=0 ) ) ]
285277 fn limit ( & self , count : usize , offset : usize ) -> PyDataFusionResult < Self > {
286278 let df = self . df . as_ref ( ) . clone ( ) . limit ( offset, Some ( count) ) ?;
287- Ok ( Self :: new ( df, ( * self . display_config ) . clone ( ) ) )
279+ Ok ( Self :: new ( df) )
288280 }
289281
290282 /// Executes the plan, returning a list of `RecordBatch`es.
@@ -301,7 +293,7 @@ impl PyDataFrame {
301293 /// Cache DataFrame.
302294 fn cache ( & self , py : Python ) -> PyDataFusionResult < Self > {
303295 let df = wait_for_future ( py, self . df . as_ref ( ) . clone ( ) . cache ( ) ) ?;
304- Ok ( Self :: new ( df, ( * self . display_config ) . clone ( ) ) )
296+ Ok ( Self :: new ( df) )
305297 }
306298
307299 /// Executes this DataFrame and collects all results into a vector of vector of RecordBatch
@@ -326,7 +318,7 @@ impl PyDataFrame {
326318 /// Filter out duplicate rows
327319 fn distinct ( & self ) -> PyDataFusionResult < Self > {
328320 let df = self . df . as_ref ( ) . clone ( ) . distinct ( ) ?;
329- Ok ( Self :: new ( df, ( * self . display_config ) . clone ( ) ) )
321+ Ok ( Self :: new ( df) )
330322 }
331323
332324 fn join (
@@ -360,7 +352,7 @@ impl PyDataFrame {
360352 & right_keys,
361353 None ,
362354 ) ?;
363- Ok ( Self :: new ( df, ( * self . display_config ) . clone ( ) ) )
355+ Ok ( Self :: new ( df) )
364356 }
365357
366358 fn join_on (
@@ -389,7 +381,7 @@ impl PyDataFrame {
389381 . as_ref ( )
390382 . clone ( )
391383 . join_on ( right. df . as_ref ( ) . clone ( ) , join_type, exprs) ?;
392- Ok ( Self :: new ( df, ( * self . display_config ) . clone ( ) ) )
384+ Ok ( Self :: new ( df) )
393385 }
394386
395387 /// Print the query plan
@@ -422,7 +414,7 @@ impl PyDataFrame {
422414 . as_ref ( )
423415 . clone ( )
424416 . repartition ( Partitioning :: RoundRobinBatch ( num) ) ?;
425- Ok ( Self :: new ( new_df, ( * self . display_config ) . clone ( ) ) )
417+ Ok ( Self :: new ( new_df) )
426418 }
427419
428420 /// Repartition a `DataFrame` based on a logical partitioning scheme.
@@ -434,7 +426,7 @@ impl PyDataFrame {
434426 . as_ref ( )
435427 . clone ( )
436428 . repartition ( Partitioning :: Hash ( expr, num) ) ?;
437- Ok ( Self :: new ( new_df, ( * self . display_config ) . clone ( ) ) )
429+ Ok ( Self :: new ( new_df) )
438430 }
439431
440432 /// Calculate the union of two `DataFrame`s, preserving duplicate rows.The
@@ -450,7 +442,7 @@ impl PyDataFrame {
450442 self . df . as_ref ( ) . clone ( ) . union ( py_df. df . as_ref ( ) . clone ( ) ) ?
451443 } ;
452444
453- Ok ( Self :: new ( new_df, ( * self . display_config ) . clone ( ) ) )
445+ Ok ( Self :: new ( new_df) )
454446 }
455447
456448 /// Calculate the distinct union of two `DataFrame`s. The
@@ -461,7 +453,7 @@ impl PyDataFrame {
461453 . as_ref ( )
462454 . clone ( )
463455 . union_distinct ( py_df. df . as_ref ( ) . clone ( ) ) ?;
464- Ok ( Self :: new ( new_df, ( * self . display_config ) . clone ( ) ) )
456+ Ok ( Self :: new ( new_df) )
465457 }
466458
467459 #[ pyo3( signature = ( column, preserve_nulls=true ) ) ]
@@ -502,13 +494,13 @@ impl PyDataFrame {
502494 . as_ref ( )
503495 . clone ( )
504496 . intersect ( py_df. df . as_ref ( ) . clone ( ) ) ?;
505- Ok ( Self :: new ( new_df, ( * self . display_config ) . clone ( ) ) )
497+ Ok ( Self :: new ( new_df) )
506498 }
507499
508500 /// Calculate the exception of two `DataFrame`s. The two `DataFrame`s must have exactly the same schema
509501 fn except_all ( & self , py_df : PyDataFrame ) -> PyDataFusionResult < Self > {
510502 let new_df = self . df . as_ref ( ) . clone ( ) . except ( py_df. df . as_ref ( ) . clone ( ) ) ?;
511- Ok ( Self :: new ( new_df, ( * self . display_config ) . clone ( ) ) )
503+ Ok ( Self :: new ( new_df) )
512504 }
513505
514506 /// Write a `DataFrame` to a CSV file.
@@ -806,7 +798,6 @@ async fn collect_record_batches_to_display(
806798 df : DataFrame ,
807799 min_rows : usize ,
808800 max_rows : usize ,
809- max_table_bytes : usize ,
810801) -> Result < ( Vec < RecordBatch > , bool ) , DataFusionError > {
811802 let partitioned_stream = df. execute_stream_partitioned ( ) . await ?;
812803 let mut stream = futures:: stream:: iter ( partitioned_stream) . flatten ( ) ;
@@ -815,7 +806,7 @@ async fn collect_record_batches_to_display(
815806 let mut record_batches = Vec :: default ( ) ;
816807 let mut has_more = false ;
817808
818- while ( size_estimate_so_far < max_table_bytes && rows_so_far < max_rows)
809+ while ( size_estimate_so_far < MAX_TABLE_BYTES_TO_DISPLAY && rows_so_far < max_rows)
819810 || rows_so_far < min_rows
820811 {
821812 let mut rb = match stream. next ( ) . await {
@@ -830,8 +821,8 @@ async fn collect_record_batches_to_display(
830821 if rows_in_rb > 0 {
831822 size_estimate_so_far += rb. get_array_memory_size ( ) ;
832823
833- if size_estimate_so_far > max_table_bytes {
834- let ratio = max_table_bytes as f32 / size_estimate_so_far as f32 ;
824+ if size_estimate_so_far > MAX_TABLE_BYTES_TO_DISPLAY {
825+ let ratio = MAX_TABLE_BYTES_TO_DISPLAY as f32 / size_estimate_so_far as f32 ;
835826 let total_rows = rows_in_rb + rows_so_far;
836827
837828 let mut reduced_row_num = ( total_rows as f32 * ratio) . round ( ) as usize ;
0 commit comments