Skip to content

Commit d7f2423

Browse files
authored
Merge pull request #2111 from FedML-AI/alexleung/dev_v070_for_refactor
Alexleung/dev v070 for refactor
2 parents 7c0e08c + 74a5634 commit d7f2423

6 files changed

Lines changed: 68 additions & 54 deletions

File tree

python/fedml/computing/scheduler/comm_utils/run_process_utils.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@ def get_run_process_prefix(prefix, run_id):
1414
return f"{prefix}-run@{run_id}@pid@"
1515

1616
@staticmethod
17-
def cleanup_run_process(run_id, data_dir, info_dir,
18-
info_file_prefix=SchedulerConstants.RUN_PROCESS_TYPE_RUNNER_PROCESS):
17+
def cleanup_run_process(
18+
run_id, data_dir, info_dir,
19+
info_file_prefix=SchedulerConstants.RUN_PROCESS_TYPE_RUNNER_PROCESS, not_kill_subprocess=False
20+
):
1921
try:
2022
local_pkg_data_dir = data_dir
2123
run_process_dir = os.path.join(local_pkg_data_dir, info_dir)
@@ -43,12 +45,13 @@ def cleanup_run_process(run_id, data_dir, info_dir,
4345

4446
try:
4547
process = psutil.Process(int(process_id))
46-
child_processes = process.children(recursive=True)
47-
for sub_process in child_processes:
48-
if platform.system() == 'Windows':
49-
os.system("taskkill /PID {} /T /F".format(sub_process.pid))
50-
else:
51-
os.kill(sub_process.pid, signal.SIGKILL)
48+
if not not_kill_subprocess:
49+
child_processes = process.children(recursive=True)
50+
for sub_process in child_processes:
51+
if platform.system() == 'Windows':
52+
os.system("taskkill /PID {} /T /F".format(sub_process.pid))
53+
else:
54+
os.kill(sub_process.pid, signal.SIGKILL)
5255

5356
if process is not None:
5457
if platform.system() == 'Windows':

python/fedml/computing/scheduler/master/base_master_job_runner.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,10 +111,13 @@ def run(
111111
if self.mlops_metrics is not None:
112112
self.mlops_metrics.stop_sys_perf()
113113
time.sleep(3)
114-
ServerConstants.cleanup_run_process(self.run_id)
114+
self.cleanup_runner_process(self.run_id)
115115
ServerConstants.cleanup_learning_process(self.run_id)
116116
ServerConstants.cleanup_bootstrap_process(self.run_id)
117117

118+
def cleanup_runner_process(self, run_id):
119+
ServerConstants.cleanup_run_process(run_id)
120+
118121
@debug
119122
@abstractmethod
120123
def run_impl(

python/fedml/computing/scheduler/master/server_constants.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -268,9 +268,10 @@ def get_dataset_metadata_url():
268268
return get_dataset_metadata_url
269269

270270
@staticmethod
271-
def cleanup_run_process(run_id):
271+
def cleanup_run_process(run_id, not_kill_subprocess=False):
272272
RunProcessUtils.cleanup_run_process(
273-
run_id, ServerConstants.get_data_dir(), ServerConstants.LOCAL_RUNNER_INFO_DIR_NAME)
273+
run_id, ServerConstants.get_data_dir(), ServerConstants.LOCAL_RUNNER_INFO_DIR_NAME,
274+
not_kill_subprocess=not_kill_subprocess)
274275

275276
@staticmethod
276277
def save_run_process(run_id, process_id):

python/fedml/computing/scheduler/model_scheduler/device_model_inference.py

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -26,35 +26,35 @@
2626
pass
2727

2828

29-
class Settings(BaseSettings):
30-
redis_addr: str
31-
redis_port: str
32-
redis_password: str
33-
end_point_name: str
34-
model_name: str
35-
model_version: str
36-
model_infer_url: str
37-
version: str
38-
use_mqtt_inference: bool
39-
use_worker_gateway: bool
40-
ext_info: str
41-
42-
43-
settings = Settings()
44-
45-
# class settings:
46-
# redis_addr = "127.0.0.1"
47-
# redis_port = 6379
48-
# redis_password = "fedml_default"
49-
# end_point_name = ""
50-
# model_name = ""
51-
# model_version = ""
52-
# model_infer_url = "127.0.0.1"
53-
# version = "dev"
54-
# use_mqtt_inference = False
55-
# use_worker_gateway = False
56-
# ext_info = "2b34303961245c4f175f2236282d7a272c040b0904747579087f6a760112030109010c215d54505707140005190a051c347f365c4a430c020a7d39120e26032a78730f797f7c031f0901657e75"
29+
# class Settings(BaseSettings):
30+
# redis_addr: str
31+
# redis_port: str
32+
# redis_password: str
33+
# end_point_name: str
34+
# model_name: str
35+
# model_version: str
36+
# model_infer_url: str
37+
# version: str
38+
# use_mqtt_inference: bool
39+
# use_worker_gateway: bool
40+
# ext_info: str
5741
#
42+
#
43+
# settings = Settings()
44+
45+
class settings:
46+
redis_addr = "127.0.0.1"
47+
redis_port = 6379
48+
redis_password = "fedml_default"
49+
end_point_name = ""
50+
model_name = ""
51+
model_version = ""
52+
model_infer_url = "127.0.0.1"
53+
version = "dev"
54+
use_mqtt_inference = False
55+
use_worker_gateway = False
56+
ext_info = "2b34303961245c4f175f2236282d7a272c040b0904747579087f6a760112030109010c215d54505707140005190a051c347f365c4a430c020a7d39120e26032a78730f797f7c031f0901657e75"
57+
5858

5959
api = FastAPI()
6060

python/fedml/computing/scheduler/model_scheduler/device_server_constants.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -295,9 +295,10 @@ def get_public_ip():
295295
return ip
296296

297297
@staticmethod
298-
def cleanup_run_process(run_id):
298+
def cleanup_run_process(run_id, not_kill_subprocess=False):
299299
RunProcessUtils.cleanup_run_process(
300-
run_id, ServerConstants.get_data_dir(), ServerConstants.LOCAL_RUNNER_INFO_DIR_NAME)
300+
run_id, ServerConstants.get_data_dir(), ServerConstants.LOCAL_RUNNER_INFO_DIR_NAME,
301+
not_kill_subprocess=not_kill_subprocess)
301302

302303
@staticmethod
303304
def save_run_process(run_id, process_id):

python/fedml/computing/scheduler/model_scheduler/master_job_runner.py

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525

2626

2727
class FedMLDeployMasterJobRunner(FedMLBaseMasterJobRunner, FedMLDeployJobRunnerMsgSender, ABC):
28-
2928
default_redis_addr = "local"
3029
default_redis_port = "6379"
3130
default_redis_password = "fedml_default"
@@ -54,7 +53,7 @@ def __init__(self, args, run_id=0, request_json=None, agent_config=None, edge_id
5453
self.deployment_result_queue = Queue()
5554

5655
# Override
57-
def _generate_job_runner_instance(self, args, run_id=None, request_json=None, agent_config=None, edge_id=None,):
56+
def _generate_job_runner_instance(self, args, run_id=None, request_json=None, agent_config=None, edge_id=None, ):
5857
return FedMLDeployMasterJobRunner(
5958
args, run_id=run_id, request_json=request_json, agent_config=self.agent_config, edge_id=edge_id
6059
)
@@ -65,10 +64,10 @@ def _generate_extend_queue_list(self):
6564

6665
# Override
6766
def run_impl(
68-
self, edge_id_status_queue, edge_device_info_queue, run_metrics_queue,
69-
run_event_queue, run_artifacts_queue, run_logs_queue, edge_device_info_global_queue,
70-
run_extend_queue_list=None, sender_message_queue=None, listener_message_queue=None,
71-
status_center_queue=None
67+
self, edge_id_status_queue, edge_device_info_queue, run_metrics_queue,
68+
run_event_queue, run_artifacts_queue, run_logs_queue, edge_device_info_global_queue,
69+
run_extend_queue_list=None, sender_message_queue=None, listener_message_queue=None,
70+
status_center_queue=None
7271
):
7372
# Parse the model parameters.
7473
run_id, end_point_name, token, user_id, user_name, device_ids, device_objs, model_config, model_name, \
@@ -114,13 +113,13 @@ def run_impl(
114113
message_center=self.message_center)
115114

116115
# start unified inference server
117-
self.start_device_inference_gateway(
116+
FedMLDeployMasterJobRunner.start_device_inference_gateway(
118117
inference_port=inference_port, agent_config=self.agent_config)
119118

120119
# start inference monitor server
121-
self.stop_device_inference_monitor(
120+
FedMLDeployMasterJobRunner.stop_device_inference_monitor(
122121
run_id, end_point_name, model_id, model_name, model_version)
123-
self.start_device_inference_monitor(
122+
FedMLDeployMasterJobRunner.start_device_inference_monitor(
124123
run_id, end_point_name, model_id, model_name, model_version)
125124

126125
# Changed the status to "IDLE"
@@ -331,7 +330,7 @@ def process_deployment_result_message(self, topic=None, payload=None):
331330
elif run_operation == "UPDATE":
332331
# Overwrite the json with the rollback version diff
333332
rollback_version_diff = self.replica_controller.rollback_get_replica_version_diff(
334-
device_id_trigger=device_id, replica_no_trigger=replica_no)
333+
device_id_trigger=device_id, replica_no_trigger=replica_no)
335334

336335
# Change the target version to the start version
337336
self.replica_controller.rollback_setback_target_replica_version()
@@ -461,6 +460,9 @@ def process_deployment_result_message(self, topic=None, payload=None):
461460
time.sleep(3)
462461
self.trigger_completed_event()
463462

463+
def cleanup_runner_process(self, run_id):
464+
ServerConstants.cleanup_run_process(run_id, not_kill_subprocess=True)
465+
464466
@staticmethod
465467
def start_device_inference_gateway(
466468
inference_port=ServerConstants.MODEL_INFERENCE_DEFAULT_PORT,
@@ -489,6 +491,7 @@ def start_device_inference_gateway(
489491
agent_config["mqtt_config"]["MQTT_USER"] + connect_str +
490492
agent_config["mqtt_config"]["MQTT_PWD"] + connect_str +
491493
str(agent_config["mqtt_config"]["MQTT_KEEPALIVE"]), "FEDML@9999GREAT")
494+
python_program = get_python_program()
492495
inference_gateway_process = ServerConstants.exec_console_with_script(
493496
"REDIS_ADDR=\"{}\" REDIS_PORT=\"{}\" REDIS_PASSWORD=\"{}\" "
494497
"END_POINT_NAME=\"{}\" "
@@ -503,6 +506,8 @@ def start_device_inference_gateway(
503506
should_capture_stdout=False, should_capture_stderr=False)
504507

505508
return inference_gateway_process
509+
else:
510+
return inference_gateway_pids[0]
506511

507512
return None
508513

@@ -546,8 +551,6 @@ def recover_inference_and_monitor():
546551
except Exception as e:
547552
pass
548553

549-
FedMLDeployMasterJobRunner.start_device_inference_gateway(agent_config=agent_config)
550-
551554
history_jobs = FedMLServerDataInterface.get_instance().get_history_jobs()
552555
for job in history_jobs.job_list:
553556
if job.running_json is None:
@@ -566,6 +569,9 @@ def recover_inference_and_monitor():
566569
if not is_activated:
567570
continue
568571

572+
FedMLDeployMasterJobRunner.start_device_inference_gateway(
573+
inference_port=inference_port, agent_config=agent_config)
574+
569575
FedMLDeployMasterJobRunner.stop_device_inference_monitor(
570576
run_id, end_point_name, model_id, model_name, model_version)
571577
FedMLDeployMasterJobRunner.start_device_inference_monitor(
@@ -778,7 +784,7 @@ def parse_model_run_params(running_json):
778784
model_version = model_config["model_version"]
779785
model_config_parameters = running_json.get("parameters", {})
780786

781-
inference_port = model_config_parameters.get("server_internal_port", # Internal port is for the gateway
787+
inference_port = model_config_parameters.get("server_internal_port", # Internal port is for the gateway
782788
ServerConstants.MODEL_INFERENCE_DEFAULT_PORT)
783789
inference_port_external = model_config_parameters.get("server_external_port", inference_port)
784790

0 commit comments

Comments (0)