 from datafusion import functions as f
 
 
-def run(
+def run_simple_aggregation(
     n_batches: int = 8,
     batch_size: int = 1_000_000,
     n_partitions: int | None = None,
 ) -> None:
-    """Aggregate column 'a' across partitions and report runtime."""
+    """Simple aggregation benchmark (original)."""
     ctx = SessionContext()
     batches = []
     for i in range(n_batches):
@@ -50,13 +50,219 @@ def run(
     start = time.perf_counter()
     df.aggregate([], [f.sum(col("a"))]).collect()
     duration = time.perf_counter() - start
-    print(f"{n_batches} batches aggregated in {duration:.3f}s")
+    print(f"Simple aggregation: {n_batches} batches in {duration:.3f}s")
+
+
+def run_complex_computations(
+    n_batches: int = 8,
+    batch_size: int = 1_000_000,
+    n_partitions: int | None = None,
+) -> None:
+    """CPU-intensive computations with multiple columns."""
+    ctx = SessionContext()
+    batches = []
+    for i in range(n_batches):
+        start = i * batch_size
+        # Create multiple columns with different data types
+        arr_a = pa.array(range(start, start + batch_size))
+        arr_b = pa.array([x * 2.5 + 1.0 for x in range(start, start + batch_size)])
+        arr_c = pa.array([x % 1000 for x in range(start, start + batch_size)])
+        batches.append(pa.record_batch([arr_a, arr_b, arr_c], names=["a", "b", "c"]))
+
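+    # Split the batches into contiguous chunks, one chunk per DataFrame
+    # partition, so DataFusion can scan them in parallel (e.g. 8 batches
+    # across 3 partitions -> chunk size ceil(8/3) = 3 -> chunks of 3, 3, 2).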
+    if n_partitions is None:
+        n_partitions = n_batches
+    n_partitions = max(1, min(n_partitions, n_batches))
+    partition_size = math.ceil(len(batches) / n_partitions)
+    partitions = [
+        batches[i : i + partition_size] for i in range(0, len(batches), partition_size)
+    ]
+    df = ctx.create_dataframe(partitions)
+
+    # CPU-intensive transformations
+    df = df.select(
+        col("a"),
+        col("b"),
+        col("c"),
+        # Complex mathematical operations
+        (col("a") * col("b") + col("c") * col("c")).alias("poly1"),
+        (col("a") * col("a") * col("a") + col("b") * col("b")).alias("poly2"),
+        (col("a") / (col("b") + 1.0) * col("c")).alias("ratio"),
+        # More expensive operations
+        f.sqrt(col("a") + col("b")).alias("sqrt_sum"),
+        (col("a") * col("a")).alias("power2"),
+        (col("b") * col("b") * col("b")).alias("power3"),
+    )
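+    # These projections are lazy; nothing is computed until collect() runs below.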
+
+    # Multiple filtering operations
+    df = df.filter(col("a") % 100 < 50)
+    df = df.filter(col("poly1") > 1000)
+    df = df.filter(col("ratio") < 10000)
+
+    # Group by with multiple aggregations
+    df = df.aggregate(
+        [col("c") % 10],
+        [
+            f.sum(col("poly1")).alias("sum_poly1"),
+            f.avg(col("poly2")).alias("avg_poly2"),
+            f.max(col("ratio")).alias("max_ratio"),
+            f.min(col("sqrt_sum")).alias("min_sqrt"),
+            f.count(col("a")).alias("count_rows"),
+        ]
+    )
+
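+    # Only execution of the lazy plan is timed; the Python-side data
+    # generation above is excluded from the measurement.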
+    start = time.perf_counter()
+    result = df.collect()
+    duration = time.perf_counter() - start
+    # len(result) counts record batches, so sum row counts for an accurate total
+    n_rows = sum(batch.num_rows for batch in result)
+    print(f"Complex computations: {n_batches} batches in {duration:.3f}s, {n_rows} result rows")
+
+
+def run_string_processing(
+    n_batches: int = 8,
+    batch_size: int = 500_000,  # Smaller batches for string operations
+    n_partitions: int | None = None,
+) -> None:
+    """CPU-intensive string processing operations."""
+    ctx = SessionContext()
+    batches = []
+    for i in range(n_batches):
+        start = i * batch_size
+        # Create string data
+        arr_id = pa.array([f"user_{x:08d}" for x in range(start, start + batch_size)])
+        arr_email = pa.array([f"user{x}@example{x % 10}.com" for x in range(start, start + batch_size)])
+        arr_category = pa.array([f"category_{x % 100:03d}" for x in range(start, start + batch_size)])
+        arr_value = pa.array([x * 1.5 for x in range(start, start + batch_size)])
+
+        batches.append(pa.record_batch(
+            [arr_id, arr_email, arr_category, arr_value],
+            names=["id", "email", "category", "value"]
+        ))
+
+    if n_partitions is None:
+        n_partitions = n_batches
+    n_partitions = max(1, min(n_partitions, n_batches))
+    partition_size = math.ceil(len(batches) / n_partitions)
+    partitions = [
+        batches[i : i + partition_size] for i in range(0, len(batches), partition_size)
+    ]
+    df = ctx.create_dataframe(partitions)
+
+    # String processing operations
+    df = df.select(
+        col("id"),
+        col("email"),
+        col("category"),
+        col("value"),
+        # String manipulations (CPU intensive)
+        f.length(col("email")).alias("email_length"),
+        f.upper(col("category")).alias("category_upper"),
+        f.lower(col("email")).alias("email_lower"),
+        f.length(col("id")).alias("id_length"),
+    )
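+    # These string functions should evaluate as vectorized kernels over Arrow
+    # arrays, keeping per-row Python overhead out of the hot path.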
+
+    # String-based filtering
+    df = df.filter(f.length(col("email")) > 15)
+    df = df.filter(f.length(col("category_upper")) > 10)
+    df = df.filter(col("email_length") < 50)
+
+    # Group by operations with string processing
+    df = df.aggregate(
+        [col("category")],  # Group by full category
+        [
+            f.sum(col("value")).alias("total_value"),
+            f.avg(col("email_length")).alias("avg_email_len"),
+            f.max(col("id_length")).alias("max_id_len"),
+            f.count(col("id")).alias("count_users"),
+        ]
+    )
+
+    start = time.perf_counter()
+    result = df.collect()
+    duration = time.perf_counter() - start
+    n_rows = sum(batch.num_rows for batch in result)
+    print(f"String processing: {n_batches} batches in {duration:.3f}s, {n_rows} result rows")
+
+
+def run_window_functions(
+    n_batches: int = 8,
+    batch_size: int = 1_000_000,
+    n_partitions: int | None = None,
+) -> None:
+    """CPU-intensive window-function-style operations (implemented via group-by)."""
+    ctx = SessionContext()
+    batches = []
+    for i in range(n_batches):
+        start = i * batch_size
+        arr_a = pa.array(range(start, start + batch_size))
+        arr_group = pa.array([x % 1000 for x in range(start, start + batch_size)])
+        arr_value = pa.array([x * 2.5 + (x % 100) for x in range(start, start + batch_size)])
+
+        batches.append(pa.record_batch(
+            [arr_a, arr_group, arr_value],
+            names=["a", "group_id", "value"]
+        ))
+
+    if n_partitions is None:
+        n_partitions = n_batches
+    n_partitions = max(1, min(n_partitions, n_batches))
+    partition_size = math.ceil(len(batches) / n_partitions)
+    partitions = [
+        batches[i : i + partition_size] for i in range(0, len(batches), partition_size)
+    ]
+    df = ctx.create_dataframe(partitions)
+
+    # Note: window functions in DataFusion Python may have limited support, so
+    # this uses group-by operations that require sorting and complex aggregations.
+    df = df.filter(col("value") > 100)
+    df = df.select(
+        col("group_id"),
+        col("value"),
+        (col("value") * col("value")).alias("value_squared"),
+        f.sqrt(col("value")).alias("value_sqrt"),
+    )
+
+    # Multiple aggregations per group (CPU intensive)
+    df = df.aggregate(
+        [col("group_id")],
+        [
+            f.sum(col("value")).alias("sum_value"),
+            f.avg(col("value_squared")).alias("avg_squared"),
+            f.max(col("value_sqrt")).alias("max_sqrt"),
+            f.min(col("value")).alias("min_value"),
+            f.count(col("value")).alias("count_rows"),
+        ]
+    )
+
+    start = time.perf_counter()
+    result = df.collect()
+    duration = time.perf_counter() - start
+    n_rows = sum(batch.num_rows for batch in result)
+    print(f"Window/groupby operations: {n_batches} batches in {duration:.3f}s, {n_rows} result rows")
+
+
+def run(
+    n_batches: int = 8,
+    batch_size: int = 1_000_000,
+    n_partitions: int | None = None,
+    workload: str = "all",
+) -> None:
+    """Run the specified workload(s)."""
+    if workload == "simple" or workload == "all":
+        run_simple_aggregation(n_batches, batch_size, n_partitions)
+
+    if workload == "complex" or workload == "all":
+        run_complex_computations(n_batches, batch_size, n_partitions)
+
+    if workload == "strings" or workload == "all":
+        run_string_processing(n_batches, batch_size // 2, n_partitions)  # Use smaller batches for strings
+
+    if workload == "groupby" or workload == "all":
+        run_window_functions(n_batches, batch_size, n_partitions)
 
 
 if __name__ == "__main__":
     import argparse
 
-    parser = argparse.ArgumentParser()
+    parser = argparse.ArgumentParser(
+        description="CPU-intensive benchmarks to demonstrate multi-threading benefits"
+    )
     parser.add_argument(
         "--batches",
         type=int,
@@ -75,9 +281,22 @@ def run(
         default=None,
         help="number of partitions to create (defaults to one per batch)",
     )
+    parser.add_argument(
+        "--workload",
+        type=str,
+        default="all",
+        choices=["simple", "complex", "strings", "groupby", "all"],
+        help="type of workload to run: simple (basic aggregation), complex (mathematical operations), strings (string processing), groupby (group-by operations), or all",
+    )
     args = parser.parse_args()
+
+    print(f"\n\nRunning benchmark with {args.batches} batches, {args.batch_size} rows per batch")
+    print(f"Partitions: {args.partitions or args.batches}, Workload: {args.workload}")
+    print("-" * 60)
+
     run(
         n_batches=args.batches,
         batch_size=args.batch_size,
         n_partitions=args.partitions,
+        workload=args.workload,
     )
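
For reference, a minimal sketch of driving the new entry point directly from Python
(the module name "benchmark" is an assumption; the diff does not show the file path):

    from benchmark import run

    # Run only the complex-math workload: 16 batches split across 8 partitions.
    run(n_batches=16, batch_size=1_000_000, n_partitions=8, workload="complex")

Equivalently, via the CLI flags added above:

    python benchmark.py --workload complex --batches 16 --partitions 8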