2020import random
2121import time
2222from io import BytesIO
23+ from typing import NamedTuple
2324
2425import pytest
2526
# Module-level parameter matrix for the benchmark parametrization.
# NOTE(review): `config` is imported outside this view; _get_params() is
# presumably the project's benchmark-parameter factory — confirm there.
all_params = config._get_params()
4041
4142
class DownloadResult(NamedTuple):
    """Outcome of one timed download run.

    Bundles the post-warmup byte count with the wall-clock interval over
    which those bytes were measured, so throughput can be derived later.
    """

    # Bytes downloaded during the measured (post-warmup) interval only.
    total_bytes: int
    # time.perf_counter() timestamp when measurement began (warmup end).
    measured_start_time: float
    # time.perf_counter() timestamp after the last measured iteration.
    measured_end_time: float
48+
async def create_client():
    """Create and return a new AsyncGrpcClient.

    NOTE(review): the prior docstring also claimed this "gets the current
    event loop" — the body only constructs the client; no loop is fetched.
    """
    return AsyncGrpcClient()
4552
4653
54+ def _aggregate_download_results (results ):
55+ if not results :
56+ raise ValueError ("At least one download result is required." )
57+
58+ total_bytes = sum (result .total_bytes for result in results )
59+ measured_start_time = min (result .measured_start_time for result in results )
60+ measured_end_time = max (result .measured_end_time for result in results )
61+ measured_elapsed_time = measured_end_time - measured_start_time
62+ if measured_elapsed_time <= 0 :
63+ raise ValueError ("Measured elapsed time must be positive." )
64+
65+ return total_bytes , measured_elapsed_time
66+
67+
68+ def _calculate_average_throughput_mib_s (download_bytes_list , download_elapsed_times ):
69+ total_bytes_downloaded = sum (download_bytes_list )
70+ total_elapsed_time = sum (download_elapsed_times )
71+ if total_elapsed_time <= 0 :
72+ raise ValueError ("Total measured elapsed time must be positive." )
73+
74+ return (total_bytes_downloaded / total_elapsed_time ) / (1024 * 1024 )
75+
76+
77+ def _record_measured_start (measured_start_time , current_time ):
78+ if measured_start_time is None :
79+ return current_time
80+ return measured_start_time
81+
82+
def _build_download_result(
    total_bytes_downloaded, measured_start_time, measured_end_time
):
    """Package the raw download-loop counters into a DownloadResult.

    Raises:
        ValueError: if either timestamp is still None, meaning no
            iteration completed inside the measured window.
    """
    no_measurement = measured_start_time is None or measured_end_time is None
    if no_measurement:
        raise ValueError("No downloads completed during the measured interval.")

    return DownloadResult(
        measured_start_time=measured_start_time,
        measured_end_time=measured_end_time,
        total_bytes=total_bytes_downloaded,
    )
