Skip to content

Commit 41b76eb

Browse files
authored
Merge pull request #2104 from FedML-AI/alexleung/dev_branch_latest
Alexleung/dev branch latest
2 parents 5d8bbc5 + 5254793 commit 41b76eb

6 files changed

Lines changed: 27 additions & 42 deletions

File tree

python/fedml/computing/scheduler/master/base_master_protocol_manager.py

Lines changed: 10 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -33,7 +33,7 @@ def __init__(self, args, agent_config=None):
3333
self.agent_config = agent_config
3434
self.topic_start_train = None
3535
self.topic_stop_train = None
36-
self.topic_exit_train = None
36+
self.topic_complete_job = None
3737
self.topic_report_status = None
3838
self.topic_ota_msg = None
3939
self.topic_response_device_info = None
@@ -62,8 +62,8 @@ def generate_topics(self):
6262
# The topic for stopping training
6363
self.topic_stop_train = "mlops/flserver_agent_" + str(self.edge_id) + "/stop_train"
6464

65-
# The topi for exiting training
66-
self.topic_exit_train = GeneralConstants.get_topic_exit_train(self.edge_id)
65+
# The topic for completing job
66+
self.topic_complete_job = GeneralConstants.get_topic_complete_job(self.edge_id)
6767

6868
# The topic for reporting current device status.
6969
self.topic_report_status = "mlops/report_device_status"
@@ -93,7 +93,7 @@ def generate_topics(self):
9393
self.subscribed_topics.clear()
9494
self.add_subscribe_topic(self.topic_start_train)
9595
self.add_subscribe_topic(self.topic_stop_train)
96-
self.add_subscribe_topic(self.topic_exit_train)
96+
self.add_subscribe_topic(self.topic_complete_job)
9797
self.add_subscribe_topic(self.topic_report_status)
9898
self.add_subscribe_topic(self.topic_ota_msg)
9999
self.add_subscribe_topic(self.topic_response_device_info)
@@ -108,7 +108,7 @@ def add_protocol_handler(self):
108108
# Add the message listeners for all topics
109109
self.add_message_listener(self.topic_start_train, self.callback_start_train)
110110
self.add_message_listener(self.topic_stop_train, self.callback_stop_train)
111-
self.add_message_listener(self.topic_exit_train, self.callback_exit_train)
111+
self.add_message_listener(self.topic_complete_job, self.callback_complete_job)
112112
self.add_message_listener(self.topic_ota_msg, FedMLBaseMasterProtocolManager.callback_server_ota_msg)
113113
self.add_message_listener(self.topic_report_status, self.callback_report_current_status)
114114
self.add_message_listener(self.topic_response_device_info, self.callback_response_device_info)
@@ -281,7 +281,7 @@ def callback_stop_train(self, topic, payload, use_payload=None):
281281
run_id, args=self.args, server_id=server_id, request_json=request_json,
282282
run_as_cloud_agent=self.run_as_cloud_agent)
283283

284-
def callback_exit_train(self, topic, payload):
284+
def callback_complete_job(self, topic, payload):
285285
# Parse the parameters.
286286
request_json = json.loads(payload)
287287
run_id = request_json.get("runId", None)
@@ -291,10 +291,10 @@ def callback_exit_train(self, topic, payload):
291291
if server_id is None:
292292
server_id = request_json.get("server_id", None)
293293

294-
# Stop the job runner
295-
self._get_job_runner_manager().complete_job_runner(
296-
run_id, args=self.args, server_id=server_id, request_json=request_json,
297-
run_as_cloud_agent=self.run_as_cloud_agent, run_as_cloud_server=self.run_as_cloud_server)
294+
self._process_job_complete_status(run_id, server_id, request_json)
295+
296+
def _process_job_complete_status(self, run_id, server_id, complete_payload):
297+
pass
298298

299299
def callback_run_logs(self, topic, payload):
300300
run_id = str(topic).split('/')[-1]

python/fedml/computing/scheduler/master/master_protocol_manager.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,10 @@ def _init_extra_items(self):
3434
# Override
3535
def print_connected_info(self):
3636
super().print_connected_info()
37+
38+
# Override
39+
def _process_job_complete_status(self, run_id, server_id, complete_payload):
40+
# Complete the job runner
41+
self._get_job_runner_manager().complete_job_runner(
42+
run_id, args=self.args, server_id=server_id, request_json=complete_payload,
43+
run_as_cloud_agent=self.run_as_cloud_agent, run_as_cloud_server=self.run_as_cloud_server)

python/fedml/computing/scheduler/model_scheduler/master_protocol_manager.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,6 @@ def callback_deployment_result_message(self, topic=None, payload=None):
103103
logging.info(f"Received deployment result")
104104
FedMLDeployJobRunnerManager.get_instance().save_deployment_result(topic, payload)
105105

106-
def callback_exit_train(self, topic, payload):
107-
pass
108-
109106
def callback_delete_deployment(self, topic, payload):
110107
logging.info("[Master] callback_delete_deployment")
111108
# Parse payload as the model message object.

python/fedml/computing/scheduler/scheduler_core/general_constants.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,11 @@ def get_ip_address(request_json, infer_host=None):
208208
return ip
209209

210210
@staticmethod
211-
def get_topic_exit_train(server_id):
212-
topic_exit_train = f"status_center/flserver_agent_{server_id}/exit_train"
213-
return topic_exit_train
211+
def get_topic_complete_job(server_id):
212+
topic_complete_job = f"status_center/master_agent_{server_id}/complete_job"
213+
return topic_complete_job
214+
215+
@staticmethod
216+
def get_payload_complete_job(run_id, server_id):
217+
payload_complete_job = {"runId": run_id, "serverId": server_id}
218+
return payload_complete_job

python/fedml/computing/scheduler/scheduler_core/status_manager_protocols.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,9 @@ def process_job_completed_status(self, master_id, status):
6767
# self.stop_cloud_server()
6868
# self.remove_listener_for_run_metrics(self.run_id)
6969
# self.remove_listener_for_run_logs(self.run_id)
70-
payload_exit_train = {"runId": self.run_id, "serverId": master_id}
7170
self.message_center.receive_message(
72-
GeneralConstants.get_topic_exit_train(master_id), json.dumps(payload_exit_train))
71+
GeneralConstants.get_topic_complete_job(master_id),
72+
json.dumps(GeneralConstants.get_payload_complete_job(self.run_id, master_id)))
7373

7474
if self.status_center.is_deployment_status_center and status == ServerConstants.MSG_MLOPS_SERVER_STATUS_FAILED:
7575
self.report_deployment_status(self.run_id, GeneralConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_FAILED)

python/fedml/computing/scheduler/scheduler_core/task_protocol_manager.py

Lines changed: 0 additions & 24 deletions
This file was deleted.

0 commit comments

Comments
 (0)