
Commit e0ad9b5

[Deploy] Remove unnecessary logic; rename readiness check function; forbid user-level control of the host port.
1 parent 23d88fc commit e0ad9b5

1 file changed: python/fedml/computing/scheduler/model_scheduler/device_model_deployment.py

Lines changed: 40 additions & 110 deletions

@@ -68,6 +68,7 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version,
     num_gpus = gpu_per_replica
     gpu_ids, gpu_attach_cmd = None, ""
 
+    # Concatenate the model name
     running_model_name = ClientConstants.get_running_model_name(
         end_point_name, inference_model_name, model_version, end_point_id, model_id, edge_id=edge_id)
 
@@ -77,6 +78,7 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version,
         config = yaml.safe_load(file)
 
         # Resource related
+        inference_type = "default"
         use_gpu = config.get('use_gpu', True)
         num_gpus_frm_yml = config.get('num_gpus', None)
         if not use_gpu:
@@ -85,9 +87,7 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version,
         if num_gpus_frm_yml is not None:
             num_gpus = int(num_gpus_frm_yml)
         usr_indicated_wait_time = config.get('deploy_timeout', 900)
-        usr_indicated_worker_port = config.get('worker_port', "")
-        if usr_indicated_worker_port == "":
-            usr_indicated_worker_port = os.environ.get("FEDML_WORKER_PORT", "")
+        usr_indicated_retry_cnt = max(int(usr_indicated_wait_time) // 10, 1)
         shm_size = config.get('shm_size', None)
         storage_opt = config.get('storage_opt', None)
         tmpfs = config.get('tmpfs', None)
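
Note: with worker_port gone from both the YAML and the FEDML_WORKER_PORT environment variable, deploy_timeout is the only user-facing knob left here; the readiness loop retries roughly every 10 seconds, so the attempt budget is derived from it. A tiny self-contained sketch of that arithmetic, using the 900-second default shown above:

# Minimal sketch: how the retry budget is derived from deploy_timeout.
usr_indicated_wait_time = 900  # deploy_timeout default in fedml_model_config.yaml
usr_indicated_retry_cnt = max(int(usr_indicated_wait_time) // 10, 1)
print(usr_indicated_retry_cnt)  # -> 90 readiness attempts, spaced ~10 s apart
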
@@ -96,17 +96,6 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version,
             cpus = int(cpus)
         memory = config.get('memory', None)
 
-        if usr_indicated_worker_port == "":
-            usr_indicated_worker_port = None
-        else:
-            usr_indicated_worker_port = int(usr_indicated_worker_port)
-
-        worker_port_env = os.environ.get("FEDML_WORKER_PORT", "")
-        worker_port_from_config = config.get('worker_port', "")
-        logging.info(f"usr_indicated_worker_port {usr_indicated_worker_port}, worker port env {worker_port_env}, "
-                     f"worker port from config {worker_port_from_config}")
-
-        usr_indicated_retry_cnt = max(int(usr_indicated_wait_time) // 10, 1)
         inference_image_name = config.get('inference_image_name',
                                           ClientConstants.INFERENCE_SERVER_CUSTOME_IMAGE)
         image_pull_policy = config.get('image_pull_policy', SchedulerConstants.IMAGE_PULL_POLICY_IF_NOT_PRESENT)
@@ -144,25 +133,15 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version,
 
         # If using customized image, then bootstrap + job will be the entry point
         enable_custom_image = config.get("enable_custom_image", False)
+        # inference_type = "custom"
         customized_image_entry_cmd = \
             "/bin/bash /home/fedml/models_serving/fedml-deploy-bootstrap-entry-auto-gen.sh"
 
         docker_registry_user_name = config.get("docker_registry_user_name", "")
         docker_registry_user_password = config.get("docker_registry_user_password", "")
         docker_registry = config.get("docker_registry", "")
 
-        port_inside_container = int(config.get("port_inside_container", 2345))
-        use_triton = config.get("use_triton", False)
-        if use_triton:
-            inference_type = "triton"
-        else:
-            inference_type = "default"
-
-        # Config check
-        if src_code_dir == "":
-            raise Exception("Please indicate source_code_dir in the fedml_model_config.yaml")
-        if relative_entry == "":
-            logging.warning("You missed main_entry in the fedml_model_config.yaml")
+        port_inside_container = int(config.get("port", 2345))
 
     # Request the GPU ids for the deployment
     if num_gpus > 0:
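
Note: the container's internal port is now read from the port key of fedml_model_config.yaml (default 2345); the port_inside_container and use_triton keys and the src_code_dir/main_entry checks are no longer consulted here. A minimal sketch of the lookup, with a hypothetical YAML snippet standing in for the real config file:

import yaml

# Hypothetical config text; only the "port" key matters for this sketch.
config = yaml.safe_load("""
inference_image_name: my/custom-serving-image:latest
port: 8080
""")

# Falls back to 2345 when "port" is absent, matching the default in the diff.
port_inside_container = int(config.get("port", 2345))
print(port_inside_container)  # -> 8080
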
@@ -175,22 +154,10 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version,
             end_point_id, end_point_name, inference_model_name, edge_id, replica_rank+1, gpu_ids)
         logging.info("GPU ids allocated: {}".format(gpu_ids))
 
+    # Create the model serving dir if not exists
     model_serving_dir = ClientConstants.get_model_serving_dir()
     if not os.path.exists(model_serving_dir):
         os.makedirs(model_serving_dir, exist_ok=True)
-    converted_model_path = os.path.join(model_storage_local_path, ClientConstants.FEDML_CONVERTED_MODEL_DIR_NAME)
-    if os.path.exists(converted_model_path):
-        model_file_list = os.listdir(converted_model_path)
-        for model_file in model_file_list:
-            src_model_file = os.path.join(converted_model_path, model_file)
-            dst_model_file = os.path.join(model_serving_dir, model_file)
-            if os.path.isdir(src_model_file):
-                if not os.path.exists(dst_model_file):
-                    shutil.copytree(src_model_file, dst_model_file, copy_function=shutil.copy,
-                                    ignore_dangling_symlinks=True)
-            else:
-                if not os.path.exists(dst_model_file):
-                    shutil.copyfile(src_model_file, dst_model_file)
 
     if inference_engine != ClientConstants.INFERENCE_ENGINE_TYPE_INT_DEFAULT:
         raise Exception(f"inference engine {inference_engine} is not supported")
@@ -228,13 +195,12 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version,
     logging.info(f"Start pulling the inference image {inference_image_name}... with policy {image_pull_policy}")
     ContainerUtils.get_instance().pull_image_with_policy(image_pull_policy, inference_image_name)
 
-    volumns = []
+    volumes = []
     binds = {}
     environment = {}
 
     # data_cache_dir mounting
-    assert type(data_cache_dir_input) == dict or type(data_cache_dir_input) == str
-    if type(data_cache_dir_input) == str:
+    if isinstance(data_cache_dir_input, str):
         # In this case, we mount to the same folder, if it has ~, we replace it with /home/fedml
         src_data_cache_dir, dst_data_cache_dir = "", ""
         if data_cache_dir_input != "":
@@ -253,28 +219,30 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version,
         if type(src_data_cache_dir) == str and src_data_cache_dir != "":
             logging.info("Start copying the data cache to the container...")
             if os.path.exists(src_data_cache_dir):
-                volumns.append(src_data_cache_dir)
+                volumes.append(src_data_cache_dir)
                 binds[src_data_cache_dir] = {
                     "bind": dst_data_cache_dir,
                     "mode": "rw"
                 }
                 environment["DATA_CACHE_FOLDER"] = dst_data_cache_dir
-    else:
+    elif isinstance(data_cache_dir_input, dict):
         for k, v in data_cache_dir_input.items():
             if os.path.exists(k):
-                volumns.append(v)
+                volumes.append(v)
                 binds[k] = {
                     "bind": v,
                     "mode": "rw"
                 }
             else:
                 logging.warning(f"{k} does not exist, skip mounting it to the container")
-        logging.info(f"Data cache mount: {volumns}, {binds}")
+        logging.info(f"Data cache mount: {volumes}, {binds}")
+    else:
+        logging.warning("data_cache_dir_input is not a string or a dictionary, skip mounting it to the container")
 
     # Default mounting
     if not enable_custom_image or (enable_custom_image and relative_entry != ""):
         logging.info("Start copying the source code to the container...")
-        volumns.append(src_code_dir)
+        volumes.append(src_code_dir)
         binds[src_code_dir] = {
             "bind": dst_model_serving_dir,
             "mode": "rw"
@@ -284,7 +252,7 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version,
     host_config_dict = {
         "binds": binds,
         "port_bindings": {
-            port_inside_container: usr_indicated_worker_port
+            port_inside_container: None
        },
         "shm_size": shm_size,
         "storage_opt": storage_opt,
@@ -312,7 +280,6 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version,
     if not enable_custom_image:
         # For some image, the default user is root. Unified to fedml.
         environment["HOME"] = "/home/fedml"
-
     environment["BOOTSTRAP_DIR"] = dst_bootstrap_dir
     environment["FEDML_CURRENT_RUN_ID"] = end_point_id
     environment["FEDML_CURRENT_EDGE_ID"] = edge_id
@@ -326,12 +293,13 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version,
         for key in extra_envs:
             environment[key] = extra_envs[key]
 
+    # Create the container
     try:
         host_config = client.api.create_host_config(**host_config_dict)
         new_container = client.api.create_container(
             image=inference_image_name,
             name=default_server_container_name,
-            volumes=volumns,
+            volumes=volumes,
             ports=[port_inside_container],  # port open inside the container
             environment=environment,
             host_config=host_config,
@@ -349,22 +317,18 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version,
     while True:
         cnt += 1
         try:
-            if usr_indicated_worker_port is not None:
-                inference_http_port = usr_indicated_worker_port
-                break
-            else:
-                # Find the random port
-                port_info = client.api.port(new_container.get("Id"), port_inside_container)
-                inference_http_port = port_info[0]["HostPort"]
-                logging.info("inference_http_port: {}".format(inference_http_port))
-                break
+            # Find the random port
+            port_info = client.api.port(new_container.get("Id"), port_inside_container)
+            inference_http_port = port_info[0]["HostPort"]
+            logging.info("host port allocated: {}".format(inference_http_port))
+            break
         except:
             if cnt >= 5:
                 raise Exception("Failed to get the port allocation")
             time.sleep(3)
 
     # Logging the info from the container when starting
-    log_deployment_result(end_point_id, model_id, default_server_container_name,
+    log_deployment_output(end_point_id, model_id, default_server_container_name,
                           ClientConstants.CMD_TYPE_RUN_DEFAULT_SERVER,
                           inference_model_name, inference_engine, inference_http_port, inference_type,
                           retry_interval=10, deploy_attempt_threshold=usr_indicated_retry_cnt,
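
Note: because the host port is now chosen by Docker, it has to be read back after the container starts; that is what the retry loop above does via client.api.port. A small, self-contained sketch of the same idea (helper name, retry bounds, and defaults are illustrative):

import time

import docker


def wait_for_host_port(container_id, port_inside_container=2345, max_attempts=5, interval=3):
    """Poll Docker until the dynamically published host port becomes visible."""
    api = docker.from_env().api
    for _ in range(max_attempts):
        try:
            # Returns e.g. [{"HostIp": "0.0.0.0", "HostPort": "49153"}] once published.
            port_info = api.port(container_id, port_inside_container)
            if port_info:
                return port_info[0]["HostPort"]
        except Exception:
            pass  # container may not be fully registered yet
        time.sleep(interval)
    raise Exception("Failed to get the port allocation")
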
@@ -373,9 +337,8 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version,
 
     # Return the running model name and the inference output url
     inference_output_url, running_model_version, ret_model_metadata, ret_model_config = \
-        get_model_info(inference_model_name, inference_engine, inference_http_port,
-                       infer_host, False, inference_type, request_input_example=request_input_example,
-                       enable_custom_image=enable_custom_image)
+        check_container_readiness(inference_http_port=inference_http_port, infer_host=infer_host,
+                                  request_input_example=request_input_example)
 
     if inference_output_url == "":
         return running_model_name, "", None, None, None
@@ -426,9 +389,8 @@ def should_exit_logs(end_point_id, model_id, cmd_type, model_name, inference_eng
         # If the container has exited, return True, means we should exit the logs
         try:
             inference_output_url, model_version, model_metadata, model_config = \
-                get_model_info(model_name, inference_engine, inference_port, infer_host,
-                               inference_type=inference_type, request_input_example=request_input_example,
-                               enable_custom_image=enable_custom_image)
+                check_container_readiness(inference_http_port=inference_port, infer_host=infer_host,
+                                          request_input_example=request_input_example)
             if inference_output_url != "":
                 logging.info("Log test for deploying model successfully, inference url: {}, "
                              "model metadata: {}, model config: {}".
@@ -443,7 +405,7 @@ def should_exit_logs(end_point_id, model_id, cmd_type, model_name, inference_eng
         return False
 
 
-def log_deployment_result(end_point_id, model_id, cmd_container_name, cmd_type,
+def log_deployment_output(end_point_id, model_id, cmd_container_name, cmd_type,
                           inference_model_name, inference_engine,
                           inference_http_port, inference_type="default",
                           retry_interval=10, deploy_attempt_threshold=10,
@@ -542,10 +504,10 @@ def log_deployment_result(end_point_id, model_id, cmd_container_name, cmd_type,
         time.sleep(retry_interval)
 
 
-def is_client_inference_container_ready(infer_url_host, inference_http_port, inference_model_name, local_infer_url,
-                                        inference_type="default", model_version="", request_input_example=None):
+def is_client_inference_container_ready(infer_url_host, inference_http_port, readiness_check_type="default",
+                                        readiness_check_cmd=None, request_input_example=None):
 
-    if inference_type == "default":
+    if readiness_check_type == "default":
         default_client_container_ready_url = "http://{}:{}/ready".format("0.0.0.0", inference_http_port)
         response = None
         try:
@@ -555,59 +517,27 @@ def is_client_inference_container_ready(infer_url_host, inference_http_port, inf
         if not response or response.status_code != 200:
             return "", "", {}, {}
 
-        # Report the deployed model info
+        # Construct the model metadata (input and output)
         model_metadata = {}
         if request_input_example is not None and len(request_input_example) > 0:
             model_metadata["inputs"] = request_input_example
         else:
             model_metadata["inputs"] = {"text": "What is a good cure for hiccups?"}
         model_metadata["outputs"] = []
         model_metadata["type"] = "default"
+
         return "http://{}:{}/predict".format(infer_url_host, inference_http_port), None, model_metadata, None
     else:
-        triton_server_url = "{}:{}".format(infer_url_host, inference_http_port)
-        if model_version == "" or model_version is None:
-            model_version = ClientConstants.INFERENCE_MODEL_VERSION
-        logging.info(
-            f"triton_server_url: {triton_server_url} model_version: {model_version} model_name: {inference_model_name}")
-        triton_client = http_client.InferenceServerClient(url=triton_server_url, verbose=False)
-        if not triton_client.is_model_ready(
-            model_name=inference_model_name, model_version=model_version
-        ):
-            return "", model_version, {}, {}
-        logging.info(f"Model {inference_model_name} is ready, start to get model metadata...")
-        model_metadata = triton_client.get_model_metadata(model_name=inference_model_name, model_version=model_version)
-        model_config = triton_client.get_model_config(model_name=inference_model_name, model_version=model_version)
-        version_list = model_metadata.get("versions", None)
-        if version_list is not None and len(version_list) > 0:
-            model_version = version_list[0]
-        else:
-            model_version = ClientConstants.INFERENCE_MODEL_VERSION
-
-        inference_output_url = "http://{}:{}/{}/models/{}/versions/{}/infer".format(infer_url_host,
-                                                                                    inference_http_port,
-                                                                                    ClientConstants.INFERENCE_INFERENCE_SERVER_VERSION,
-                                                                                    inference_model_name,
-                                                                                    model_version)
-
-        return inference_output_url, model_version, model_metadata, model_config
-
-
-def get_model_info(model_name, inference_engine, inference_http_port, infer_host="127.0.0.1", is_hg_model=False,
-                   inference_type="default", request_input_example=None, enable_custom_image=False):
-    if model_name is None:
+        # TODO(Raphael): Support arbitrary readiness check command
+        logging.error(f"Unknown readiness check type: {readiness_check_type}")
         return "", "", {}, {}
 
-    local_infer_url = "{}:{}".format(infer_host, inference_http_port)
-
-    if is_hg_model:
-        inference_model_name = "{}_{}_inference".format(model_name, str(inference_engine))
-    else:
-        inference_model_name = model_name
 
+def check_container_readiness(inference_http_port, infer_host="127.0.0.1", request_input_example=None,
+                              readiness_check_type="default", readiness_check_cmd=None):
     response_from_client_container = is_client_inference_container_ready(
-        infer_host, inference_http_port, inference_model_name, local_infer_url,
-        inference_type, model_version="", request_input_example=request_input_example)
+        infer_host, inference_http_port, readiness_check_type, readiness_check_cmd,
+        request_input_example=request_input_example)
 
     return response_from_client_container
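
Note: check_container_readiness is a thin wrapper around is_client_inference_container_ready, whose default path just polls the container's /ready endpoint and, on a 200 response, reports a /predict URL plus minimal metadata. A self-contained sketch of that behavior (a hypothetical stand-in, not the repository function; the 0.0.0.0 probe address and example input mirror the code above):

import requests


def default_readiness_probe(infer_host, inference_http_port, request_input_example=None, timeout=3):
    """Poll /ready on the container; on success, return a /predict URL and minimal metadata."""
    ready_url = f"http://0.0.0.0:{inference_http_port}/ready"
    try:
        response = requests.get(ready_url, timeout=timeout)
    except requests.RequestException:
        return "", "", {}, {}
    if response.status_code != 200:
        return "", "", {}, {}

    # Construct the model metadata (input and output), mirroring the default branch above.
    model_metadata = {
        "inputs": request_input_example or {"text": "What is a good cure for hiccups?"},
        "outputs": [],
        "type": "default",
    }
    return f"http://{infer_host}:{inference_http_port}/predict", None, model_metadata, None
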
