Skip to content

Commit 34fdba0

Browse files
authored
Merge pull request #2157 from FedML-AI/raphael/unify-connectivity
[Deploy] Report the worker's connectivity type when it finishes.
2 parents af026fb + 4a9622c commit 34fdba0

6 files changed

Lines changed: 97 additions & 53 deletions

File tree

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
import os

from fedml.computing.scheduler.model_scheduler.device_client_constants import ClientConstants


def return_this_device_connectivity_type() -> str:
    """Return this worker's connectivity type for inference routing.

    Reads the connection-type environment variable (whose key is
    ``ClientConstants.ENV_CONNECTION_TYPE_KEY``) and validates it against
    the known worker connectivity types.

    Returns:
        str: One of ``"http"``, ``"http_proxy"``, or ``"mqtt"``. Falls back
        to ``ClientConstants.WORKER_CONNECTIVITY_TYPE_DEFAULT`` when the
        variable is unset or holds an unrecognized value.
    """
    # Normalize to lower case so values such as "HTTP" are still accepted.
    env_conn_type = os.getenv(ClientConstants.ENV_CONNECTION_TYPE_KEY, "").lower()
    # Explicit whitelist of valid types; set membership is O(1) and makes
    # the accepted values obvious at a glance.
    valid_types = {
        ClientConstants.WORKER_CONNECTIVITY_TYPE_HTTP,
        ClientConstants.WORKER_CONNECTIVITY_TYPE_HTTP_PROXY,
        ClientConstants.WORKER_CONNECTIVITY_TYPE_MQTT,
    }
    if env_conn_type in valid_types:
        return env_conn_type
    return ClientConstants.WORKER_CONNECTIVITY_TYPE_DEFAULT

python/fedml/computing/scheduler/model_scheduler/device_client_constants.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,12 @@ class ClientConstants(object):
9797
INFERENCE_INFERENCE_SERVER_VERSION = "v2"
9898
INFERENCE_REQUEST_TIMEOUT = 30
9999

100+
ENV_CONNECTION_TYPE_KEY = "FEDML_CONNECTION_TYPE"
101+
WORKER_CONNECTIVITY_TYPE_HTTP = "http"
102+
WORKER_CONNECTIVITY_TYPE_HTTP_PROXY = "http_proxy"
103+
WORKER_CONNECTIVITY_TYPE_MQTT = "mqtt"
104+
WORKER_CONNECTIVITY_TYPE_DEFAULT = WORKER_CONNECTIVITY_TYPE_HTTP
105+
100106
MSG_MODELOPS_DEPLOYMENT_STATUS_INITIALIZING = "INITIALIZING"
101107
MSG_MODELOPS_DEPLOYMENT_STATUS_DEPLOYING = "DEPLOYING"
102108
MSG_MODELOPS_DEPLOYMENT_STATUS_INFERRING = "INFERRING"

python/fedml/computing/scheduler/model_scheduler/device_model_cache.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -344,9 +344,13 @@ def get_result_item_info(self, result_item):
344344
result_payload = result_item_json["result"]
345345
return device_id, replica_no, result_payload
346346

347-
def get_idle_device(self, end_point_id, end_point_name,
348-
model_name, model_version,
349-
check_end_point_status=True, limit_specific_model_version=False):
347+
def get_idle_device(self,
348+
end_point_id,
349+
end_point_name,
350+
model_name,
351+
model_version,
352+
check_end_point_status=True,
353+
limit_specific_model_version=False):
350354
# Deprecated the model status logic, query directly from the deployment result list
351355
idle_device_list = list()
352356

python/fedml/computing/scheduler/model_scheduler/device_model_inference.py

