Skip to content

Commit bec28a6

Browse files
authored
Merge pull request #2167 from FedML-AI/raphael/hotfix-inference-port
[Deploy] Finalize Gateway/Proxy port during device binding.
2 parents 8530973 + 9c8ce99 commit bec28a6

7 files changed

Lines changed: 69 additions & 29 deletions

File tree

python/fedml/api/__init__.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
from fedml.computing.scheduler.scheduler_entry.cluster_manager import FedMLClusterModelList
2525
from fedml.computing.scheduler.scheduler_entry.run_manager import FedMLRunStartedModel, FedMLGpuDevices, \
2626
FedMLRunModelList, FeatureEntryPoint
27+
from fedml.computing.scheduler.model_scheduler.device_server_constants import ServerConstants
28+
from fedml.computing.scheduler.model_scheduler.device_client_constants import ClientConstants
2729

2830

2931
def fedml_login(api_key: str = None):
@@ -209,16 +211,18 @@ def fedml_build(platform, type, source_folder, entry_point, config_folder, dest_
209211
return build.build(platform, type, source_folder, entry_point, config_folder, dest_folder, ignore)
210212

211213

212-
def login(api_key, computing, server, supplier):
213-
device_bind(api_key, computing, server, supplier)
214+
def login(api_key, computing, server, supplier,
215+
master_inference_gateway_port: int = ServerConstants.MODEL_INFERENCE_DEFAULT_PORT,
216+
worker_inference_proxy_port: int = ClientConstants.LOCAL_CLIENT_API_PORT):
217+
device_bind(api_key, computing, server, supplier, master_inference_gateway_port, worker_inference_proxy_port)
214218

215219

216220
def logout(computing, server):
217221
device_unbind(computing, server)
218222

219223

220-
def device_bind(api_key, computing, server, supplier):
221-
device.bind(api_key, computing, server, supplier)
224+
def device_bind(api_key, computing, server, supplier, master_inference_gateway_port, worker_inference_proxy_port):
225+
device.bind(api_key, computing, server, supplier, master_inference_gateway_port, worker_inference_proxy_port)
222226

223227

224228
def device_unbind(computing, server):

python/fedml/api/modules/device.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,18 @@
1010
from fedml.computing.scheduler.comm_utils.constants import SchedulerConstants
1111
from fedml.computing.scheduler.comm_utils.run_process_utils import RunProcessUtils
1212
from fedml.computing.scheduler.master.server_constants import ServerConstants
13+
from fedml.computing.scheduler.model_scheduler.device_server_constants import ServerConstants as DeviceServerConstants
1314
from fedml.computing.scheduler.master.server_login import logout as server_logout
1415
from fedml.computing.scheduler.slave.client_constants import ClientConstants
16+
from fedml.computing.scheduler.model_scheduler.device_client_constants import ClientConstants as DeviceClientConstants
1517
from fedml.computing.scheduler.slave.client_login import logout as client_logout
1618
from fedml.computing.scheduler.scheduler_entry.resource_manager import FedMLResourceManager
1719

1820

1921
def bind(
20-
api_key, computing, server, supplier
22+
api_key, computing, server, supplier,
23+
master_inference_gateway_port=DeviceServerConstants.MODEL_INFERENCE_DEFAULT_PORT,
24+
worker_inference_proxy_port=DeviceClientConstants.LOCAL_CLIENT_API_PORT
2125
):
2226
userid = api_key
2327
runner_cmd = "{}"
@@ -43,13 +47,13 @@ def bind(
4347
_bind(
4448
userid, computing, server,
4549
api_key, role, runner_cmd, device_id, os_name,
46-
docker)
50+
docker, master_inference_gateway_port, worker_inference_proxy_port)
4751

4852

4953
def _bind(
5054
userid, computing, server,
5155
api_key, role, runner_cmd, device_id, os_name,
52-
docker):
56+
docker, master_inference_gateway_port, worker_inference_proxy_port):
5357
fedml.load_env()
5458
if os.getenv(ModuleConstants.ENV_FEDML_INFER_HOST) is None:
5559
fedml.set_env_kv(ModuleConstants.ENV_FEDML_INFER_HOST, SchedulerConstants.REDIS_INFER_HOST)
@@ -60,6 +64,9 @@ def _bind(
6064
if os.getenv(ModuleConstants.ENV_FEDML_INFER_REDIS_PASSWORD) is None:
6165
fedml.set_env_kv(ModuleConstants.ENV_FEDML_INFER_REDIS_PASSWORD, SchedulerConstants.REDIS_PASSWORD)
6266

67+
fedml.set_env_kv(DeviceServerConstants.ENV_MASTER_INFERENCE_PORT_KEY, str(master_inference_gateway_port))
68+
fedml.set_env_kv(DeviceClientConstants.ENV_CLIENT_PROXY_PORT_KEY, str(worker_inference_proxy_port))
69+
6370
url = fedml._get_backend_service()
6471
platform_name = platform.system()
6572
docker_config_text = None

python/fedml/cli/modules/login.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
import fedml.api
66
from fedml.api.modules.utils import authenticate
7+
from fedml.computing.scheduler.model_scheduler.device_server_constants import ServerConstants
8+
from fedml.computing.scheduler.model_scheduler.device_client_constants import ClientConstants
79

810

911
@click.command("login", help="Login the FedML® Nexus AI Platform")
@@ -51,9 +53,25 @@
5153
default=80,
5254
help="The port for local on-premise Nexus AI Platform.",
5355
)
56+
@click.option(
57+
"--master_inference_gateway_port",
58+
"-mgp",
59+
type=int,
60+
default=ServerConstants.MODEL_INFERENCE_DEFAULT_PORT,
61+
help="The port for master inference gateway.",
62+
)
63+
@click.option(
64+
"--worker_inference_proxy_port",
65+
"-wpp",
66+
type=int,
67+
default=ClientConstants.LOCAL_CLIENT_API_PORT,
68+
help="The port for worker inference proxy.",
69+
)
5470
def fedml_login(
5571
api_key, version, compute_node, server, provider, deploy_worker_num,
56-
local_on_premise_platform, local_on_premise_platform_port):
72+
local_on_premise_platform, local_on_premise_platform_port,
73+
master_inference_gateway_port, worker_inference_proxy_port
74+
):
5775
fedml.set_env_version(version)
5876
fedml.set_local_on_premise_platform_host(local_on_premise_platform)
5977
fedml.set_local_on_premise_platform_port(local_on_premise_platform_port)
@@ -66,4 +84,4 @@ def fedml_login(
6684
print(f"Maybe you are using account id to login, we will try to login with account {api_key}.")
6785
pass
6886
os.environ["FEDML_MODEL_WORKER_NUM"] = str(deploy_worker_num)
69-
fedml.api.login(api_key, compute_node, server, provider)
87+
fedml.api.login(api_key, compute_node, server, provider, master_inference_gateway_port, worker_inference_proxy_port)

python/fedml/computing/scheduler/model_scheduler/device_client_constants.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ class ClientConstants(object):
7474
K8S_DEPLOYMENT_SLAVE_MOUNT_HOME_DIR = "/home/fedml/fedml-client"
7575

7676
LOCAL_CLIENT_API_PORT = 22030
77+
ENV_CLIENT_PROXY_PORT_KEY = "FEDML_WORKER_INFERENCE_PROXY_PORT"
7778

7879
INFERENCE_HTTP_PORT = 8000
7980
INFERENCE_GRPC_PORT = 8001
@@ -457,6 +458,14 @@ def get_public_ip():
457458
logging.info("Failed to get public ip: {}".format(e))
458459
return ip
459460

461+
@staticmethod
462+
def get_inference_worker_proxy_port() -> int:
463+
# Use dotenv to load the environment variables
464+
fedml.load_env()
465+
worker_proxy_port = int(os.getenv(ClientConstants.ENV_CLIENT_PROXY_PORT_KEY,
466+
default=ClientConstants.LOCAL_CLIENT_API_PORT))
467+
return worker_proxy_port
468+
460469
@staticmethod
461470
def check_process_is_running(process_id):
462471
for proc in psutil.process_iter():

python/fedml/computing/scheduler/model_scheduler/device_server_constants.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ class ServerConstants(object):
103103

104104
AUTO_DETECT_PUBLIC_IP = "auto_detect_public_ip"
105105
MODEL_INFERENCE_DEFAULT_PORT = 2203
106+
ENV_MASTER_INFERENCE_PORT_KEY = "FEDML_MASTER_INFERENCE_GATEWAY_PORT"
106107
MODEL_CACHE_KEY_EXPIRE_TIME = 1 * 10
107108

108109
INFERENCE_REQUEST_TIMEOUT_KEY = "request_timeout_sec"
@@ -348,6 +349,15 @@ def get_runner_infos():
348349
logging.error(f"Failed to parse runner info: {e}")
349350
return runner_info
350351

352+
@staticmethod
353+
def get_inference_master_gateway_port():
354+
# Use dotenv to load the environment variables
355+
fedml.load_env()
356+
master_inference_port = int(os.getenv(ServerConstants.ENV_MASTER_INFERENCE_PORT_KEY,
357+
default=ServerConstants.MODEL_INFERENCE_DEFAULT_PORT))
358+
return master_inference_port
359+
360+
351361
@staticmethod
352362
def save_runner_infos(unique_device_id, edge_id, run_id=None):
353363
local_pkg_data_dir = ServerConstants.get_data_dir()

python/fedml/computing/scheduler/model_scheduler/master_job_runner.py

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ def run_impl(
115115
message_center=self.message_center)
116116

117117
# start unified inference gateway process if not started
118-
FedMLDeployMasterJobRunner.start_device_inference_gateway(inference_port=inference_port)
118+
FedMLDeployMasterJobRunner.start_device_inference_gateway()
119119

120120
# start inference monitor process
121121
FedMLDeployMasterJobRunner.stop_device_inference_monitor(
@@ -144,7 +144,7 @@ def run_impl(
144144
# No device is added, updated or removed
145145
logging.info("No device is added, updated or removed. No action needed for reconciliation.")
146146
ip = GeneralConstants.get_ip_address(self.request_json)
147-
master_port = os.getenv("FEDML_MASTER_PORT", None)
147+
master_port = ServerConstants.get_inference_master_gateway_port()
148148
if master_port is not None:
149149
inference_port = int(master_port)
150150
model_inference_port = inference_port
@@ -299,9 +299,7 @@ def process_deployment_result_message(self, topic=None, payload=None):
299299
else:
300300
# This is the last worker that failed, so we should continue to "ABORTED" status
301301
model_config_parameters = self.request_json["parameters"]
302-
inference_port = model_config_parameters.get("server_internal_port",
303-
ServerConstants.MODEL_INFERENCE_DEFAULT_PORT)
304-
inference_port_external = model_config_parameters.get("server_external_port", inference_port)
302+
inference_port_external = ServerConstants.get_inference_master_gateway_port()
305303
ip = GeneralConstants.get_ip_address(self.request_json)
306304
if ip.startswith("http://") or ip.startswith("https://"):
307305
model_inference_url = "{}/inference/{}".format(ip, end_point_id)
@@ -369,12 +367,7 @@ def process_deployment_result_message(self, topic=None, payload=None):
369367
"""
370368
When all the devices have finished the add / delete / update operation
371369
"""
372-
# Generate one unified inference api
373-
# Note that here we use the gateway port instead of the inference port that is used by the slave device
374-
model_config_parameters = request_json["parameters"]
375-
inference_port = model_config_parameters.get("server_internal_port",
376-
ServerConstants.MODEL_INFERENCE_DEFAULT_PORT)
377-
inference_port_external = model_config_parameters.get("server_external_port", inference_port)
370+
inference_port_external = ServerConstants.get_inference_master_gateway_port()
378371
ip = GeneralConstants.get_ip_address(request_json)
379372

380373
if ip.startswith("http://") or ip.startswith("https://"):
@@ -457,12 +450,10 @@ def cleanup_runner_process(self, run_id):
457450
ServerConstants.cleanup_run_process(run_id, not_kill_subprocess=True)
458451

459452
@staticmethod
460-
def start_device_inference_gateway(inference_port=ServerConstants.MODEL_INFERENCE_DEFAULT_PORT):
453+
def start_device_inference_gateway():
461454
# start unified inference server
462455
python_program = get_python_program()
463-
master_port = os.getenv("FEDML_MASTER_PORT", None)
464-
if master_port is not None:
465-
inference_port = int(master_port)
456+
inference_port = ServerConstants.get_inference_master_gateway_port()
466457
if not ServerConstants.is_running_on_k8s():
467458
logging.info(f"start the model inference gateway...")
468459
inference_gw_cmd = "fedml.computing.scheduler.model_scheduler.device_model_inference:api"
@@ -543,7 +534,7 @@ def recover_inference_and_monitor():
543534
if not is_activated:
544535
continue
545536

546-
FedMLDeployMasterJobRunner.start_device_inference_gateway(inference_port=inference_port)
537+
FedMLDeployMasterJobRunner.start_device_inference_gateway()
547538

548539
FedMLDeployMasterJobRunner.stop_device_inference_monitor(
549540
run_id, end_point_name, model_id, model_name, model_version)
@@ -757,9 +748,7 @@ def parse_model_run_params(running_json):
757748
model_version = model_config["model_version"]
758749
model_config_parameters = running_json.get("parameters", {})
759750

760-
inference_port = model_config_parameters.get("server_internal_port", # Internal port is for the gateway
761-
ServerConstants.MODEL_INFERENCE_DEFAULT_PORT)
762-
inference_port_external = model_config_parameters.get("server_external_port", inference_port)
751+
inference_port = ServerConstants.get_inference_master_gateway_port()
763752

764753
return run_id, end_point_name, token, user_id, user_name, device_ids, device_objs, model_config, model_name, \
765754
model_id, model_storage_url, scale_min, scale_max, inference_engine, model_is_from_open, \

python/fedml/computing/scheduler/model_scheduler/worker_protocol_manager.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ def _init_extra_items(self):
7979

8080
client_api_cmd = "fedml.computing.scheduler.model_scheduler.device_client_api:api"
8181
client_api_pids = RunProcessUtils.get_pid_from_cmd_line(client_api_cmd)
82+
83+
worker_proxy_port = ClientConstants.get_inference_worker_proxy_port()
84+
8285
if client_api_pids is None or len(client_api_pids) <= 0:
8386
# Start local API services
8487
cur_dir = os.path.dirname(__file__)
@@ -88,7 +91,7 @@ def _init_extra_items(self):
8891
"{} -m uvicorn {} --host 0.0.0.0 --port {} --reload --reload-delay 3 --reload-dir {} "
8992
"--log-level critical".format(
9093
python_program, client_api_cmd,
91-
ClientConstants.LOCAL_CLIENT_API_PORT, fedml_base_dir
94+
worker_proxy_port, fedml_base_dir
9295
),
9396
should_capture_stdout=False,
9497
should_capture_stderr=False

0 commit comments

Comments (0)