1919
2020import math
2121import time
22+ from typing import Callable
2223
2324import pyarrow as pa
24- from datafusion import SessionContext , col
25+ from datafusion import SessionContext , col , DataFrame
2526from datafusion import functions as f
2627
2728
28- def run_simple_aggregation (
29- n_batches : int = 8 ,
30- batch_size : int = 1_000_000 ,
31- n_partitions : int | None = None ,
32- ) -> None :
33- """Simple aggregation benchmark (original)."""
34- ctx = SessionContext ()
35- batches = []
36- for i in range (n_batches ):
37- start = i * batch_size
38- arr = pa .array (range (start , start + batch_size ))
39- batches .append (pa .record_batch ([arr ], names = ["a" ]))
40-
def create_partitions(
    batches: list[pa.RecordBatch], n_partitions: int | None = None
) -> list[list[pa.RecordBatch]]:
    """Split *batches* into roughly equal-sized partitions.

    Args:
        batches: Record batches to distribute across partitions.
        n_partitions: Desired partition count. Defaults to one partition
            per batch; the value is clamped to [1, len(batches)].

    Returns:
        A list of partitions, each a non-empty list of batches. Empty
        input yields an empty list.
    """
    # Guard: with no batches, partition_size would be 0 and range() would
    # raise ValueError (zero step) -- return early instead.
    if not batches:
        return []
    if n_partitions is None:
        n_partitions = len(batches)
    n_partitions = max(1, min(n_partitions, len(batches)))
    partition_size = math.ceil(len(batches) / n_partitions)
    return [
        batches[i : i + partition_size]
        for i in range(0, len(batches), partition_size)
    ]
48- df = ctx .create_dataframe (partitions )
4938
39+
def create_dataframe_from_batches(
    batches: list[pa.RecordBatch],
    n_partitions: int | None = None
) -> DataFrame:
    """Build a partitioned DataFrame from *batches* in a fresh context.

    The batches are grouped via ``create_partitions`` and registered with
    a brand-new ``SessionContext`` so each call is independent.
    """
    context = SessionContext()
    return context.create_dataframe(create_partitions(batches, n_partitions))
48+
49+
def time_execution(func: Callable[[], object], description: str) -> None:
    """Run *func* once, time it, and print a one-line summary.

    Args:
        func: Zero-argument callable to execute (e.g. a deferred collect).
        description: Prefix for the printed summary line.

    If the result is sized (defines ``__len__``), the row count is
    appended to the output.
    """
    # NOTE: annotation fixed -- the builtin function `any` is not a type;
    # `object` is the correct "anything" return annotation here.
    start = time.perf_counter()
    result = func()
    duration = time.perf_counter() - start

    # Sized results (e.g. lists of record batches) get a row-count suffix.
    if hasattr(result, "__len__"):
        print(f"{description} in {duration:.3f}s, {len(result)} result rows")
    else:
        print(f"{description} in {duration:.3f}s")
5460
5561
56- def run_complex_computations (
57- n_batches : int = 8 ,
58- batch_size : int = 1_000_000 ,
59- n_partitions : int | None = None ,
60- ) -> None :
61- """CPU-intensive computations with multiple columns."""
62- ctx = SessionContext ()
def create_numeric_batches(n_batches: int, batch_size: int) -> list[pa.RecordBatch]:
    """Build record batches holding one integer column named "a".

    Batch ``i`` covers the contiguous index range
    ``[i * batch_size, (i + 1) * batch_size)``.
    """
    return [
        pa.record_batch(
            [pa.array(range(offset, offset + batch_size))], names=["a"]
        )
        for offset in (b * batch_size for b in range(n_batches))
    ]
70+
71+
def create_multi_column_batches(n_batches: int, batch_size: int) -> list[pa.RecordBatch]:
    """Build batches with three derived columns for complex computations.

    Columns: "a" (sequential ints), "b" (affine float transform of the
    index), "c" (index modulo 1000).
    """
    result = []
    for batch_index in range(n_batches):
        lo = batch_index * batch_size
        rows = range(lo, lo + batch_size)
        columns = [
            pa.array(rows),
            pa.array([v * 2.5 + 1.0 for v in rows]),
            pa.array([v % 1000 for v in rows]),
        ]
        result.append(pa.record_batch(columns, names=["a", "b", "c"]))
    return result
7182
72- if n_partitions is None :
73- n_partitions = n_batches
74- n_partitions = max (1 , min (n_partitions , n_batches ))
75- partition_size = math .ceil (len (batches ) / n_partitions )
76- partitions = [
77- batches [i : i + partition_size ] for i in range (0 , len (batches ), partition_size )
78- ]
79- df = ctx .create_dataframe (partitions )
83+
def create_string_batches(n_batches: int, batch_size: int) -> list[pa.RecordBatch]:
    """Build batches of synthetic user records for string benchmarks.

    Columns: "id" (zero-padded user id), "email" (one of ten example
    domains), "category" (one of 100 buckets), and a float "value".
    """
    result = []
    for batch_index in range(n_batches):
        lo = batch_index * batch_size
        rows = range(lo, lo + batch_size)
        columns = [
            pa.array([f"user_{x:08d}" for x in rows]),
            pa.array([f"user{x}@example{x % 10}.com" for x in rows]),
            pa.array([f"category_{x % 100:03d}" for x in rows]),
            pa.array([x * 1.5 for x in rows]),
        ]
        result.append(
            pa.record_batch(columns, names=["id", "email", "category", "value"])
        )
    return result
