Skip to content

Commit aa1df99

Browse files
committed
[CoreEngine] make the handshaking protocol work.
1 parent 14429ad commit aa1df99

2 files changed

Lines changed: 16 additions & 55 deletions

File tree

python/fedml/computing/scheduler/slave/base_slave_protocol_manager.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ def __init__(self, args, agent_config=None):
4848
self.topic_report_status = None
4949
self.topic_ota_msg = None
5050
self.topic_request_device_info = None
51+
self.topic_request_device_info_from_mlops = None
5152
self.topic_client_logout = None
5253
self.topic_response_job_status = None
5354
self.topic_report_device_status_in_job = None
@@ -87,6 +88,9 @@ def generate_topics(self):
8788
# The topic for requesting device info from the client.
8889
self.topic_request_device_info = "server/client/request_device_info/" + str(self.edge_id)
8990

91+
# The topic for requesting device info from mlops.
92+
self.topic_request_device_info_from_mlops = f"deploy/mlops/slave_agent/request_device_info/{self.edge_id}"
93+
9094
# The topic for client logout requests from MLOps.
9195
self.topic_client_logout = "mlops/client/logout/" + str(self.edge_id)
9296

@@ -114,6 +118,7 @@ def generate_topics(self):
114118
self.add_subscribe_topic(self.topic_report_status)
115119
self.add_subscribe_topic(self.topic_ota_msg)
116120
self.add_subscribe_topic(self.topic_request_device_info)
121+
self.add_subscribe_topic(self.topic_request_device_info_from_mlops)
117122
self.add_subscribe_topic(self.topic_client_logout)
118123
self.add_subscribe_topic(self.topic_response_job_status)
119124
self.add_subscribe_topic(self.topic_report_device_status_in_job)
@@ -132,6 +137,7 @@ def add_protocol_handler(self):
132137
self.add_message_listener(self.topic_ota_msg, FedMLBaseSlaveProtocolManager.callback_client_ota_msg)
133138
self.add_message_listener(self.topic_report_status, self.callback_report_current_status)
134139
self.add_message_listener(self.topic_request_device_info, self.callback_report_device_info)
140+
self.add_message_listener(self.topic_request_device_info_from_mlops, self.callback_request_device_info_from_mlops)
135141
self.add_message_listener(self.topic_client_logout, self.callback_client_logout)
136142
self.add_message_listener(self.topic_response_job_status, self.callback_response_job_status)
137143
self.add_message_listener(self.topic_report_device_status_in_job, self.callback_response_device_status_in_job)
@@ -402,6 +408,15 @@ def callback_report_device_info(self, topic, payload):
402408
response_payload["context"] = context
403409
self.message_center.send_message(response_topic, json.dumps(response_payload), run_id=run_id)
404410

411+
def callback_request_device_info_from_mlops(self, topic, payload):
412+
self.response_device_info_to_mlops(topic, payload)
413+
414+
def response_device_info_to_mlops(self, topic, payload):
415+
response_topic = f"deploy/slave_agent/mlops/response_device_info"
416+
response_payload = {"run_id": self.run_id, "slave_agent_device_id": self.edge_id,
417+
"fedml_version": fedml.__version__}
418+
self.message_center.send_message(response_topic, json.dumps(response_payload))
419+
405420
def callback_client_logout(self, topic, payload):
406421
payload_json = json.loads(payload)
407422
secret = payload_json.get("auth", None)

python/fedml/computing/scheduler/slave/slave_protocol_manager.py

Lines changed: 1 addition & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,11 @@ class FedMLLaunchSlaveProtocolManager(FedMLBaseSlaveProtocolManager):
1313

1414
def __init__(self, args, agent_config=None):
1515
FedMLBaseSlaveProtocolManager.__init__(self, args, agent_config=agent_config)
16-
self.topic_request_deploy_slave_device_info_from_mlops = None
17-
self.topic_request_deploy_master_device_info_from_mlops = None
18-
self.topic_request_edge_device_info_from_mlops = None
1916

