Skip to content

Commit 3fbcc2c

Browse files
authored
Merge pull request #2126 from FedML-AI/alexleung/dev_v070_for_refactor
Alexleung/dev v070 for refactor
2 parents bfecbd9 + 84d6156 commit 3fbcc2c

12 files changed

Lines changed: 113 additions & 100 deletions

python/fedml/computing/scheduler/master/base_master_job_runner.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,10 @@ def run_server_job_impl(self, process_event, completed_event,
285285
self.args.run_id = self.run_id
286286
MLOpsRuntimeLog.get_instance(self.args).init_logs(log_level=logging.INFO)
287287

288+
self.status_reporter.report_server_id_status(
289+
run_id, ServerConstants.MSG_MLOPS_SERVER_STATUS_RUNNING, edge_id=self.edge_id,
290+
server_id=self.edge_id, server_agent_id=self.edge_id)
291+
288292
# get training params
289293
private_local_data_dir = data_config.get("privateLocalData", "")
290294
is_using_local_data = 0
@@ -562,7 +566,7 @@ def detect_edges_status(
562566
return True, active_edge_info_dict, inactivate_edges
563567

564568
def report_exception_status(self, run_id):
565-
self.status_reporter.report_job_status(run_id, ServerConstants.MSG_MLOPS_SERVER_STATUS_EXCEPTION)
569+
self.mlops_metrics.report_job_status(run_id, ServerConstants.MSG_MLOPS_SERVER_STATUS_EXCEPTION)
566570

567571
def callback_run_logs(self, topic, payload):
568572
run_id = str(topic).split('/')[-1]
@@ -618,12 +622,6 @@ def send_training_request_to_edges(self, request_json, active_edge_info_dict=Non
618622
f"request GPU count {request_num_gpus}"
619623
logging.error(err_info)
620624

621-
# Bug fix: This mqtt message needs to be sent so platform can clean up the failed run and change the
622-
# status from running to failed.
623-
self.mlops_metrics.report_server_training_status(
624-
run_id, GeneralConstants.MSG_MLOPS_SERVER_STATUS_FAILED, edge_id=self.edge_id
625-
)
626-
627625
self.status_reporter.report_server_id_status(
628626
run_id, GeneralConstants.MSG_MLOPS_SERVER_STATUS_FAILED, edge_id=self.edge_id,
629627
server_id=self.edge_id, server_agent_id=self.server_agent_id)

python/fedml/computing/scheduler/master/base_master_protocol_manager.py

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -267,19 +267,26 @@ def callback_stop_train(self, topic, payload, use_payload=None):
267267
server_id = request_json.get("serverId", None)
268268
if server_id is None:
269269
server_id = request_json.get("server_id", None)
270+
edge_ids = request_json.get("edgeids", None)
270271

271-
# Broadcast the job status to all edges
272-
self.rebuild_status_center(self.get_status_queue())
273-
self.status_reporter.report_job_status(run_id, GeneralConstants.MSG_MLOPS_SERVER_STATUS_KILLED)
272+
# Stop the job runner
273+
self._get_job_runner_manager().stop_job_runner(
274+
run_id, args=self.args, server_id=server_id, request_json=request_json,
275+
run_as_cloud_agent=self.run_as_cloud_agent)
274276

275277
# Cleanup the cached object
276278
if self.running_request_json.get(run_id_str, None) is not None:
277279
self.running_request_json.pop(run_id_str)
278280

279-
# Stop the job runner
280-
self._get_job_runner_manager().stop_job_runner(
281-
run_id, args=self.args, server_id=server_id, request_json=request_json,
282-
run_as_cloud_agent=self.run_as_cloud_agent)
281+
# Reset all edge status and server status
282+
for iter_edge_id in edge_ids:
283+
self.generate_status_report(run_id, iter_edge_id, server_agent_id=server_id).\
284+
report_client_id_status(iter_edge_id, GeneralConstants.MSG_MLOPS_SERVER_STATUS_KILLED,
285+
run_id=run_id, server_id=server_id)
286+
287+
# To be compatible with previous versions of edge devices, we still send the stopping-train message to edges.
288+
# The latest version of edge devices no longer needs to process the stopping-train message.
289+
self.send_training_stop_request_to_edges(edge_ids, payload=payload, run_id=run_id)
283290

284291
def callback_complete_job(self, topic, payload):
285292
# Parse the parameters.
@@ -505,13 +512,12 @@ def send_training_stop_request_to_edges(
505512
self, edge_id_list, payload=None, run_id=0):
506513
if payload is None:
507514
payload_obj = {"runId": run_id, "edgeids": edge_id_list}
508-
else:
509-
payload_obj = json.loads(payload)
515+
payload = json.dumps(payload_obj)
510516

511517
for edge_id in edge_id_list:
512518
topic_stop_train = "flserver_agent/" + str(edge_id) + "/stop_train"
513519
logging.info("stop_train: send topic " + topic_stop_train)
514-
self.message_center.send_message(topic_stop_train, json.dumps(payload_obj))
520+
self.message_center.send_message(topic_stop_train, payload)
515521

516522
def send_training_stop_request_to_specific_edge(self, edge_id, payload):
517523
topic_stop_train = "flserver_agent/" + str(edge_id) + "/stop_train"
@@ -536,7 +542,7 @@ def send_status_msg_to_edges(self, edge_id_list, run_id, server_id, context=None
536542
self.send_status_check_msg(run_id, edge_id, self.edge_id, context=context)
537543

538544
def report_exception_status(self, run_id):
539-
self.status_reporter.report_job_status(run_id, GeneralConstants.MSG_MLOPS_SERVER_STATUS_EXCEPTION)
545+
self.mlops_metrics.report_job_status(run_id, GeneralConstants.MSG_MLOPS_SERVER_STATUS_EXCEPTION)
540546

541547
@staticmethod
542548
def get_start_train_topic_with_edge_id(edge_id):

python/fedml/computing/scheduler/master/deploy_job_launcher.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ def deploy_model(serving_devices, request_json, run_id):
4040
"", random_list[1], None,
4141
in_model_id=model_id, in_model_version=model_version,
4242
endpoint_name=endpoint_name, endpoint_id=endpoint_id, run_id=run_id)
43+
return endpoint_id
44+
return None
4345

4446
def check_model_device_ready_and_deploy(self, request_json, run_id, master_device_id,
4547
slave_device_id, run_edge_ids=None):
@@ -88,3 +90,6 @@ def check_model_device_ready_and_deploy(self, request_json, run_id, master_devic
8890

8991
# Start to deploy the model
9092
FedMLDeployJobLauncher.deploy_model(serving_devices, request_json, run_id=run_id)
93+
94+
95+

python/fedml/computing/scheduler/model_scheduler/master_job_runner.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ def __init__(self, args, run_id=0, request_json=None, agent_config=None, edge_id
5151
self.deployed_replica_payload = None
5252
self.slave_deployment_results_map = dict()
5353
self.deployment_result_queue = Queue()
54+
self.is_fresh_endpoint = True
5455

5556
# Override
5657
def _generate_job_runner_instance(self, args, run_id=None, request_json=None, agent_config=None, edge_id=None, ):
@@ -75,6 +76,7 @@ def run_impl(
7576
inference_end_point_id, use_gpu, memory_size, model_version, inference_port = \
7677
FedMLDeployMasterJobRunner.parse_model_run_params(self.request_json)
7778
self.run_id = run_id
79+
self.is_fresh_endpoint = self.request_json.get("is_fresh_endpoint", True)
7880

7981
# Print request parameters.
8082
logging.info("model deployment request: {}".format(self.request_json))
@@ -246,7 +248,7 @@ def process_deployment_result_message(self, topic=None, payload=None):
246248
f"{self.request_json}")
247249
return
248250

249-
logging.info(f"End point {end_point_id}; Device {device_id}; replica {replica_no}; "
251+
logging.info(f"Endpoint {end_point_id}; Device {device_id}; replica {replica_no}; "
250252
f"run_operation {run_operation} model status {model_status}.")
251253

252254
# OPTIONAL DEBUG PARAMS
@@ -280,7 +282,7 @@ def process_deployment_result_message(self, topic=None, payload=None):
280282
logging.error(f"Unsupported model status {model_status}.")
281283

282284
# Avoid an endless loop: if the rollback also failed, report the failure to MLOps.
283-
if self.replica_controller.under_rollback:
285+
if self.replica_controller.under_rollback or self.is_fresh_endpoint:
284286
self.send_deployment_status(
285287
end_point_id, end_point_name, payload_json["model_name"], "",
286288
ServerConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_FAILED,

python/fedml/computing/scheduler/model_scheduler/master_protocol_manager.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from .master_job_runner_manager import FedMLDeployJobRunnerManager
1212
from ..scheduler_core.general_constants import GeneralConstants
1313
from ..scheduler_core.endpoint_sync_protocol import FedMLEndpointSyncProtocol
14+
from ..scheduler_core.compute_cache_manager import ComputeCacheManager
1415

1516

1617
class FedMLDeployMasterProtocolManager(FedMLBaseMasterProtocolManager):
@@ -108,6 +109,10 @@ def callback_delete_deployment(self, topic, payload):
108109
# Parse payload as the model message object.
109110
model_msg_object = FedMLModelMsgObject(topic, payload)
110111

112+
# Get the launch job id
113+
ComputeCacheManager.get_instance().set_redis_params(self.redis_addr, self.redis_port, self.redis_password)
114+
launch_job_id = ComputeCacheManager.get_instance().get_gpu_cache().get_endpoint_run_id_map(model_msg_object.run_id)
115+
111116
# Delete SQLite records
112117
FedMLServerDataInterface.get_instance().delete_job_from_db(model_msg_object.run_id)
113118
FedMLModelDatabase.get_instance().delete_deployment_result(
@@ -135,6 +140,12 @@ def callback_delete_deployment(self, topic, payload):
135140
model_msg_object.run_id, model_msg_object.end_point_name, model_msg_object.model_id,
136141
model_msg_object.model_name, model_msg_object.model_version)
137142

143+
# Report the launch job status with killed status.
144+
if launch_job_id is not None:
145+
self.generate_status_report(model_msg_object.run_id, self.edge_id, server_agent_id=self.edge_id).\
146+
report_server_id_status(launch_job_id, GeneralConstants.MSG_MLOPS_SERVER_STATUS_KILLED,
147+
server_id=self.edge_id, server_agent_id=self.edge_id)
148+
138149
def callback_start_deployment(self, topic, payload):
139150
# noinspection PyBroadException
140151
try:
@@ -174,6 +185,11 @@ def callback_start_deployment(self, topic, payload):
174185
# Set redis config
175186
FedMLModelCache.get_instance().set_redis_params(self.redis_addr, self.redis_port, self.redis_password)
176187

188+
# Query if the endpoint exists
189+
endpoint_device_info = FedMLModelCache.get_instance(self.redis_addr, self.redis_port).get_end_point_device_info(
190+
request_json["end_point_id"])
191+
request_json["is_fresh_endpoint"] = True if endpoint_device_info is None else False
192+
177193
# Save the user setting (about replica number) of this run to Redis, if existed, update it
178194
FedMLModelCache.get_instance(self.redis_addr, self.redis_port).set_user_setting_replica_num(
179195
end_point_id=run_id, end_point_name=end_point_name, model_name=model_name, model_version=model_version,

python/fedml/computing/scheduler/scheduler_core/message_common.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ def __init__(self, topic=None, payload=None, status_msg_body: dict = None):
4949
self.payload = payload
5050
self.run_id = None
5151
self.edge_id = None
52+
self.server_id = None
5253
self.status = None
5354
if status_msg_body is not None:
5455
self.from_message_body(status_msg_body=status_msg_body)
@@ -61,6 +62,7 @@ def from_message_body(self, status_msg_body: dict = None):
6162
self.run_id = payload_json.get("run_id", None)
6263
self.run_id = payload_json.get("runId", None) if self.run_id is None else self.run_id
6364
self.edge_id = payload_json.get("edge_id", None)
65+
self.server_id = payload_json.get("server_id", None)
6466
self.status = payload_json.get("status", None)
6567

6668
def get_message_body(self):

python/fedml/computing/scheduler/scheduler_core/scheduler_base_job_runner_manager.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ def stop_job_runner(self, run_id):
3939
if self.job_runners.get(run_id_str, None) is not None:
4040
self.job_runners[run_id_str].trigger_stop_event()
4141

42+
def stop_all_job_runner(self):
43+
for run_id, job_runner in self.job_runners.items():
44+
job_runner.trigger_stop_event()
45+
4246
def complete_job_runner(self, run_id):
4347
run_id_str = str(run_id)
4448
if self.job_runners.get(run_id_str, None) is not None:

python/fedml/computing/scheduler/scheduler_core/status_center.py

Lines changed: 40 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ class FedMLStatusCenter(object):
8181
TOPIC_SLAVE_JOB_LAUNCH_SUFFIX = "/start_train"
8282
TOPIC_SLAVE_JOB_STOP_PREFIX = "flserver_agent/"
8383
TOPIC_SLAVE_JOB_STOP_SUFFIX = "/stop_train"
84+
ALLOWED_MAX_JOB_STATUS_CACHE_NUM = 1000
8485

8586
def __init__(self, message_queue=None):
8687
self.status_queue = message_queue
@@ -203,38 +204,43 @@ def run_status_dispatcher(self, status_event, status_queue,
203204
status_entity = FedMLStatusEntity(status_msg_body=message_body)
204205

205206
# Generate status manager instance
206-
if status_manager_instances.get(status_entity.run_id) is None:
207-
status_manager_instances[status_entity.run_id] = FedMLStatusManager(
208-
run_id=status_entity.run_id, edge_id=status_entity.edge_id, status_center=self,
207+
run_id_str = str(status_entity.run_id)
208+
run_id_int = int(status_entity.run_id)
209+
if status_manager_instances.get(run_id_str) is None:
210+
if len(status_manager_instances.keys()) >= FedMLStatusCenter.ALLOWED_MAX_JOB_STATUS_CACHE_NUM:
211+
for iter_run_id, iter_status_mgr in status_manager_instances.items():
212+
if iter_status_mgr.is_job_completed():
213+
status_manager_instances.pop(iter_run_id)
214+
break
215+
status_manager_instances[run_id_str] = FedMLStatusManager(
216+
run_id=run_id_int, edge_id=status_entity.edge_id,
217+
server_id=status_entity.server_id, status_center=self,
209218
message_center=message_center)
210219
else:
211-
status_manager_instances[status_entity.run_id].edge_id = status_entity.edge_id
220+
status_manager_instances[run_id_str].edge_id = status_entity.edge_id
221+
if status_entity.server_id is not None and str(status_entity.server_id) != "0":
222+
status_manager_instances[run_id_str].server_id = status_entity.server_id
212223

213224
# if the job status is completed then continue
214-
if status_manager_instances[status_entity.run_id].is_job_completed():
225+
if status_manager_instances[run_id_str].is_job_completed():
215226
continue
216227

217228
# Process the master and slave status.
218229
if message_entity.topic.startswith(FedMLStatusCenter.TOPIC_MASTER_STATUS_PREFIX):
219230
# Process the job status
220-
status_manager_instances[status_entity.run_id].status_center_process_master_status(
231+
status_manager_instances[run_id_str].status_center_process_master_status(
221232
message_entity.topic, message_entity.payload)
222233

223234
# Save the job status
224-
status_manager_instances[status_entity.run_id].save_job_status()
225-
226-
# Popup the status manager instance when the job status is completed
227-
if status_manager_instances[status_entity.run_id].is_job_completed():
228-
status_manager_instances.pop(status_entity.run_id)
229-
continue
235+
status_manager_instances[run_id_str].save_job_status()
230236

231237
elif message_entity.topic.startswith(FedMLStatusCenter.TOPIC_SLAVE_STATUS_PREFIX):
232238
# Process the slave device status
233-
status_manager_instances[status_entity.run_id].status_center_process_slave_status(
239+
status_manager_instances[run_id_str].status_center_process_slave_status(
234240
message_entity.topic, message_entity.payload)
235241

236242
# Save the device status in job
237-
status_manager_instances[status_entity.run_id].save_device_status_in_job(status_entity.edge_id)
243+
status_manager_instances[run_id_str].save_device_status_in_job(status_entity.edge_id)
238244

239245
except Exception as e:
240246
if message_entity is not None:
@@ -295,40 +301,49 @@ def run_status_dispatcher_in_slave(self, status_event, status_queue,
295301
status_entity = FedMLStatusEntity(status_msg_body=message_body)
296302

297303
# Generate status manager instance
298-
if status_manager_instances.get(status_entity.run_id) is None:
299-
status_manager_instances[status_entity.run_id] = FedMLStatusManager(
300-
run_id=status_entity.run_id, edge_id=status_entity.edge_id, status_center=self,
304+
run_id_str = str(status_entity.run_id)
305+
run_id_int = int(status_entity.run_id)
306+
if status_manager_instances.get(run_id_str) is None:
307+
if len(status_manager_instances.keys()) >= FedMLStatusCenter.ALLOWED_MAX_JOB_STATUS_CACHE_NUM:
308+
for iter_run_id, iter_status_mgr in status_manager_instances.items():
309+
if iter_status_mgr.is_job_completed():
310+
status_manager_instances.pop(iter_run_id)
311+
break
312+
313+
status_manager_instances[run_id_str] = FedMLStatusManager(
314+
run_id=run_id_int, edge_id=status_entity.edge_id, status_center=self,
301315
message_center=message_center)
302316
else:
303-
status_manager_instances[status_entity.run_id].edge_id = status_entity.edge_id
317+
status_manager_instances[run_id_str].edge_id = status_entity.edge_id
304318

305319
# Process the slave status
306320
if message_entity.topic.startswith(FedMLStatusCenter.TOPIC_SLAVE_STATUS_PREFIX):
307321
# Report the slave status to master
308-
status_manager_instances[status_entity.run_id]. \
322+
status_manager_instances[run_id_str]. \
309323
status_center_process_slave_status_to_master_in_slave_agent(
310324
message_entity.topic, message_entity.payload
311325
)
312326
elif message_entity.topic.startswith(FedMLStatusCenter.TOPIC_SLAVE_STATUS_TO_MLOPS_PREFIX):
313327
# Report slave status to mlops (Active/IDLE message)
314-
status_manager_instances[status_entity.run_id]. \
328+
status_manager_instances[run_id_str]. \
315329
status_center_process_slave_status_to_mlops_in_slave_agent(
316330
message_entity.topic, message_entity.payload
317331
)
318332
elif (message_entity.topic.startswith(FedMLStatusCenter.TOPIC_SLAVE_JOB_LAUNCH_PREFIX) and
319333
message_entity.topic.endswith(FedMLStatusCenter.TOPIC_SLAVE_JOB_LAUNCH_SUFFIX)):
334+
pass
320335
# Async request the job status from master when launching the job
321-
job_launch_message_map[status_entity.run_id] = {"topic": message_entity.topic,
322-
"payload": message_entity.payload}
323-
# status_manager_instances[status_entity.run_id]. \
336+
# job_launch_message_map[run_id_str] = {"topic": message_entity.topic,
337+
# "payload": message_entity.payload}
338+
# status_manager_instances[run_id_str]. \
324339
# status_center_request_job_status_from_master_in_slave_agent(
325340
# message_entity.topic, message_entity.payload
326341
# )
327342
elif (message_entity.topic.startswith(FedMLStatusCenter.TOPIC_SLAVE_JOB_STOP_PREFIX) and
328343
message_entity.topic.endswith(FedMLStatusCenter.TOPIC_SLAVE_JOB_STOP_SUFFIX)):
329344
# Cleanup when stopped the job
330-
if job_launch_message_map.get(status_entity.run_id, None) is not None:
331-
job_launch_message_map.pop(status_entity.run_id)
345+
if job_launch_message_map.get(run_id_str, None) is not None:
346+
job_launch_message_map.pop(run_id_str)
332347

333348
except Exception as e:
334349
if message_entity is not None:

0 commit comments

Comments
 (0)