
Commit 41e62ac

Merge pull request #2115 from FedML-AI/alexleung/dev_v070_for_refactor
Alexleung/dev v070 for refactor
2 parents d7f2423 + 5354706, commit 41e62ac

4 files changed: 102 additions and 100 deletions


python/fedml/computing/scheduler/comm_utils/constants.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -83,7 +83,7 @@ class SchedulerConstants:
     TRAIN_PROVISIONING_TIMEOUT = 60 * 25
     TRAIN_STARTING_TIMEOUT = 60 * 15
     TRAIN_STOPPING_TIMEOUT = 60 * 5
-    TRAIN_RUNNING_TIMEOUT = 60 * 60 * 12
+    TRAIN_RUNNING_TIMEOUT = 60 * 60 * 24 * 2000
     TRAIN_INIT_TIMEOUT = 60 * 5

     PUBLIC_REDIS_PORT = 6379
```
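
Note: this raises TRAIN_RUNNING_TIMEOUT from 12 hours (60 * 60 * 12 seconds) to 60 * 60 * 24 * 2000 seconds, roughly 5.5 years, which effectively disables the running-run timeout. Below is a minimal sketch of the behavioral difference, assuming the constant feeds a periodic elapsed-time check; the actual consumer inside the scheduler may differ.

```python
import time

TRAIN_RUNNING_TIMEOUT_OLD = 60 * 60 * 12          # 12 hours (previous value)
TRAIN_RUNNING_TIMEOUT_NEW = 60 * 60 * 24 * 2000   # 2000 days: effectively unlimited

def is_run_timed_out(started_at, timeout, now=None):
    """Return True when a run has been RUNNING longer than `timeout` seconds."""
    now = time.time() if now is None else now
    return (now - started_at) > timeout

# A run that has already been going for 13 hours:
started = time.time() - 13 * 60 * 60
print(is_run_timed_out(started, TRAIN_RUNNING_TIMEOUT_OLD))  # True: old value would trip
print(is_run_timed_out(started, TRAIN_RUNNING_TIMEOUT_NEW))  # False: new value never trips in practice
```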

python/fedml/computing/scheduler/comm_utils/job_monitor.py

Lines changed: 12 additions & 11 deletions
```diff
@@ -354,15 +354,15 @@ def monitor_slave_run_process_status(self):
                 # Check if all processes of the specific run are exited
                 # FIXME: Proactively release the gpu ids when the run processes have not even started yet as the docker
                 # image is being pulled
-                run_process_list = client_constants.ClientConstants.get_learning_process_list(job.job_id)
-                all_run_processes_exited = True if len(run_process_list) <= 0 else False
-                if all_run_processes_exited:
-                    if not self.released_runs.get(str(job.job_id), False):
-                        self.released_runs[str(job.job_id)] = True
-                        # Release the gpu ids
-                        print(
-                            f"[run/device][{job.job_id}/{job.edge_id}] Release gpu resource when run processes has exited on monioring slave runs periodically.")
-                        JobRunnerUtils.get_instance().release_gpu_ids(job.job_id, job.edge_id)
+                # run_process_list = client_constants.ClientConstants.get_learning_process_list(job.job_id)
+                # all_run_processes_exited = True if len(run_process_list) <= 0 else False
+                # if all_run_processes_exited:
+                #     if not self.released_runs.get(str(job.job_id), False):
+                #         self.released_runs[str(job.job_id)] = True
+                #         # Release the gpu ids
+                #         print(
+                #             f"[run/device][{job.job_id}/{job.edge_id}] Release gpu resource when run processes has exited on monioring slave runs periodically.")
+                #         JobRunnerUtils.get_instance().release_gpu_ids(job.job_id, job.edge_id)

                 # Get the timeout threshold
                 timeout_threshold = None
@@ -381,8 +381,9 @@ def monitor_slave_run_process_status(self):

                 # If the run processes have exited but run status is not completed and
                 # timeout is out of the range, then release gpu ids and report failed status to the master agent.
-                if all_run_processes_exited and not SchedulerConstants.is_run_completed(job.status) and \
-                        timeout_threshold is not None and timeout > timeout_threshold:
+                # if all_run_processes_exited and not SchedulerConstants.is_run_completed(job.status) and \
+                #         timeout_threshold is not None and timeout > timeout_threshold:
+                if timeout_threshold is not None and timeout > timeout_threshold:
                     # Report failed status to the master agent
                     mlops.log_training_failed_status(
                         run_id=job.job_id, edge_id=job.edge_id, enable_broadcast=True)
```
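
The commented-out block removes the proactive GPU release that fired as soon as a run's learning processes disappeared, and the failure check no longer depends on all_run_processes_exited or is_run_completed: a run is now reported failed on elapsed time alone. A condensed sketch of the new condition follows, assuming `timeout` is the seconds elapsed since the job's last recorded activity; `report_failed` is a hypothetical stand-in for mlops.log_training_failed_status.

```python
from typing import Optional

def should_report_failed(timeout: float, timeout_threshold: Optional[float]) -> bool:
    # Before this commit the check also required that all run processes had
    # exited and that the run was not already completed; now only elapsed
    # time is considered.
    return timeout_threshold is not None and timeout > timeout_threshold

assert should_report_failed(timeout=100.0, timeout_threshold=60.0)
assert not should_report_failed(timeout=100.0, timeout_threshold=None)
```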

python/fedml/computing/scheduler/scheduler_core/status_center.py

Lines changed: 11 additions & 65 deletions
```diff
@@ -1,8 +1,6 @@
 import logging
 import time

-from ..slave.client_constants import ClientConstants
-from ..master.server_constants import ServerConstants
 from enum import Enum, unique
 import multiprocessing
 from multiprocessing import Process, Queue
@@ -11,7 +9,6 @@
 from .message_center import FedMLMessageCenter
 import traceback
 from .status_manager_protocols import FedMLStatusManager
-from .compute_cache_manager import ComputeCacheManager


 @unique
@@ -87,11 +84,6 @@ class FedMLStatusCenter(object):

     def __init__(self, message_queue=None):
         self.status_queue = message_queue
-        self.job_status_in_slave = dict()
-        self.entire_job_status = None
-        self.job_status_in_master = dict()
-        self.slave_devices_status = dict()
-        self.master_devices_status = dict()
         self.status_center_process = None
         self.status_event = None
         self.status_sender_message_center_queue = None
@@ -108,50 +100,6 @@ def __repr__(self):
             attrs=" ".join("{}={!r}".format(k, v) for k, v in self.__dict__.items()),
         )

-    def add_job_status_in_slave(self, device_id, status):
-        self.job_status_in_slave[device_id] = self._status_transition(status)
-
-    def add_job_status_in_master(self, device_id, status):
-        self.job_status_in_master[device_id] = self._status_transition(status)
-
-    def set_entire_job_status(self, status):
-        self.entire_job_status = status
-
-    def add_slave_device_status(self, device_id, status):
-        self.slave_devices_status[device_id] = self._status_transition(status)
-
-    def add_master_device_status(self, device_id, status):
-        self.master_devices_status[device_id] = self._status_transition(status)
-
-    def get_job_status_in_slave(self, device_id):
-        return self.job_status_in_slave.get(device_id, None)
-
-    def get_job_status_in_master(self, device_id):
-        return self.job_status_in_master.get(device_id, None)
-
-    def get_entire_job_status(self):
-        return self.entire_job_status
-
-    def get_slave_device_status(self, device_id):
-        return self.slave_devices_status.get(device_id, None)
-
-    def get_master_device_status(self, device_id):
-        return self.master_devices_status.get(device_id, None)
-
-    def _status_transition(self, status):
-        transition_status = status
-        if self.entire_job_status is not None:
-            if self.entire_job_status == ServerConstants.MSG_MLOPS_SERVER_STATUS_FAILED or \
-                    self.entire_job_status == ServerConstants.MSG_MLOPS_SERVER_STATUS_FINISHED:
-                if status == ClientConstants.MSG_MLOPS_CLIENT_STATUS_FAILED or \
-                        status == ClientConstants.MSG_MLOPS_CLIENT_STATUS_FINISHED or \
-                        status == ClientConstants.MSG_MLOPS_CLIENT_STATUS_KILLED:
-                    transition_status = status
-                else:
-                    transition_status = ClientConstants.MSG_MLOPS_CLIENT_STATUS_KILLED
-
-        return transition_status
-
     def get_status_runner(self):
         return None

@@ -205,16 +153,6 @@ def rebuild_message_center(self, message_center_queue):
     def rebuild_status_center(self, status_queue):
         pass

-    @staticmethod
-    def save_job_status(run_id, status):
-        ComputeCacheManager.get_instance().set_redis_params()
-        ComputeCacheManager.get_instance().get_status_cache().save_job_status(run_id, status)
-
-    @staticmethod
-    def save_device_status_in_job(run_id, device_id, status):
-        ComputeCacheManager.get_instance().set_redis_params()
-        ComputeCacheManager.get_instance().get_status_cache().save_device_status_in_job(run_id, device_id, status)
-
     def run_status_dispatcher(self, status_event, status_queue,
                               sender_message_center_queue,
                               listener_message_center_queue):
@@ -272,23 +210,31 @@ def run_status_dispatcher(self, status_event, status_queue,
                 else:
                     status_manager_instances[status_entity.run_id].edge_id = status_entity.edge_id

+                # if the job status is completed then continue
+                if status_manager_instances[status_entity.run_id].is_job_completed():
+                    continue
+
                 # Process the master and slave status.
                 if message_entity.topic.startswith(FedMLStatusCenter.TOPIC_MASTER_STATUS_PREFIX):
                     # Process the job status
                     status_manager_instances[status_entity.run_id].status_center_process_master_status(
                         message_entity.topic, message_entity.payload)

                     # Save the job status
-                    FedMLStatusCenter.save_job_status(status_entity.run_id, self.get_entire_job_status())
+                    status_manager_instances[status_entity.run_id].save_job_status()
+
+                    # Popup the status manager instance when the job status is completed
+                    if status_manager_instances[status_entity.run_id].is_job_completed():
+                        status_manager_instances.pop(status_entity.run_id)
+                        continue

                 elif message_entity.topic.startswith(FedMLStatusCenter.TOPIC_SLAVE_STATUS_PREFIX):
                     # Process the slave device status
                     status_manager_instances[status_entity.run_id].status_center_process_slave_status(
                         message_entity.topic, message_entity.payload)

                     # Save the device status in job
-                    FedMLStatusCenter.save_device_status_in_job(status_entity.run_id, status_entity.edge_id,
-                                                                self.get_job_status_in_slave(status_entity.edge_id))
+                    status_manager_instances[status_entity.run_id].save_device_status_in_job(status_entity.edge_id)

             except Exception as e:
                 if message_entity is not None:
```
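
Net effect of this file's changes: per-run status bookkeeping (entire_job_status, the per-device status dicts, _status_transition, and the Redis save helpers) leaves the FedMLStatusCenter singleton, and the dispatcher instead delegates to per-run status manager instances, skipping messages for completed jobs and popping an instance once its job reaches a terminal status. A minimal sketch of that lifecycle pattern, using illustrative stand-in names rather than the real FedML classes:

```python
COMPLETED = {"FAILED", "FINISHED", "KILLED"}

class RunStatusManager:
    """Illustrative stand-in for FedMLStatusManager; not the real API."""
    def __init__(self, run_id):
        self.run_id = run_id
        self.entire_job_status = None

    def set_entire_job_status(self, status):
        self.entire_job_status = status

    def is_job_completed(self):
        return self.entire_job_status in COMPLETED

def dispatch(status_messages):
    status_manager_instances = {}
    for run_id, status in status_messages:
        manager = status_manager_instances.setdefault(run_id, RunStatusManager(run_id))
        if manager.is_job_completed():
            continue  # skip messages for runs that already reached a terminal status
        manager.set_entire_job_status(status)
        if manager.is_job_completed():
            status_manager_instances.pop(run_id)  # drop per-run state once the job ends
    return status_manager_instances

print(dispatch([("run-1", "RUNNING"), ("run-1", "FINISHED")]))  # {} -- manager popped at FINISHED
```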

python/fedml/computing/scheduler/scheduler_core/status_manager_protocols.py

Lines changed: 78 additions & 23 deletions
```diff
@@ -12,6 +12,7 @@
 from ..master.server_data_interface import FedMLServerDataInterface
 from .message_common import LogArgs
 from .general_constants import GeneralConstants
+from ..scheduler_core.compute_cache_manager import ComputeCacheManager


 class FedMLStatusManager(object):
@@ -33,13 +34,81 @@ def __init__(self, run_id=None, edge_id=None, server_id=None,
         self.log_args = LogArgs(role="server", edge_id=self.edge_id,
                                 server_id=self.server_id, log_file_dir=ServerConstants.get_log_file_dir())

+        self.job_status_in_slave = dict()
+        self.entire_job_status = None
+        self.job_status_in_master = dict()
+        self.slave_devices_status = dict()
+        self.master_devices_status = dict()
+        self.completed_job_status_list = [ServerConstants.MSG_MLOPS_SERVER_STATUS_FAILED,
+                                          ServerConstants.MSG_MLOPS_SERVER_STATUS_FINISHED,
+                                          ServerConstants.MSG_MLOPS_SERVER_STATUS_KILLED]
+
     def __repr__(self):
         return "<{klass} @{id:x} {attrs}>".format(
             klass=self.__class__.__name__,
             id=id(self) & 0xFFFFFF,
             attrs=" ".join("{}={!r}".format(k, v) for k, v in self.__dict__.items()),
         )

+    def add_job_status_in_slave(self, device_id, status):
+        self.job_status_in_slave[device_id] = self._status_transition(status)
+
+    def add_job_status_in_master(self, device_id, status):
+        self.job_status_in_master[device_id] = self._status_transition(status)
+
+    def set_entire_job_status(self, status):
+        self.entire_job_status = status
+
+    def add_slave_device_status(self, device_id, status):
+        self.slave_devices_status[device_id] = self._status_transition(status)
+
+    def add_master_device_status(self, run_id, device_id, status):
+        self.master_devices_status[device_id] = self._status_transition(status)
+
+    def get_job_status_in_slave(self, device_id):
+        return self.job_status_in_slave.get(device_id, None)
+
+    def get_job_status_in_master(self, device_id):
+        return self.job_status_in_master.get(device_id, None)
+
+    def get_entire_job_status(self):
+        return self.entire_job_status
+
+    def get_slave_device_status(self, device_id):
+        return self.slave_devices_status.get(device_id, None)
+
+    def get_master_device_status(self, device_id):
+        return self.master_devices_status.get(device_id, None)
+
+    def is_job_completed(self):
+        if self.entire_job_status and self.entire_job_status in self.completed_job_status_list:
+            return True
+        return False
+
+    def _status_transition(self, status):
+        transition_status = status
+        if self.entire_job_status is not None:
+            if self.entire_job_status == ServerConstants.MSG_MLOPS_SERVER_STATUS_FAILED or \
+                    self.entire_job_status == ServerConstants.MSG_MLOPS_SERVER_STATUS_FINISHED:
+                if status == ClientConstants.MSG_MLOPS_CLIENT_STATUS_FAILED or \
+                        status == ClientConstants.MSG_MLOPS_CLIENT_STATUS_FINISHED or \
+                        status == ClientConstants.MSG_MLOPS_CLIENT_STATUS_KILLED:
+                    transition_status = status
+                else:
+                    transition_status = ClientConstants.MSG_MLOPS_CLIENT_STATUS_KILLED
+
+        return transition_status
+
+    def save_job_status(self):
+        ComputeCacheManager.get_instance().set_redis_params()
+        ComputeCacheManager.get_instance().get_status_cache().save_job_status(
+            self.run_id, self.get_entire_job_status())
+
+    def save_device_status_in_job(self, device_id):
+        ComputeCacheManager.get_instance().set_redis_params()
+        ComputeCacheManager.get_instance().get_status_cache().save_device_status_in_job(
+            self.run_id, device_id, self.get_job_status_in_slave(device_id))
+
     def process_job_completed_status(self, master_id, status):
         # Stop the system performance monitor
         try:
@@ -75,10 +144,8 @@ def process_job_completed_status(self, master_id, status):
             self.report_deployment_status(self.run_id, GeneralConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_FAILED)

     def process_job_exception_status(self, master_id, status):
-        # Send the exception status to slave devices.
-        self.report_exception_status(
-            self.edge_id_list, run_id=self.run_id, server_id=master_id,
-            status=ClientConstants.MSG_MLOPS_CLIENT_STATUS_FAILED)
+        # Report exception job status
+        self.report_exception_status(status)

         # Save the job status to local storage
         FedMLServerDataInterface.get_instance().save_job_status(self.run_id, master_id, status, status)
@@ -113,9 +180,9 @@ def status_center_process_master_status(self, topic, payload):

     def process_job_status_consensus(self, run_id, master_id, status):
         # Set the master status in the job and entire job status
-        self.status_center.set_entire_job_status(status)
-        self.status_center.add_job_status_in_master(master_id, status)
-        status = self.status_center.get_entire_job_status()
+        self.set_entire_job_status(status)
+        self.add_job_status_in_master(master_id, status)
+        status = self.get_entire_job_status()

         # Set the device status based on the job status
         edge_id_status_dict = self.client_agent_active_list.get(f"{run_id}", {})
@@ -152,8 +219,8 @@ def get_device_consensus_status_in_job(job_status, device_status):
         return None

     def get_device_consensus_status_in_current_device(self, edge_id, status):
-        self.status_center.add_job_status_in_slave(edge_id, status)
-        consensus_status = self.status_center.get_job_status_in_slave(edge_id)
+        self.add_job_status_in_slave(edge_id, status)
+        consensus_status = self.get_job_status_in_slave(edge_id)
         consensus_status = ClientConstants.MSG_MLOPS_CLIENT_STATUS_FAILED \
             if consensus_status == ClientConstants.MSG_MLOPS_CLIENT_STATUS_EXCEPTION else consensus_status
         return consensus_status
@@ -275,25 +342,13 @@ def report_server_status(self, run_id, edge_id, server_id, status):
         self.status_reporter.report_server_id_status(
             run_id, status, edge_id=edge_id, server_id=server_id, server_agent_id=edge_id, update_db=False)

-    def report_exception_status(
-            self, edge_id_list, run_id=0, server_id=None, status=None, payload=None):
-        if payload is None:
-            payload_obj = {"runId": run_id, "edgeids": edge_id_list}
-            if server_id is not None:
-                payload_obj["serverId"] = server_id
-        else:
-            payload_obj = json.loads(payload)
-        payload_obj["run_status"] = ClientConstants.MSG_MLOPS_CLIENT_STATUS_EXCEPTION if status is None else status
-        topic_exception = "flserver_agent/" + str(self.edge_id) + "/stop_train"
-        self.message_reporter.send_message(topic_exception, json.dumps(payload_obj))
+    def report_exception_status(self, status):
+        self.status_reporter.report_job_status(self.run_id, status)

     def status_center_process_slave_status_to_master_in_slave_agent(self, topic, payload):
         # Forward the status message to the sender queue of message center.
         self.message_center.send_message(topic, payload)

-        # Post the status message to the listener queue of message center
-        #self.message_center.receive_message(GeneralConstants.MSG_TOPIC_REPORT_DEVICE_STATUS_IN_JOB, payload)
-
     def status_center_process_slave_status_to_mlops_in_slave_agent(self, topic, payload):
         # Forward the status message to message center.
         self.message_center.send_message(topic, payload)
```
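
This file receives the state and helpers removed from status_center.py: FedMLStatusManager now owns the per-run status dicts, the terminal-status list behind is_job_completed, the _status_transition rule, and instance-level save_job_status / save_device_status_in_job wrappers around ComputeCacheManager, while report_exception_status shrinks to a single report_job_status call. The transition rule can be restated compactly; the sketch below uses plain strings in place of the ServerConstants/ClientConstants values.

```python
# Once the entire job is FAILED or FINISHED, any per-device status that is not
# already terminal is coerced to KILLED; terminal device statuses pass through.
JOB_TERMINAL = {"FAILED", "FINISHED"}
DEVICE_TERMINAL = {"FAILED", "FINISHED", "KILLED"}

def status_transition(entire_job_status, device_status):
    if entire_job_status in JOB_TERMINAL and device_status not in DEVICE_TERMINAL:
        return "KILLED"
    return device_status

assert status_transition("FINISHED", "RUNNING") == "KILLED"   # non-terminal coerced
assert status_transition("FINISHED", "FAILED") == "FAILED"    # terminal kept as-is
assert status_transition(None, "RUNNING") == "RUNNING"        # job still active
```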