2017
# Override
2118
def generate_topics(self):
2219
super().generate_topics()
2320

24-
# The topic for requesting device info from mlops.
25-
self.topic_request_edge_device_info_from_mlops = f"deploy/mlops/slave_agent/request_device_info/{self.edge_id}"
26-
27-
self.add_subscribe_topic(self.topic_request_edge_device_info_from_mlops)
28-
2921
# Override
3022
def add_protocol_handler(self):
3123
super().add_protocol_handler()
@@ -106,54 +98,8 @@ def _init_extra_items(self):
10698
os.environ["FEDML_DEPLOY_MASTER_ID"] = str(self.model_device_server_id)
10799
os.environ["FEDML_DEPLOY_WORKER_IDS"] = str(self.model_device_client_edge_id_list)
108100

109-
# Subscribe handshaking messages from MLOps.
110-
self.subscribe_handshaking_messages_from_mlops()
111-
112101
# Start the monitor process
113102
self.args = copy.deepcopy(in_args)
114103
self.mlops_metrics.stop_device_realtime_perf()
115104
self.mlops_metrics.report_device_realtime_perf(self.args, self.args.agent_config["mqtt_config"])
116-
pass
117-
118-
def callback_response_device_info_to_mlops(self, topic, payload):
119-
payload_json = json.loads(payload)
120-
server_id = payload_json.get("server_id", 0)
121-
run_id = payload_json.get("run_id", 0)
122-
listen_edge_id = str(topic).split("/")[-1]
123-
context = payload_json.get("context", None)
124-
response_topic = f"deploy/slave_agent/mlops/response_device_info"
125-
if self.mlops_metrics is not None and self.model_device_client_edge_id_list is not None and \
126-
self.model_device_server_id is not None:
127-
device_info_json = {
128-
"edge_id": listen_edge_id,
129-
"fedml_version": fedml.__version__,
130-
"user_id": self.args.user
131-
}
132-
salve_device_ids = list()
133-
for model_client_edge_id in self.model_device_client_edge_id_list:
134-
salve_device_ids.append(model_client_edge_id)
135-
response_payload = {"slave_device_id": self.model_device_client_edge_id_list[0],
136-
"slave_device_id_list": salve_device_ids,
137-
"master_device_id": self.model_device_server_id,
138-
"run_id": run_id, "edge_id": listen_edge_id,
139-
"edge_info": device_info_json}
140-
if context is not None:
141-
response_payload["context"] = context
142-
self.message_center.send_message(response_topic, json.dumps(response_payload), run_id=run_id)
143-
144-
def subscribe_handshaking_messages_from_mlops(self):
145-
# The topic for requesting deployment master device info from mlops.
146-
self.topic_request_deploy_master_device_info_from_mlops = f"deploy/mlops/master_agent/request_device_info/{self.model_device_server_id}"
147-
148-
# The topic for requesting deployment slave device info from mlops.
149-
self.topic_request_deploy_slave_device_info_from_mlops = f"deploy/mlops/slave_agent/request_device_info/{self.model_device_client_edge_id_list[0]}"
150-
151-
self.add_subscribe_topic(self.topic_request_deploy_master_device_info_from_mlops)
152-
self.add_subscribe_topic(self.topic_request_deploy_slave_device_info_from_mlops)
153-
154-
self.add_message_listener(
155-
self.topic_request_edge_device_info_from_mlops, self.callback_response_device_info_to_mlops)
156-
self.add_message_listener(
157-
self.topic_request_deploy_master_device_info_from_mlops, self.callback_response_device_info_to_mlops)
158-
self.add_message_listener(
159-
self.topic_request_deploy_slave_device_info_from_mlops, self.callback_response_device_info_to_mlops)
105+
pass

0 commit comments

Comments
 (0)