Lines changed: 42 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,8 @@ async def _predict(
210210
return inference_response
211211

212212
# Found idle inference device
213-
idle_device, end_point_id, model_id, model_name, model_version, inference_host, inference_output_url = \
213+
idle_device, end_point_id, model_id, model_name, model_version, inference_host, inference_output_url,\
214+
connectivity_type = \
214215
found_idle_inference_device(in_end_point_id, in_end_point_name, in_model_name, in_model_version)
215216
if idle_device is None or idle_device == "":
216217
FEDML_MODEL_CACHE.update_pending_requests_counter(end_point_id, decrease=True)
@@ -235,13 +236,16 @@ async def _predict(
235236
stream_flag = input_json.get("stream", False)
236237
input_list["stream"] = input_list.get("stream", stream_flag)
237238
output_list = input_json.get("outputs", [])
239+
240+
# main execution of redirecting the inference request to the idle device
238241
inference_response = await send_inference_request(
239242
idle_device,
240243
end_point_id,
241244
inference_output_url,
242245
input_list,
243246
output_list,
244-
inference_type=in_return_type)
247+
inference_type=in_return_type,
248+
connectivity_type=connectivity_type)
245249

246250
# Calculate model metrics
247251
try:
@@ -304,37 +308,40 @@ def found_idle_inference_device(end_point_id, end_point_name, in_model_name, in_
304308
inference_host = ""
305309
inference_output_url = ""
306310
model_version = ""
311+
connectivity_type = ""
312+
307313
# Found idle device (TODO: optimize the algorithm to search best device for inference)
308314
payload, idle_device = FEDML_MODEL_CACHE. \
309315
get_idle_device(end_point_id, end_point_name, in_model_name, in_model_version)
310-
if payload is not None:
311-
logging.info("found idle deployment result {}".format(payload))
312-
deployment_result = payload
313-
model_name = deployment_result["model_name"]
314-
model_version = deployment_result["model_version"]
315-
model_id = deployment_result["model_id"]
316-
end_point_id = deployment_result["end_point_id"]
317-
inference_output_url = deployment_result["model_url"]
316+
if payload:
317+
model_name = payload["model_name"]
318+
model_version = payload["model_version"]
319+
model_id = payload["model_id"]
320+
end_point_id = payload["end_point_id"]
321+
inference_output_url = payload["model_url"]
322+
connectivity_type = \
323+
payload.get("connectivity_type",
324+
ClientConstants.WORKER_CONNECTIVITY_TYPE_DEFAULT)
318325
url_parsed = urlparse(inference_output_url)
319326
inference_host = url_parsed.hostname
320327
else:
321328
logging.info("not found idle deployment result")
322329

323-
return idle_device, end_point_id, model_id, model_name, model_version, inference_host, inference_output_url
330+
res = (idle_device, end_point_id, model_id, model_name, model_version, inference_host, inference_output_url,
331+
connectivity_type)
332+
logging.info(f"found idle device with metrics: {res}")
333+
334+
return res
324335

325336

326337
async def send_inference_request(idle_device, end_point_id, inference_url, input_list, output_list,
327-
inference_type="default", has_public_ip=True):
338+
inference_type="default",
339+
connectivity_type=ClientConstants.WORKER_CONNECTIVITY_TYPE_DEFAULT):
328340
request_timeout_sec = FEDML_MODEL_CACHE.get_endpoint_settings(end_point_id) \
329341
.get("request_timeout_sec", ClientConstants.INFERENCE_REQUEST_TIMEOUT)
330342

331343
try:
332-
http_infer_available = os.getenv("FEDML_INFERENCE_HTTP_AVAILABLE", True)
333-
if not http_infer_available:
334-
if http_infer_available == "False" or http_infer_available == "false":
335-
http_infer_available = False
336-
337-
if http_infer_available:
344+
if connectivity_type == ClientConstants.WORKER_CONNECTIVITY_TYPE_HTTP:
338345
response_ok = await FedMLHttpInference.is_inference_ready(
339346
inference_url,
340347
timeout=request_timeout_sec)
@@ -347,22 +354,23 @@ async def send_inference_request(idle_device, end_point_id, inference_url, input
347354
timeout=request_timeout_sec)
348355
logging.info(f"Use http inference. return {response_ok}")
349356
return inference_response
350-
351-
response_ok = await FedMLHttpProxyInference.is_inference_ready(
352-
inference_url,
353-
timeout=request_timeout_sec)
354-
if response_ok:
355-
response_ok, inference_response = await FedMLHttpProxyInference.run_http_proxy_inference_with_request(
356-
end_point_id,
357+
elif connectivity_type == ClientConstants.WORKER_CONNECTIVITY_TYPE_HTTP_PROXY:
358+
logging.warning("Use http proxy inference.")
359+
response_ok = await FedMLHttpProxyInference.is_inference_ready(
357360
inference_url,
358-
input_list,
359-
output_list,
360-
inference_type=inference_type,
361361
timeout=request_timeout_sec)
362-
logging.info(f"Use http proxy inference. return {response_ok}")
363-
return inference_response
364-
365-
if not has_public_ip:
362+
if response_ok:
363+
response_ok, inference_response = await FedMLHttpProxyInference.run_http_proxy_inference_with_request(
364+
end_point_id,
365+
inference_url,
366+
input_list,
367+
output_list,
368+
inference_type=inference_type,
369+
timeout=request_timeout_sec)
370+
logging.info(f"Use http proxy inference. return {response_ok}")
371+
return inference_response
372+
elif connectivity_type == ClientConstants.WORKER_CONNECTIVITY_TYPE_MQTT:
373+
logging.warning("Use mqtt inference.")
366374
agent_config = {"mqtt_config": Settings.mqtt_config}
367375
mqtt_inference = FedMLMqttInference(
368376
agent_config=agent_config,
@@ -385,7 +393,8 @@ async def send_inference_request(idle_device, end_point_id, inference_url, input
385393

386394
logging.info(f"Use mqtt inference. return {response_ok}.")
387395
return inference_response
388-
return {"error": True, "message": "Failed to use http, http-proxy for inference, no response from replica."}
396+
else:
397+
return {"error": True, "message": "Failed to use http, http-proxy for inference, no response from replica."}
389398
except Exception as e:
390399
inference_response = {"error": True,
391400
"message": f"Exception when using http, http-proxy and mqtt "

python/fedml/computing/scheduler/model_scheduler/master_job_runner.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -250,14 +250,6 @@ def process_deployment_result_message(self, topic=None, payload=None):
250250
logging.info(f"Endpoint {end_point_id}; Device {device_id}; replica {replica_no}; "
251251
f"run_operation {run_operation} model status {model_status}.")
252252

253-
# OPTIONAL DEBUG PARAMS
254-
# this_run_controller = self.model_runner_mapping[run_id_str].replica_controller
255-
# logging.info(f"The current replica controller state is "
256-
# f"Total version diff num {this_run_controller.total_replica_version_diff_num}")
257-
# logging.info(f"self.request_json now {self.request_json}") # request_json will be deprecated
258-
# this_run_request_json = self.request_json
259-
# logging.info(f"self.request_json now {this_run_request_json}")
260-
261253
# Set redis + sqlite deployment result
262254
FedMLModelCache.get_instance().set_redis_params(self.redis_addr, self.redis_port, self.redis_password)
263255

python/fedml/computing/scheduler/model_scheduler/worker_job_runner.py

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
from abc import ABC
1010
import yaml
1111
from fedml.computing.scheduler.comm_utils.job_utils import JobRunnerUtils
12+
from fedml.computing.scheduler.comm_utils.network_util import return_this_device_connectivity_type
13+
1214
from fedml.core.mlops import MLOpsRuntimeLog
1315
from fedml.computing.scheduler.comm_utils import file_utils
1416
from .device_client_constants import ClientConstants
@@ -234,8 +236,11 @@ def run_impl(self, run_extend_queue_list, sender_message_center,
234236
running_model_name, inference_output_url, inference_model_version, model_metadata, model_config = \
235237
"", "", model_version, {}, {}
236238

239+
# ip and connectivity
240+
worker_ip = GeneralConstants.get_ip_address(self.request_json)
241+
connectivity = return_this_device_connectivity_type()
242+
237243
if op == "add":
238-
worker_ip = GeneralConstants.get_ip_address(self.request_json)
239244
for rank in range(prev_rank + 1, prev_rank + 1 + op_num):
240245
try:
241246
running_model_name, inference_output_url, inference_model_version, model_metadata, model_config = \
@@ -269,7 +274,9 @@ def run_impl(self, run_extend_queue_list, sender_message_center,
269274
result_payload = self.send_deployment_results(
270275
end_point_name, self.edge_id, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_DEPLOYED,
271276
model_id, model_name, inference_output_url, model_version, inference_port_external,
272-
inference_engine, model_metadata, model_config, replica_no=rank + 1)
277+
inference_engine, model_metadata, model_config, replica_no=rank + 1,
278+
connectivity=connectivity
279+
)
273280

274281
if inference_port_external != inference_port:
275282
# Save internal port to local db
@@ -278,7 +285,9 @@ def run_impl(self, run_extend_queue_list, sender_message_center,
278285
result_payload = self.construct_deployment_results(
279286
end_point_name, self.edge_id, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_DEPLOYED,
280287
model_id, model_name, inference_output_url, model_version, inference_port,
281-
inference_engine, model_metadata, model_config, replica_no=rank + 1)
288+
inference_engine, model_metadata, model_config, replica_no=rank + 1,
289+
connectivity=connectivity
290+
)
282291

283292
FedMLModelDatabase.get_instance().set_deployment_result(
284293
run_id, end_point_name, model_name, model_version, self.edge_id,
@@ -326,7 +335,6 @@ def run_impl(self, run_extend_queue_list, sender_message_center,
326335
return True
327336
elif op == "update" or op == "rollback":
328337
# Update is combine of delete and add
329-
worker_ip = GeneralConstants.get_ip_address(self.request_json)
330338
for rank in replica_rank_to_update:
331339
# Delete a replica (container) if exists
332340
self.replica_handler.remove_replica(rank)
@@ -402,15 +410,19 @@ def run_impl(self, run_extend_queue_list, sender_message_center,
402410
result_payload = self.send_deployment_results(
403411
end_point_name, self.edge_id, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_DEPLOYED,
404412
model_id, model_name, inference_output_url, model_version, inference_port_external,
405-
inference_engine, model_metadata, model_config, replica_no=rank + 1)
413+
inference_engine, model_metadata, model_config, replica_no=rank + 1,
414+
connectivity=connectivity
415+
)
406416

407417
if inference_port_external != inference_port: # Save internal port to local db
408418
logging.info("inference_port_external {} != inference_port {}".format(
409419
inference_port_external, inference_port))
410420
result_payload = self.construct_deployment_results(
411421
end_point_name, self.edge_id, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_DEPLOYED,
412422
model_id, model_name, inference_output_url, model_version, inference_port,
413-
inference_engine, model_metadata, model_config, replica_no=rank + 1)
423+
inference_engine, model_metadata, model_config, replica_no=rank + 1,
424+
connectivity=connectivity
425+
)
414426

415427
FedMLModelDatabase.get_instance().set_deployment_result(
416428
run_id, end_point_name, model_name, model_version, self.edge_id,
@@ -433,7 +445,8 @@ def run_impl(self, run_extend_queue_list, sender_message_center,
433445
def construct_deployment_results(self, end_point_name, device_id, model_status,
434446
model_id, model_name, model_inference_url,
435447
model_version, inference_port, inference_engine,
436-
model_metadata, model_config, replica_no=1):
448+
model_metadata, model_config, replica_no=1,
449+
connectivity=ClientConstants.WORKER_CONNECTIVITY_TYPE_DEFAULT):
437450
deployment_results_payload = {"end_point_id": self.run_id, "end_point_name": end_point_name,
438451
"model_id": model_id, "model_name": model_name,
439452
"model_url": model_inference_url, "model_version": model_version,
@@ -444,6 +457,7 @@ def construct_deployment_results(self, end_point_name, device_id, model_status,
444457
"model_status": model_status,
445458
"inference_port": inference_port,
446459
"replica_no": replica_no,
460+
"connectivity_type": connectivity,
447461
}
448462
return deployment_results_payload
449463

@@ -466,15 +480,16 @@ def construct_deployment_status(self, end_point_name, device_id,
466480
def send_deployment_results(self, end_point_name, device_id, model_status,
467481
model_id, model_name, model_inference_url,
468482
model_version, inference_port, inference_engine,
469-
model_metadata, model_config, replica_no=1):
483+
model_metadata, model_config, replica_no=1,
484+
connectivity=ClientConstants.WORKER_CONNECTIVITY_TYPE_DEFAULT):
470485
deployment_results_topic = "model_device/model_device/return_deployment_result/{}/{}".format(
471486
self.run_id, device_id)
472487

473488
deployment_results_payload = self.construct_deployment_results(
474489
end_point_name, device_id, model_status,
475490
model_id, model_name, model_inference_url,
476491
model_version, inference_port, inference_engine,
477-
model_metadata, model_config, replica_no=replica_no)
492+
model_metadata, model_config, replica_no=replica_no, connectivity=connectivity)
478493

479494
logging.info("[client] send_deployment_results: topic {}, payload {}.".format(deployment_results_topic,
480495
deployment_results_payload))

0 commit comments

Comments
 (0)