Skip to content

Commit 7625075

Browse files
committed
[Deploy] Avoid re-download the same model serving package.
1 parent b58720c commit 7625075

1 file changed

Lines changed: 53 additions & 63 deletions

File tree

python/fedml/computing/scheduler/model_scheduler/worker_job_runner.py

Lines changed: 53 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -146,10 +146,10 @@ def run_impl(self, run_extend_queue_list, sender_message_center,
146146
logging.info(f"[Worker] Received model deployment request from master for endpoint {run_id}.")
147147
self.replica_handler = FedMLDeviceReplicaHandler(self.edge_id, self.request_json)
148148
if self.replica_handler is not None:
149-
logging.info(f"=================Worker replica Handler ======================"
150-
f"Reconcile with num diff {self.replica_handler.replica_num_diff} "
151-
f"and version diff {self.replica_handler.replica_version_diff}."
152-
f"=============================================================")
149+
logging.info("\n================= Worker replica Handler ======================\n"
150+
f"Reconcile with num diff {self.replica_handler.replica_num_diff}\n"
151+
f"and version diff {self.replica_handler.replica_version_diff}\n"
152+
"===============================================================\n")
153153
else:
154154
logging.error(f"[Worker] Replica handler is None.")
155155
return False
@@ -178,39 +178,13 @@ def run_impl(self, run_extend_queue_list, sender_message_center,
178178
logging.info("[Worker] No need to reconcile.")
179179
return True
180180

181-
logging.info(
182-
f"================Worker Reconcile Operations ======================\n"
183-
f" op: {op}; op num: {op_num}.\n"
184-
f"==================================================================\n")
185-
186-
# If not rollback, download package from MLOps; otherwise, use the backup package
187-
if op != "rollback":
188-
logging.info("Download and unzip model to local...")
189-
unzip_package_path, _, _ = \
190-
self.update_local_fedml_config(run_id, model_config, model_config_parameters)
191-
if unzip_package_path is None:
192-
logging.info("Failed to update local fedml config.")
193-
self.check_runner_stop_event()
194-
self.status_reporter.report_client_id_status(
195-
self.edge_id, ClientConstants.MSG_MLOPS_CLIENT_STATUS_FAILED,
196-
is_from_model=True, run_id=run_id)
197-
return False
198-
199-
if not os.path.exists(unzip_package_path):
200-
logging.info("Failed to unzip file.")
201-
self.check_runner_stop_event()
202-
self.status_reporter.report_client_id_status(
203-
self.edge_id, ClientConstants.MSG_MLOPS_CLIENT_STATUS_FAILED,
204-
is_from_model=True, run_id=run_id)
205-
return False
206-
else:
207-
logging.info("Try to use backup package to rollback...")
208-
# Find folder under "~/.fedml/fedml-model-client/fedml/model_packages \
209-
# /${end_point_id}_${end_point_name}_${model_name}_${model_version}"
210-
backup_folder_full_path = None
211-
models_root_dir = ClientConstants.get_model_package_dir()
181+
logging.info("\n================ Worker Reconcile Operations ======================\n"
182+
f" op: {op}; op num: {op_num}.\n"
183+
"===================================================================\n")
212184

185+
if op == "rollback":
213186
# Find the version (notified by master) to rollback
187+
logging.info("Try to use backup package to rollback...")
214188
version_diff_dict = self.request_json["replica_version_diff"][str(self.edge_id)]
215189
version_rollback_to = None
216190
for replica_no, rollback_ops in version_diff_dict.items():
@@ -222,39 +196,38 @@ def run_impl(self, run_extend_queue_list, sender_message_center,
222196
return False
223197
model_version = version_rollback_to
224198

225-
# Format the version to match the folder name
226-
model_version_formatted = version_rollback_to.replace(" ", "-")
227-
model_version_formatted = model_version_formatted.replace(":", "-")
228-
229-
last_run_folder_sub_fd = f"{run_id}_{end_point_name}_{model_name}_{model_version_formatted}"
230-
for folder in os.listdir(models_root_dir):
231-
if last_run_folder_sub_fd in folder:
232-
backup_folder_full_path = os.path.join(models_root_dir, folder)
233-
break
234-
if backup_folder_full_path is None:
235-
logging.error(f"No backup folder found for run_id: {self.run_id} edge_id: {self.edge_id} "
236-
f"under {models_root_dir} with sub folder {last_run_folder_sub_fd}, rollback failed.")
237-
return False
199+
# Construct the parent folder name for the package
200+
model_version_formatted = model_version.replace(" ", "-")
201+
model_version_formatted = model_version_formatted.replace(":", "-")
202+
models_root_dir = ClientConstants.get_model_package_dir()
203+
parent_fd = f"{run_id}_{end_point_name}_{model_name}_{model_version_formatted}"
238204

239-
# Inside backup folder, find unzipped package with prefix unzip_fedml_run
240-
unzip_package_path_parent = None
241-
for folder in os.listdir(backup_folder_full_path):
242-
if folder.startswith("unzip_fedml_run"):
243-
unzip_package_path_parent = os.path.join(backup_folder_full_path, folder)
244-
break
245-
246-
# Inside unzip folder, find the unzipped package, should be the only one
247-
unzip_package_path = None
248-
for folder in os.listdir(unzip_package_path_parent):
249-
if os.path.isdir(os.path.join(unzip_package_path_parent, folder)):
250-
unzip_package_path = os.path.join(unzip_package_path_parent, folder)
251-
break
205+
# Check if the package is already downloaded
206+
unzip_package_path = ""
207+
if os.path.exists(os.path.join(models_root_dir, parent_fd)):
208+
unzip_package_path = self.find_previous_downloaded_pkg(os.path.join(models_root_dir, parent_fd))
252209

210+
# Download the package if not found
211+
if unzip_package_path == "":
212+
logging.info("Download and unzip model to local...")
213+
unzip_package_path, _, _ = \
214+
self.update_local_fedml_config(run_id, model_config, model_config_parameters)
253215
if unzip_package_path is None:
254-
logging.error(f"No unzipped package found for run_id: {self.run_id} edge_id: {self.edge_id} "
255-
f"under {backup_folder_full_path}, rollback failed.")
216+
logging.info("Failed to update local fedml config.")
217+
self.check_runner_stop_event()
218+
self.status_reporter.report_client_id_status(
219+
self.edge_id, ClientConstants.MSG_MLOPS_CLIENT_STATUS_FAILED,
220+
is_from_model=True, run_id=run_id)
256221
return False
257222

223+
if not os.path.exists(unzip_package_path):
224+
logging.info("Failed to unzip file.")
225+
self.check_runner_stop_event()
226+
self.status_reporter.report_client_id_status(
227+
self.edge_id, ClientConstants.MSG_MLOPS_CLIENT_STATUS_FAILED,
228+
is_from_model=True, run_id=run_id)
229+
return False
230+
258231
self.check_runner_stop_event()
259232

260233
running_model_name, inference_output_url, inference_model_version, model_metadata, model_config = \
@@ -535,3 +508,20 @@ def build_dynamic_args(self, run_id, run_config, package_conf_object, base_dir):
535508
# Override
536509
def build_dynamic_constrain_variables(self, run_id, run_config):
537510
pass
511+
512+
@staticmethod
513+
def find_previous_downloaded_pkg(parent_dir) -> str:
514+
unzip_fd = ""
515+
res = ""
516+
517+
for folder in os.listdir(parent_dir):
518+
if folder.startswith("unzip_fedml_run"):
519+
unzip_fd = os.path.join(parent_dir, folder)
520+
break
521+
522+
for folder in os.listdir(unzip_fd):
523+
if os.path.isdir(os.path.join(unzip_fd, folder)):
524+
res = os.path.join(unzip_fd, folder)
525+
break
526+
527+
return res

0 commit comments

Comments
 (0)