2020import random
2121import time
2222from io import BytesIO
23+ from typing import List , NamedTuple , Optional
2324
2425import pytest
2526
# Parameter matrix driving the parametrized benchmarks below.
# NOTE(review): `config` is imported above this chunk; _get_params() presumably
# returns one params object per benchmark combination — confirm in config module.
all_params = config._get_params()
4041
4142
class DownloadResult(NamedTuple):
    """Measurements from one timed download run (post-warmup window)."""

    # Payload bytes counted during the measured (non-warmup) interval.
    total_bytes: int
    # time.perf_counter() timestamp when measurement began.
    measured_start_time: float
    # time.perf_counter() timestamp when the last measured iteration finished.
    measured_end_time: float
48+
async def create_client():
    """Create and return a new AsyncGrpcClient instance.

    NOTE(review): the previous docstring also claimed this "gets the current
    event loop", but the body only constructs the client — docstring corrected.
    """
    return AsyncGrpcClient()
4552
4653
def _aggregate_download_results(results: List[DownloadResult]) -> DownloadResult:
    """Combine per-worker download results into a single overall result.

    Byte counts are summed, and the measured window spans from the earliest
    start to the latest end across all workers.

    Args:
        results: One DownloadResult per worker; must be non-empty.

    Returns:
        A DownloadResult covering the union of all measured windows.

    Raises:
        ValueError: If ``results`` is empty, or the combined window is not
            strictly positive.
    """
    if not results:
        raise ValueError("At least one download result is required.")

    # Unzip the named-tuple fields into parallel sequences.
    byte_counts, start_times, end_times = zip(*results)
    window_start = min(start_times)
    window_end = max(end_times)
    if window_end <= window_start:
        raise ValueError("Measured elapsed time must be positive.")

    return DownloadResult(
        total_bytes=sum(byte_counts),
        measured_start_time=window_start,
        measured_end_time=window_end,
    )
69+
70+
71+ def _calculate_average_throughput_mib_s (
72+ download_bytes_list : List [int ], download_elapsed_times : List [float ]
73+ ) -> float :
74+ total_bytes_downloaded = sum (download_bytes_list )
75+ total_elapsed_time = sum (download_elapsed_times )
76+ if total_elapsed_time <= 0 :
77+ raise ValueError ("Total measured elapsed time must be positive." )
78+
79+ return (total_bytes_downloaded / total_elapsed_time ) / (1024 * 1024 )
80+
81+
def _build_download_result(
    total_bytes_downloaded: int,
    measured_start_time: Optional[float],
    measured_end_time: Optional[float],
) -> DownloadResult:
    """Package raw per-worker measurements into a DownloadResult.

    Args:
        total_bytes_downloaded: Bytes counted in the measured interval.
        measured_start_time: perf_counter timestamp at measurement start, or
            ``None`` if measurement never began.
        measured_end_time: perf_counter timestamp at measurement end, or
            ``None`` if no iteration completed after warmup.

    Returns:
        A populated DownloadResult.

    Raises:
        ValueError: If either timestamp is ``None``.
    """
    if None in (measured_start_time, measured_end_time):
        raise ValueError("No downloads completed during the measured interval.")

    return DownloadResult(total_bytes_downloaded, measured_start_time, measured_end_time)
