88import requests .exceptions
99import tqdm
1010import sys
11- from concurrent .futures import ThreadPoolExecutor
12- import concurrent .futures
1311from fedml .api .modules .utils import authenticate
1412from fedml .core .distributed .communication .s3 .remote_storage import S3Storage
1513from fedml .core .mlops .mlops_configs import Configs , MLOpsConfigs
@@ -31,7 +29,7 @@ def __init__(self, data: dict):
3129
3230# Todo (alaydshah): Store service name in metadata
3331# Todo (alaydshah): If data already exists, don't upload again. Instead suggest to use update command
34-
32+ # Todo (bhargav) : Discuss and remove the service variable. Maybe needed sometime later.
3533def upload (data_path , api_key , name , description , tag_list , service , show_progress , out_progress_to_err , progress_desc ,
3634 metadata ) -> FedMLResponse :
3735 api_key = authenticate (api_key )
@@ -118,8 +116,7 @@ def download(data_name, api_key, service, dest_path, show_progress=True) -> FedM
118116 return FedMLResponse (code = ResponseCode .FAILURE , message = error_message )
119117
120118 else :
121- error_message = "Unable to get the download URL"
122- logging .error (error_message )
119+ error_message = metadata_response .message
123120 return FedMLResponse (code = ResponseCode .FAILURE , message = error_message )
124121
125122
@@ -288,6 +285,7 @@ def _process_post_response(response):
288285
289286 return data_url , "Successfully uploaded the data! "
290287
288+
291289def _complete_multipart_upload (api_key , file_key , part_info , upload_id ):
292290 complete_multipart_url = ServerConstants .get_complete_multipart_upload_url ()
293291 body_dict = {"fileKey" : file_key , 'partETags' : part_info , 'uploadId' : upload_id }
@@ -310,6 +308,7 @@ def _complete_multipart_upload(api_key, file_key, part_info, upload_id):
310308
311309 return _process_post_response (complete_multipart_response )
312310
311+
313312def _upload_multipart (api_key : str , file_key , archive_path , show_progress , out_progress_to_err ,
314313 progress_desc_text , metadata ):
315314 request_url = ServerConstants .get_presigned_multi_part_url ()
@@ -349,7 +348,7 @@ def _upload_multipart(api_key: str, file_key, archive_path, show_progress, out_p
349348 part_info = []
350349 chunk_count = 0
351350 successful_chunks = 0
352-
351+ #TODO: (bhargav191098) Using Thread pool and confirming openssl issue
353352 atomic_session = requests .session ()
354353 atomic_session .verify = MLOpsConfigs .get_cert_path_with_version ()
355354 with tqdm .tqdm (total = file_size , unit = "B" , unit_scale = True ,
@@ -358,7 +357,6 @@ def _upload_multipart(api_key: str, file_key, archive_path, show_progress, out_p
358357 for part , chunk in enumerate (chunks , start = 1 ):
359358 presigned_url = presigned_urls [part - 1 ]
360359 chunk_count += 1
361- # Upload chunk to presigned_url in a separate thread from the thread pool of 10 workers.
362360 if show_progress :
363361 try :
364362 part_data = _upload_chunk (presigned_url = presigned_url , chunk = chunk , part = part ,
0 commit comments