|
| 1 | + |
| 2 | +from multiprocessing import Process |
| 3 | +from ..comm_utils import sys_utils |
| 4 | +from ..comm_utils.job_cleanup import JobCleanup |
| 5 | +from ....core.mlops import MLOpsRuntimeLog, MLOpsMetrics |
| 6 | +from ..scheduler_core.master_api_daemon import MasterApiDaemon |
| 7 | +from ..scheduler_core.account_manager import FedMLAccountManager |
| 8 | +from ..scheduler_core.general_constants import GeneralConstants |
| 9 | +from abc import ABC, abstractmethod |
| 10 | + |
| 11 | + |
class FedMLBaseMasterAgent(ABC):
    """Abstract base class for a master agent.

    The agent logs in through ``FedMLAccountManager``, builds and configures a
    protocol manager, initializes it, and launches a ``MasterApiDaemon`` in a
    child process.

    Subclasses must implement ``_get_log_file_dir``, ``_save_agent_info``,
    ``_init_database`` and ``_generate_protocol_manager_instance``.
    """

    def __init__(self):
        # Login result returned by FedMLAccountManager; set in login().
        self.agent_args = None
        # MasterApiDaemon instance and the child process running it; both are
        # created in _initialize_protocol_manager().
        self.master_api_daemon = None
        self.master_api_process = None
        # Used to report the server status to MLOps.
        self.mlops_metrics = MLOpsMetrics()
        # Not assigned anywhere else in this class.
        self.status_reporter = None
        # Both flags are copied onto the protocol manager when it is created.
        self.enable_simulation_cloud_agent = True
        self.use_local_process_as_cloud_server = False
        # Created lazily by _create_protocol_manager().
        self.protocol_mgr = None

    def login(
            self, user_id, api_key=None, device_id=None,
            os_name=None, role=None
    ):
        """Log in the account, then set up and start the protocol manager.

        Args:
            user_id: Forwarded to ``FedMLAccountManager.login``.
            api_key: Forwarded to ``FedMLAccountManager.login``.
            device_id: Forwarded to ``FedMLAccountManager.login``.
            os_name: Forwarded to ``FedMLAccountManager.login``.
            role: Forwarded to ``FedMLAccountManager.login`` and also used to
                select the run mode flags of the protocol manager.

        Returns:
            Always ``None``: either early, when the account login yields no
            result, or implicitly after the protocol manager was started.

        Raises:
            Exception: Whatever ``_initialize_protocol_manager`` raises is
                re-raised after the login-failed file has been written and
                the protocol manager has been stopped.
        """
        # Login account
        login_result = FedMLAccountManager.get_instance().login(
            user_id, api_key=api_key, device_id=device_id,
            os_name=os_name, role=role
        )
        if login_result is None:
            return None
        self.agent_args = login_result

        # Save the bound info
        self._save_agent_info(
            login_result.current_device_id + "." + login_result.os_name,
            login_result.edge_id)

        # Init the logs for protocol manager
        self._init_logs(login_result, login_result.edge_id)

        # Create the protocol manager to communicate with the slave agents
        # and MLOps.
        self._create_protocol_manager(role, login_result)

        # Initialize the protocol manager
        # noinspection PyBroadException
        try:
            self._initialize_protocol_manager()
        except Exception:
            FedMLAccountManager.write_login_failed_file(is_client=False)
            self.protocol_mgr.stop()
            # Bare raise re-raises the active exception unchanged.
            raise

        # Start the protocol manager to process the messages from MLOps and
        # slave agents.
        self.protocol_mgr.start()

    @staticmethod
    def logout():
        """Clean up the master run process and all server API processes."""
        GeneralConstants.cleanup_run_process(None, is_master=True)
        sys_utils.cleanup_all_fedml_server_api_processes()

    def _create_protocol_manager(self, role, login_result):
        """Create and configure the protocol manager once.

        Does nothing when ``self.protocol_mgr`` is already set.

        Args:
            role: Compared against the ``FedMLAccountManager.ROLE_*``
                constants to set the run mode flags.
            login_result: Login result whose fields are copied onto the
                protocol manager.
        """
        if self.protocol_mgr is not None:
            return

        self.protocol_mgr = self._generate_protocol_manager_instance(
            login_result, agent_config=login_result.agent_config)
        manager = self.protocol_mgr

        # Run mode flags; each comparison already evaluates to a bool.
        manager.run_as_edge_server_and_agent = \
            role == FedMLAccountManager.ROLE_EDGE_SERVER
        manager.run_as_cloud_agent = \
            role == FedMLAccountManager.ROLE_CLOUD_AGENT
        manager.run_as_cloud_server = \
            role == FedMLAccountManager.ROLE_CLOUD_SERVER

        # Identity and configuration taken from the login result.
        manager.args = login_result
        manager.edge_id = login_result.edge_id
        manager.unique_device_id = login_result.unique_device_id
        manager.user_name = login_result.user_name
        manager.agent_config = login_result.agent_config

        # Agent-level switches.
        manager.enable_simulation_cloud_agent = \
            self.enable_simulation_cloud_agent
        manager.use_local_process_as_cloud_server = \
            self.use_local_process_as_cloud_server

    def _initialize_protocol_manager(self):
        """Initialize the database and protocol, then start the API daemon.

        Requires ``self.agent_args`` and ``self.protocol_mgr`` to be set,
        which ``login`` does before calling this method.
        """
        # Init local database
        self._init_database()

        # Initialize the master protocol
        self.protocol_mgr.initialize()

        # Report the IDLE status to MLOps
        self.mlops_metrics.report_server_training_status(
            None, GeneralConstants.MSG_MLOPS_SERVER_STATUS_IDLE,
            edge_id=self.agent_args.edge_id)

        # Cleanup data when startup
        JobCleanup.get_instance().sync_data_on_startup(
            self.agent_args.edge_id, is_client=False)

        # Start the API server on master agent in a separate process
        self.master_api_daemon = MasterApiDaemon()
        self.master_api_process = Process(target=self.master_api_daemon.run)
        self.master_api_process.start()

    def _init_logs(self, agent_args, edge_id):
        """Initialize the runtime logs.

        Note:
            ``agent_args`` is modified in place: the logging related fields
            below, including ``role``, are overwritten on the caller's object.

        Args:
            agent_args: Arguments object that receives the logging fields and
                is passed to ``MLOpsRuntimeLog.get_instance``.
            edge_id: Stored as both ``edge_id`` and ``server_agent_id``.
        """
        agent_args.log_file_dir = self._get_log_file_dir()
        agent_args.run_id = 0
        agent_args.role = "server"
        agent_args.edge_id = edge_id
        agent_args.using_mlops = True
        agent_args.server_agent_id = edge_id
        MLOpsRuntimeLog.get_instance(agent_args).init_logs()

    @abstractmethod
    def _get_log_file_dir(self):
        """Return the value stored as ``log_file_dir`` by ``_init_logs``."""
        pass

    @abstractmethod
    def _save_agent_info(self, unique_device_id, edge_id):
        """Persist the bound agent info.

        Args:
            unique_device_id: ``login`` passes the string
                ``"<current_device_id>.<os_name>"``.
            edge_id: Edge id taken from the login result.
        """
        pass

    @abstractmethod
    def _init_database(self):
        """Initialize the local database before the protocol starts."""
        pass

    @abstractmethod
    def _generate_protocol_manager_instance(self, args, agent_config=None):
        """Build the protocol manager used by this agent.

        Args:
            args: The login result.
            agent_config: The ``agent_config`` of the login result.

        Returns:
            The protocol manager instance. This base implementation returns
            ``None``.
        """
        return None
0 commit comments