Skip to content

Commit 5214078

Browse files
authored
Merge pull request #2154 from FedML-AI/bhargav191098/storage_presigned_urls
Moving from storage object to presigned URLs
2 parents e667ded + aa62a94 commit 5214078

2 files changed

Lines changed: 242 additions & 22 deletions

File tree

python/fedml/api/modules/storage.py

Lines changed: 228 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,11 @@
33
import shutil
44

55
import requests
6+
import math
7+
8+
import requests.exceptions
9+
import tqdm
10+
import sys
611
from fedml.api.modules.utils import authenticate
712
from fedml.core.distributed.communication.s3.remote_storage import S3Storage
813
from fedml.core.mlops.mlops_configs import Configs, MLOpsConfigs
@@ -19,11 +24,12 @@ def __init__(self, data: dict):
1924
self.updatedAt = data.get("updateTime", None)
2025
self.size = _get_size(data.get("fileSize",None))
2126
self.tag_list = data.get("tags", None)
27+
self.download_url = data.get("fileUrl", None)
2228

2329

2430
# Todo (alaydshah): Store service name in metadata
2531
# Todo (alaydshah): If data already exists, don't upload again. Instead suggest to use update command
26-
32+
# Todo (bhargav) : Discuss and remove the service variable. Maybe needed sometime later.
2733
def upload(data_path, api_key, name, description, tag_list, service, show_progress, out_progress_to_err, progress_desc,
2834
metadata) -> FedMLResponse:
2935
api_key = authenticate(api_key)
@@ -40,16 +46,16 @@ def upload(data_path, api_key, name, description, tag_list, service, show_progre
4046
if not archive_path:
4147
return FedMLResponse(code=ResponseCode.FAILURE, message=message)
4248

43-
store = _get_storage_service(service)
4449
name = os.path.splitext(os.path.basename(archive_path))[0] if name is None else name
4550
file_name = name + ".zip"
4651
dest_path = os.path.join(user_id, file_name)
4752
file_size = os.path.getsize(archive_path)
4853

49-
file_uploaded_url = store.upload_file_with_progress(src_local_path=archive_path, dest_s3_path=dest_path,
50-
show_progress=show_progress,
51-
out_progress_to_err=out_progress_to_err,
52-
progress_desc=progress_desc, metadata=metadata)
54+
file_uploaded_url, message = _upload_multipart(api_key, file_name, archive_path, show_progress,
55+
out_progress_to_err,
56+
progress_desc, metadata)
57+
58+
5359
os.remove(archive_path)
5460
if not file_uploaded_url:
5561
return FedMLResponse(code=ResponseCode.FAILURE, message=f"Failed to upload file: {archive_path}")
@@ -81,25 +87,36 @@ def download(data_name, api_key, service, dest_path, show_progress=True) -> FedM
8187
if user_id is None:
8288
return FedMLResponse(code=ResponseCode.FAILURE, message=message)
8389

84-
store = _get_storage_service(service)
85-
zip_file_name = data_name + ".zip"
86-
key = os.path.join(user_id, zip_file_name)
87-
path_local = os.path.abspath(zip_file_name)
88-
dest_path = os.path.abspath(dest_path) if dest_path else data_name
89-
if store.download_file_with_progress(path_s3=key, path_local=path_local, show_progress=show_progress):
90-
try:
91-
shutil.unpack_archive(path_local, dest_path)
92-
os.remove(path_local)
93-
abs_dest_path = os.path.abspath(dest_path)
94-
return FedMLResponse(code=ResponseCode.SUCCESS, message=f"Successfully downloaded and unzipped data at "
95-
f"{abs_dest_path}", data=abs_dest_path)
96-
except Exception as e:
97-
error_message = f"Failed to unpack archive: {e}"
90+
metadata_response = get_metadata(data_name, api_key)
91+
if metadata_response.code == ResponseCode.SUCCESS:
92+
metadata = metadata_response.data
93+
if not metadata or not isinstance(metadata, StorageMetadata):
94+
error_message = f"Unable to get the download URL"
9895
logging.error(error_message)
9996
return FedMLResponse(code=ResponseCode.FAILURE, message=error_message)
97+
download_url = metadata.download_url
98+
zip_file_name = data_name + ".zip"
99+
path_local = os.path.abspath(zip_file_name)
100+
dest_path = os.path.abspath(dest_path) if dest_path else data_name
101+
if _download_using_presigned_url(download_url, zip_file_name, show_progress=show_progress):
102+
try:
103+
shutil.unpack_archive(path_local, dest_path)
104+
os.remove(path_local)
105+
abs_dest_path = os.path.abspath(dest_path)
106+
return FedMLResponse(code=ResponseCode.SUCCESS, message=f"Successfully downloaded and unzipped data at "
107+
f"{abs_dest_path}", data=abs_dest_path)
108+
except Exception as e:
109+
error_message = f"Failed to unpack archive: {e}"
110+
logging.error(error_message)
111+
return FedMLResponse(code=ResponseCode.FAILURE, message=error_message)
112+
113+
else:
114+
error_message = "Failed to download data from source"
115+
logging.error(error_message)
116+
return FedMLResponse(code=ResponseCode.FAILURE, message=error_message)
117+
100118
else:
101-
error_message = f"Failed to download data: {data_name}"
102-
logging.error(error_message)
119+
error_message = metadata_response.message
103120
return FedMLResponse(code=ResponseCode.FAILURE, message=error_message)
104121

105122

@@ -196,6 +213,195 @@ def delete(data_name, service, api_key=None) -> FedMLResponse:
196213
logging.error(message, data_name, service)
197214
return FedMLResponse(code=ResponseCode.FAILURE, message=message, data=False)
198215

216+
def _get_num_chunks(file_size, max_chunk_size):
217+
num_chunks = math.ceil(file_size / max_chunk_size)
218+
return num_chunks
219+
220+
221+
def get_chunks(file_path, chunk_size):
222+
with open(file_path, 'rb') as file:
223+
while True:
224+
chunk = file.read(chunk_size)
225+
if not chunk:
226+
break
227+
yield chunk
228+
229+
230+
def _get_presigned_url(api_key, request_url, file_name, part_number=None):
231+
cert_path = MLOpsConfigs.get_cert_path_with_version()
232+
headers = ServerConstants.API_HEADERS
233+
headers["Authorization"] = f"Bearer {api_key}"
234+
params_dict = {'fileKey': file_name}
235+
if part_number is not None:
236+
params_dict['partNumber'] = part_number
237+
if cert_path is None:
238+
try:
239+
requests.session().verify = cert_path
240+
response = requests.get(request_url, verify=True, headers=headers, params=params_dict)
241+
except requests.exceptions.SSLError as err:
242+
MLOpsConfigs.install_root_ca_file()
243+
response = requests.get(request_url, verify=True, headers=headers, params=params_dict)
244+
else:
245+
response = requests.get(request_url, verify=True, headers=headers, params=params_dict)
246+
return response
247+
248+
249+
def _upload_part(url,part_data,session):
250+
response = session.put(url,data=part_data,verify=True)
251+
return response
252+
253+
254+
def _upload_chunk(presigned_url, chunk, part, pbar=None, max_retries=20,session=None):
255+
for retry_attempt in range(max_retries):
256+
try:
257+
response = _upload_part(presigned_url,chunk,session)
258+
except requests.exceptions.RequestException as e:
259+
if retry_attempt < max_retries:
260+
continue
261+
else:
262+
raise requests.exceptions.RequestException
263+
264+
if(pbar is not None):
265+
pbar.update(chunk.__sizeof__())
266+
return {'etag': response.headers['ETag'], 'partNumber': part}
267+
raise requests.exceptions.RequestException
268+
269+
def _process_post_response(response):
270+
if response.status_code != 200:
271+
message = (f"Failed to complete multipart upload with status code = {response.status_code}, "
272+
f"response.content: {response.content}")
273+
logging.error(message)
274+
return None, message
275+
else:
276+
resp_data = response.json()
277+
code = resp_data.get("code", None)
278+
data_url = resp_data.get("data", None)
279+
280+
if code is None or data_url is None or code == "FAILURE":
281+
message = resp_data.get("message", None)
282+
message = (f"Failed to complete multipart upload with following message: {message}, "
283+
f"response.content: {response.content}")
284+
return None, message
285+
286+
return data_url, "Successfully uploaded the data! "
287+
288+
289+
def _complete_multipart_upload(api_key, file_key, part_info, upload_id):
290+
complete_multipart_url = ServerConstants.get_complete_multipart_upload_url()
291+
body_dict = {"fileKey": file_key, 'partETags': part_info, 'uploadId': upload_id}
292+
293+
cert_path = MLOpsConfigs.get_cert_path_with_version()
294+
headers = ServerConstants.API_HEADERS
295+
headers["Authorization"] = f"Bearer {api_key}"
296+
if cert_path is None:
297+
try:
298+
requests.session().verify = cert_path
299+
complete_multipart_response = requests.post(complete_multipart_url, json=body_dict, verify=True,
300+
headers=headers)
301+
except requests.exceptions.SSLError as err:
302+
MLOpsConfigs.install_root_ca_file()
303+
complete_multipart_response = requests.post(complete_multipart_url, json=body_dict, verify=True,
304+
headers=headers)
305+
else:
306+
complete_multipart_response = requests.post(complete_multipart_url, json=body_dict, verify=True,
307+
headers=headers)
308+
309+
return _process_post_response(complete_multipart_response)
310+
311+
312+
def _upload_multipart(api_key: str, file_key, archive_path, show_progress, out_progress_to_err,
313+
progress_desc_text, metadata):
314+
request_url = ServerConstants.get_presigned_multi_part_url()
315+
316+
file_size = os.path.getsize(archive_path)
317+
318+
max_chunk_size = 20 * 1024 * 1024
319+
320+
num_chunks = _get_num_chunks(file_size, max_chunk_size)
321+
322+
upload_id = ""
323+
presigned_urls = []
324+
325+
presigned_url_response = _get_presigned_url(api_key, request_url, file_key, num_chunks)
326+
327+
if presigned_url_response.status_code != 200:
328+
message = (f"Failed to get presigned URL with status code = {presigned_url_response.status_code}, "
329+
f"response.content: {presigned_url_response.content}")
330+
logging.error(message)
331+
return None, message
332+
else:
333+
resp_data = presigned_url_response.json()
334+
code = resp_data.get("code", None)
335+
data = resp_data.get("data", None)
336+
337+
if code is None or data is None or code == "FAILURE":
338+
message = resp_data.get("message", None)
339+
message = (f"Failed getting presigned URL with following message: {message}, "
340+
f"response.content: {presigned_url_response.content}")
341+
return None, message
342+
343+
upload_id = data['uploadId']
344+
presigned_urls = data['urls']
345+
346+
parts = []
347+
chunks = get_chunks(archive_path, max_chunk_size)
348+
part_info = []
349+
chunk_count = 0
350+
successful_chunks = 0
351+
#TODO: (bhargav191098) Using Thread pool and confirming openssl issue
352+
atomic_session = requests.session()
353+
atomic_session.verify = MLOpsConfigs.get_cert_path_with_version()
354+
with tqdm.tqdm(total=file_size, unit="B", unit_scale=True,
355+
file=sys.stderr if out_progress_to_err else sys.stdout,
356+
desc=progress_desc_text, leave=False) as pbar:
357+
for part, chunk in enumerate(chunks, start=1):
358+
presigned_url = presigned_urls[part - 1]
359+
chunk_count += 1
360+
if show_progress:
361+
try:
362+
part_data = _upload_chunk(presigned_url=presigned_url, chunk=chunk, part=part,
363+
pbar=pbar,session=atomic_session)
364+
part_info.append(part_data)
365+
successful_chunks += 1
366+
except Exception as e:
367+
return None, "unsuccessful"
368+
369+
else:
370+
try:
371+
part_data = _upload_chunk(presigned_url=presigned_url, chunk=chunk, part=part,
372+
pbar=pbar,session=atomic_session)
373+
part_info.append(part_data)
374+
successful_chunks += 1
375+
except Exception as e:
376+
return None, "unsuccessful"
377+
378+
if successful_chunks == chunk_count:
379+
return _complete_multipart_upload(api_key, file_key, part_info, upload_id)
380+
else:
381+
return None, "Unsuccessful!"
382+
383+
384+
def _download_using_presigned_url(url, fname, chunk_size=1024 * 1024, show_progress=True):
385+
download_response = requests.get(url, verify=True, stream=True)
386+
if download_response.status_code == 200:
387+
total = int(download_response.headers.get('content-length', 0))
388+
if show_progress:
389+
with open(fname, 'wb') as file, tqdm.tqdm(
390+
desc=fname,
391+
total=total,
392+
unit='B',
393+
unit_scale=True,
394+
unit_divisor=1024,
395+
) as bar:
396+
for data in download_response.iter_content(chunk_size=chunk_size):
397+
size = file.write(data)
398+
bar.update(size)
399+
else:
400+
with open(fname, "wb") as file:
401+
for data in download_response.iter_content(chunk_size=chunk_size):
402+
size = file.write(data)
403+
return True
404+
return False
199405

200406
def _get_user_id_from_api_key(api_key: str) -> (str, str):
201407
user_url = ServerConstants.get_user_url()

python/fedml/computing/scheduler/master/server_constants.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,20 @@ def get_dataset_url():
255255
ServerConstants.get_mlops_url())
256256
return create_dataset_url
257257

258+
@staticmethod
259+
def get_presigned_multi_part_url():
260+
get_presigned_multi_part_url = "{}/system/api/v1/cli/oss/multipart/presigned-url".format(
261+
ServerConstants.get_mlops_url()
262+
)
263+
return get_presigned_multi_part_url
264+
265+
@staticmethod
266+
def get_complete_multipart_upload_url():
267+
complete_multipart_upload_url = "{}/system/api/v1/cli/oss/multipart/upload/complete".format(
268+
ServerConstants.get_mlops_url()
269+
)
270+
return complete_multipart_upload_url
271+
258272
@staticmethod
259273
def list_dataset_url():
260274
list_dataset_url = "{}/fedmlOpsServer/api/v1/cli/dataset/list".format(

0 commit comments

Comments
 (0)