95+
96+
# --- Global Variables for Worker Process ---
# NOTE(review): initialized to None here; presumably populated once per
# multiprocessing worker elsewhere in the file so each process owns its own
# event loop and client — confirm against the worker setup code.
worker_loop = None
worker_client = None
@@ -78,15 +128,18 @@ def _download_time_based_json(client, filename, params):
78128
79129 offset = 0
80130 is_warming_up = True
81- start_time = time .monotonic ()
131+ start_time = time .perf_counter ()
82132 warmup_end_time = start_time + params .warmup_duration
83133 test_end_time = warmup_end_time + params .duration
134+ measured_start_time = None
135+ measured_end_time = None
84136
85- while time .monotonic () < test_end_time :
86- current_time = time .monotonic ()
137+ while time .perf_counter () < test_end_time :
138+ current_time = time .perf_counter ()
87139 if is_warming_up and current_time >= warmup_end_time :
88140 is_warming_up = False
89141 total_bytes_downloaded = 0 # Reset counter after warmup
142+ measured_start_time = current_time
90143
91144 bytes_in_iteration = 0
92145 # For JSON, we can't batch ranges like gRPC, so we download one by one
@@ -110,8 +163,11 @@ def _download_time_based_json(client, filename, params):
110163
111164 if not is_warming_up :
112165 total_bytes_downloaded += bytes_in_iteration
166+ measured_end_time = time .perf_counter ()
113167
114- return total_bytes_downloaded
168+ return _build_download_result (
169+ total_bytes_downloaded , measured_start_time , measured_end_time
170+ )
115171
116172
117173async def _download_time_based_async (client , filename , params ):
@@ -122,15 +178,18 @@ async def _worker_coro():
122178 total_bytes_downloaded = 0
123179 offset = 0
124180 is_warming_up = True
125- start_time = time .monotonic ()
181+ start_time = time .perf_counter ()
126182 warmup_end_time = start_time + params .warmup_duration
127183 test_end_time = warmup_end_time + params .duration
184+ measured_start_time = None
185+ measured_end_time = None
128186
129- while time .monotonic () < test_end_time :
130- current_time = time .monotonic ()
187+ while time .perf_counter () < test_end_time :
188+ current_time = time .perf_counter ()
131189 if is_warming_up and current_time >= warmup_end_time :
132190 is_warming_up = False
133191 total_bytes_downloaded = 0 # Reset counter after warmup
192+ measured_start_time = current_time
134193
135194 ranges = []
136195 if params .pattern == "rand" :
@@ -153,13 +212,16 @@ async def _worker_coro():
153212
154213 if not is_warming_up :
155214 total_bytes_downloaded += params .chunk_size_bytes * params .num_ranges
156- return total_bytes_downloaded
215+ measured_end_time = time .perf_counter ()
216+ return _build_download_result (
217+ total_bytes_downloaded , measured_start_time , measured_end_time
218+ )
157219
158220 tasks = [asyncio .create_task (_worker_coro ()) for _ in range (params .num_coros )]
159221 results = await asyncio .gather (* tasks )
160222
161223 await mrd .close ()
162- return sum (results )
224+ return _aggregate_download_results (results )
163225
164226
165227def _download_files_worker (process_idx , filename , params , bucket_type ):
@@ -175,7 +237,8 @@ def download_files_mp_mc_wrapper(pool, files_names, params, bucket_type):
175237 args = [(i , files_names [i ], params , bucket_type ) for i in range (len (files_names ))]
176238
177239 results = pool .starmap (_download_files_worker , args )
178- return sum (results )
240+ agg_res = _aggregate_download_results (results )
241+ return agg_res .total_bytes , agg_res .measured_end_time - agg_res .measured_start_time
179242
180243
181244@pytest .mark .parametrize (
@@ -198,9 +261,14 @@ def test_downloads_multi_proc_multi_coro(
198261 )
199262
200263 download_bytes_list = []
264+ download_elapsed_times = []
201265
202266 def target_wrapper (* args , ** kwargs ):
203- download_bytes_list .append (download_files_mp_mc_wrapper (pool , * args , ** kwargs ))
267+ total_bytes , measured_elapsed_time = download_files_mp_mc_wrapper (
268+ pool , * args , ** kwargs
269+ )
270+ download_bytes_list .append (total_bytes )
271+ download_elapsed_times .append (measured_elapsed_time )
204272 return
205273
206274 try :
@@ -214,10 +282,9 @@ def target_wrapper(*args, **kwargs):
214282 finally :
215283 pool .close ()
216284 pool .join ()
217- total_bytes_downloaded = sum (download_bytes_list )
218- throughput_mib_s = (
219- total_bytes_downloaded / params .duration / params .rounds
220- ) / (1024 * 1024 )
285+ throughput_mib_s = _calculate_average_throughput_mib_s (
286+ download_bytes_list , download_elapsed_times
287+ )
221288 benchmark .extra_info ["avg_throughput_mib_s" ] = f"{ throughput_mib_s :.2f} "
222289 print (
223290 f"Avg Throughput of { params .rounds } round(s): { throughput_mib_s :.2f} MiB/s"
@@ -226,6 +293,6 @@ def target_wrapper(*args, **kwargs):
226293 benchmark ,
227294 params ,
228295 download_bytes_list = download_bytes_list ,
229- duration = params . duration ,
296+ duration = download_elapsed_times ,
230297 )
231298 publish_resource_metrics (benchmark , m )
0 commit comments