94+
95+
# --- Global Variables for Worker Process ---
# NOTE(review): these appear to hold a per-process event loop and client
# for the multiprocessing pool workers — presumably initialized lazily in
# _download_files_worker; confirm against that function (outside this view).
worker_loop = None
worker_client = None
@@ -78,15 +127,20 @@ def _download_time_based_json(client, filename, params):
78127
79128 offset = 0
80129 is_warming_up = True
81- start_time = time .monotonic ()
130+ start_time = time .perf_counter ()
82131 warmup_end_time = start_time + params .warmup_duration
83132 test_end_time = warmup_end_time + params .duration
133+ measured_start_time = None
134+ measured_end_time = None
84135
85- while time .monotonic () < test_end_time :
86- current_time = time .monotonic ()
136+ while time .perf_counter () < test_end_time :
137+ current_time = time .perf_counter ()
87138 if is_warming_up and current_time >= warmup_end_time :
88139 is_warming_up = False
89140 total_bytes_downloaded = 0 # Reset counter after warmup
141+ measured_start_time = _record_measured_start (
142+ measured_start_time , current_time
143+ )
90144
91145 bytes_in_iteration = 0
92146 # For JSON, we can't batch ranges like gRPC, so we download one by one
@@ -110,8 +164,11 @@ def _download_time_based_json(client, filename, params):
110164
111165 if not is_warming_up :
112166 total_bytes_downloaded += bytes_in_iteration
167+ measured_end_time = time .perf_counter ()
113168
114- return total_bytes_downloaded
169+ return _build_download_result (
170+ total_bytes_downloaded , measured_start_time , measured_end_time
171+ )
115172
116173
117174async def _download_time_based_async (client , filename , params ):
@@ -122,15 +179,20 @@ async def _worker_coro():
122179 total_bytes_downloaded = 0
123180 offset = 0
124181 is_warming_up = True
125- start_time = time .monotonic ()
182+ start_time = time .perf_counter ()
126183 warmup_end_time = start_time + params .warmup_duration
127184 test_end_time = warmup_end_time + params .duration
185+ measured_start_time = None
186+ measured_end_time = None
128187
129- while time .monotonic () < test_end_time :
130- current_time = time .monotonic ()
188+ while time .perf_counter () < test_end_time :
189+ current_time = time .perf_counter ()
131190 if is_warming_up and current_time >= warmup_end_time :
132191 is_warming_up = False
133192 total_bytes_downloaded = 0 # Reset counter after warmup
193+ measured_start_time = _record_measured_start (
194+ measured_start_time , current_time
195+ )
134196
135197 ranges = []
136198 if params .pattern == "rand" :
@@ -153,13 +215,22 @@ async def _worker_coro():
153215
154216 if not is_warming_up :
155217 total_bytes_downloaded += params .chunk_size_bytes * params .num_ranges
156- return total_bytes_downloaded
218+ measured_end_time = time .perf_counter ()
219+ return _build_download_result (
220+ total_bytes_downloaded , measured_start_time , measured_end_time
221+ )
157222
158223 tasks = [asyncio .create_task (_worker_coro ()) for _ in range (params .num_coros )]
159224 results = await asyncio .gather (* tasks )
160225
161226 await mrd .close ()
162- return sum (results )
227+ total_bytes , measured_elapsed_time = _aggregate_download_results (results )
228+ measured_start_time = min (result .measured_start_time for result in results )
229+ return DownloadResult (
230+ total_bytes = total_bytes ,
231+ measured_start_time = measured_start_time ,
232+ measured_end_time = measured_start_time + measured_elapsed_time ,
233+ )
163234
164235
165236def _download_files_worker (process_idx , filename , params , bucket_type ):
@@ -175,7 +246,7 @@ def download_files_mp_mc_wrapper(pool, files_names, params, bucket_type):
175246 args = [(i , files_names [i ], params , bucket_type ) for i in range (len (files_names ))]
176247
177248 results = pool .starmap (_download_files_worker , args )
178- return sum (results )
249+ return _aggregate_download_results (results )
179250
180251
181252@pytest .mark .parametrize (
@@ -198,9 +269,14 @@ def test_downloads_multi_proc_multi_coro(
198269 )
199270
200271 download_bytes_list = []
272+ download_elapsed_times = []
201273
202274 def target_wrapper (* args , ** kwargs ):
203- download_bytes_list .append (download_files_mp_mc_wrapper (pool , * args , ** kwargs ))
275+ total_bytes , measured_elapsed_time = download_files_mp_mc_wrapper (
276+ pool , * args , ** kwargs
277+ )
278+ download_bytes_list .append (total_bytes )
279+ download_elapsed_times .append (measured_elapsed_time )
204280 return
205281
206282 try :
@@ -214,10 +290,9 @@ def target_wrapper(*args, **kwargs):
214290 finally :
215291 pool .close ()
216292 pool .join ()
217- total_bytes_downloaded = sum (download_bytes_list )
218- throughput_mib_s = (
219- total_bytes_downloaded / params .duration / params .rounds
220- ) / (1024 * 1024 )
293+ throughput_mib_s = _calculate_average_throughput_mib_s (
294+ download_bytes_list , download_elapsed_times
295+ )
221296 benchmark .extra_info ["avg_throughput_mib_s" ] = f"{ throughput_mib_s :.2f} "
222297 print (
223298 f"Avg Throughput of { params .rounds } round(s): { throughput_mib_s :.2f} MiB/s"
@@ -226,6 +301,6 @@ def target_wrapper(*args, **kwargs):
226301 benchmark ,
227302 params ,
228303 download_bytes_list = download_bytes_list ,
229- duration = params . duration ,
304+ duration = download_elapsed_times ,
230305 )
231306 publish_resource_metrics (benchmark , m )
0 commit comments