99+
100+
def create_groupby_batches(n_batches: int, batch_size: int) -> list[pa.RecordBatch]:
    """Build batches with a group key for group-by benchmarks.

    Columns: "a" (sequential ints), "group_id" (index modulo 1000), and
    "value" (float derived from the row index).
    """
    result = []
    for batch_index in range(n_batches):
        lo = batch_index * batch_size
        rows = range(lo, lo + batch_size)
        columns = [
            pa.array(rows),
            pa.array([x % 1000 for x in rows]),
            pa.array([x * 2.5 + (x % 100) for x in rows]),
        ]
        result.append(pa.record_batch(columns, names=["a", "group_id", "value"]))
    return result
115+
116+
def run_simple_aggregation(
    n_batches: int = 8,
    batch_size: int = 1_000_000,
    n_partitions: int | None = None,
) -> None:
    """Simple aggregation benchmark (original)."""
    frame = create_dataframe_from_batches(
        create_numeric_batches(n_batches, batch_size), n_partitions
    )
    # Defer the sum-and-collect so time_execution measures only execution.
    time_execution(
        lambda: frame.aggregate([], [f.sum(col("a"))]).collect(),
        f"Simple aggregation: {n_batches} batches",
    )
130+
131+
132+ def run_complex_computations (
133+ n_batches : int = 8 ,
134+ batch_size : int = 1_000_000 ,
135+ n_partitions : int | None = None ,
136+ ) -> None :
137+ """CPU-intensive computations with multiple columns."""
138+ batches = create_multi_column_batches (n_batches , batch_size )
139+ df = create_dataframe_from_batches (batches , n_partitions )
80140
81141 # CPU-intensive transformations
82142 df = df .select (
@@ -110,10 +170,10 @@ def run_complex_computations(
110170 ]
111171 )
112172
113- start = time . perf_counter ()
114- result = df .collect ()
115- duration = time . perf_counter () - start
116- print ( f"Complex computations: { n_batches } batches in { duration :.3f } s, { len ( result ) } result rows " )
173+ def execute ():
174+ return df .collect ()
175+
176+ time_execution ( execute , f"Complex computations: { n_batches } batches" )
117177
118178
119179def run_string_processing (
@@ -122,29 +182,8 @@ def run_string_processing(
122182 n_partitions : int | None = None ,
123183) -> None :
124184 """CPU-intensive string processing operations."""
125- ctx = SessionContext ()
126- batches = []
127- for i in range (n_batches ):
128- start = i * batch_size
129- # Create string data
130- arr_id = pa .array ([f"user_{ x :08d} " for x in range (start , start + batch_size )])
131- arr_email = pa .array ([f"user{ x } @example{ x % 10 } .com" for x in range (start , start + batch_size )])
132- arr_category = pa .array ([f"category_{ x % 100 :03d} " for x in range (start , start + batch_size )])
133- arr_value = pa .array ([x * 1.5 for x in range (start , start + batch_size )])
134-
135- batches .append (pa .record_batch (
136- [arr_id , arr_email , arr_category , arr_value ],
137- names = ["id" , "email" , "category" , "value" ]
138- ))
139-
140- if n_partitions is None :
141- n_partitions = n_batches
142- n_partitions = max (1 , min (n_partitions , n_batches ))
143- partition_size = math .ceil (len (batches ) / n_partitions )
144- partitions = [
145- batches [i : i + partition_size ] for i in range (0 , len (batches ), partition_size )
146- ]
147- df = ctx .create_dataframe (partitions )
185+ batches = create_string_batches (n_batches , batch_size )
186+ df = create_dataframe_from_batches (batches , n_partitions )
148187
149188 # String processing operations
150189 df = df .select (
@@ -175,10 +214,10 @@ def run_string_processing(
175214 ]
176215 )
177216
178- start = time . perf_counter ()
179- result = df .collect ()
180- duration = time . perf_counter () - start
181- print ( f"String processing: { n_batches } batches in { duration :.3f } s, { len ( result ) } result rows " )
217+ def execute ():
218+ return df .collect ()
219+
220+ time_execution ( execute , f"String processing: { n_batches } batches" )
182221
183222
184223def run_window_functions (
@@ -187,27 +226,8 @@ def run_window_functions(
187226 n_partitions : int | None = None ,
188227) -> None :
189228 """CPU-intensive window function operations."""
190- ctx = SessionContext ()
191- batches = []
192- for i in range (n_batches ):
193- start = i * batch_size
194- arr_a = pa .array (range (start , start + batch_size ))
195- arr_group = pa .array ([x % 1000 for x in range (start , start + batch_size )])
196- arr_value = pa .array ([x * 2.5 + (x % 100 ) for x in range (start , start + batch_size )])
197-
198- batches .append (pa .record_batch (
199- [arr_a , arr_group , arr_value ],
200- names = ["a" , "group_id" , "value" ]
201- ))
202-
203- if n_partitions is None :
204- n_partitions = n_batches
205- n_partitions = max (1 , min (n_partitions , n_batches ))
206- partition_size = math .ceil (len (batches ) / n_partitions )
207- partitions = [
208- batches [i : i + partition_size ] for i in range (0 , len (batches ), partition_size )
209- ]
210- df = ctx .create_dataframe (partitions )
229+ batches = create_groupby_batches (n_batches , batch_size )
230+ df = create_dataframe_from_batches (batches , n_partitions )
211231
212232 # Note: Window functions in DataFusion Python may have limited support
213233 # Using group-by operations that require sorting and complex aggregations
@@ -231,10 +251,10 @@ def run_window_functions(
231251 ]
232252 )
233253
234- start = time . perf_counter ()
235- result = df .collect ()
236- duration = time . perf_counter () - start
237- print ( f"Window/groupby operations: { n_batches } batches in { duration :.3f } s, { len ( result ) } result rows " )
254+ def execute ():
255+ return df .collect ()
256+
257+ time_execution ( execute , f"Window/groupby operations: { n_batches } batches" )
238258
239259
240260def run (
0 commit comments