|
2 | 2 | import logging |
3 | 3 |
|
4 | 4 | from collections import namedtuple |
5 | | -from fedml.computing.scheduler.model_scheduler.autoscaler.autoscaler import Autoscaler, ReactivePolicy |
| 5 | +from fedml.computing.scheduler.model_scheduler.autoscaler.autoscaler import Autoscaler |
6 | 6 | from fedml.core.mlops.mlops_runtime_log import MLOpsRuntimeLog |
7 | 7 | from fedml.computing.scheduler.model_scheduler.device_model_cache import FedMLModelCache |
| 8 | +from fedml.computing.scheduler.model_scheduler.autoscaler.policies import ConcurrentQueryPolicy |
8 | 9 |
|
9 | 10 |
|
10 | 11 | if __name__ == "__main__": |
|
18 | 19 | parser.add_argument('--redis_addr', default="local") |
19 | 20 | parser.add_argument('--redis_port', default=6379) |
20 | 21 | parser.add_argument('--redis_password', default="fedml_default") |
21 | | - parser.add_argument('--metric', |
22 | | - default="latency", |
23 | | - help="Either latency or qps") |
24 | 22 | args = parser.parse_args() |
25 | 23 |
|
26 | 24 | fedml_model_cache = FedMLModelCache.get_instance() |
|
32 | 30 | # Init the autoscaler |
33 | 31 | autoscaler = Autoscaler(args.redis_addr, args.redis_port, args.redis_password) |
34 | 32 |
|
35 | | - latency_reactive_policy_default = { |
36 | | - "metric": "latency", |
37 | | - "ewm_mins": 15, |
38 | | - "ewm_alpha": 0.5, |
39 | | - "ub_threshold": 0.5, |
40 | | - "lb_threshold": 0.99, |
41 | | - "triggering_value": 1.6561916828471053 |
| 33 | + autoscaling_policy_config = { |
| 34 | + "current_replicas": 1, |
| 35 | + "min_replicas": 1, |
| 36 | + "max_replicas": 3, |
| 37 | + "queries_per_replica": 2, |
| 38 | + "window_size_secs": 60, |
| 39 | + "scaledown_delay_secs": 120, |
42 | 40 | } |
43 | | - qps_reactive_policy_default = { |
44 | | - "metric": "qps", |
45 | | - "ewm_mins": 15, |
46 | | - "ewm_alpha": 0.5, |
47 | | - "ub_threshold": 2, |
48 | | - "lb_threshold": 0.5 |
49 | | - } |
50 | | - policy_config = latency_reactive_policy_default \ |
51 | | - if args.metric == "latency" else qps_reactive_policy_default |
52 | | - autoscaling_policy = ReactivePolicy(**policy_config) |
53 | | - |
54 | | - for endpoint_settings in endpoints_settings_list: |
55 | | - endpoint_state = endpoint_settings["state"] |
56 | | - if endpoint_state == "DEPLOYED" and endpoint_settings["enable_auto_scaling"]: |
57 | | - |
58 | | - e_id, e_name, model_name = \ |
59 | | - endpoint_settings["endpoint_id"], \ |
60 | | - endpoint_settings["endpoint_name"], \ |
61 | | - endpoint_settings["model_name"] |
62 | | - logging.info(f"Querying the autoscaler for endpoint {e_id} with user settings {endpoint_settings}.") |
63 | | - |
64 | | - # For every endpoint we just update the policy configuration. |
65 | | - autoscaling_policy.min_replicas = endpoint_settings["scale_min"] |
66 | | - autoscaling_policy.max_replicas = endpoint_settings["scale_max"] |
67 | | - # We retrieve a list of replicas for every endpoint. The number |
68 | | - # of running replicas is the length of that list. |
69 | | - current_replicas = len(fedml_model_cache.get_endpoint_replicas_results(e_id)) |
70 | | - autoscaling_policy.current_replicas = current_replicas |
71 | | - logging.info(f"Endpoint {e_id} autoscaling policy: {autoscaling_policy}.") |
72 | | - |
73 | | - scale_op = autoscaler.scale_operation_endpoint( |
74 | | - autoscaling_policy, |
75 | | - str(e_id)) |
76 | | - |
77 | | - new_replicas = current_replicas + scale_op.value |
| 41 | + autoscaling_policy = ConcurrentQueryPolicy(**autoscaling_policy_config) |
78 | 42 |
|
79 | | - logging.info(f"Scaling operation {scale_op.value} for endpoint {e_id} .") |
80 | | - logging.info(f"New Replicas {new_replicas} for endpoint {e_id} .") |
81 | | - logging.info(f"Current Replicas {current_replicas} for endpoint {e_id} .") |
| 43 | + e_id = 1821952311 |
| 44 | + scale_op = autoscaler.scale_operation_endpoint( |
| 45 | + autoscaling_policy, |
| 46 | + str(e_id)) |
| 47 | + logging.info(f"Scaling operation {scale_op.value} for endpoint {e_id} .") |