
Commit c151831

fedml-dimitris authored and Raphael-Jin committed
Adding hash set for counting the number of pending requests per endpoint.
1 parent 6b33065 commit c151831

2 files changed: 24 additions & 23 deletions
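At a glance, the change swaps a single global Redis counter for one Redis hash keyed by endpoint id. A minimal sketch of the two layouts with redis-py (the local connection and the example endpoint id are assumptions for illustration; the key name matches the diff):

import redis

r = redis.Redis(decode_responses=True)  # assumed local Redis, for illustration

# Before this commit: one global counter shared by every endpoint.
#   r.incr("FEDML_PENDING_REQUESTS_COUNTER")

# After this commit: one hash field, hence one counter, per endpoint.
r.hincrby("FEDML_PENDING_REQUESTS_COUNTER", "endpoint-42", 1)
print(r.hgetall("FEDML_PENDING_REQUESTS_COUNTER"))  # e.g. {'endpoint-42': '1'}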

python/fedml/computing/scheduler/model_scheduler/device_model_cache.py

Lines changed: 15 additions & 14 deletions
@@ -974,20 +974,21 @@ def delete_endpoint_scaling_down_decision_time(self, end_point_id) -> bool:
             self.FEDML_MODEL_ENDPOINT_SCALING_DOWN_DECISION_TIME_TAG,
             end_point_id))
 
-    def get_pending_requests_counter(self) -> int:
-        if not self.redis_connection.exists(self.FEDML_PENDING_REQUESTS_COUNTER):
-            self.redis_connection.set(self.FEDML_PENDING_REQUESTS_COUNTER, 0)
-        return int(self.redis_connection.get(self.FEDML_PENDING_REQUESTS_COUNTER))
-
-    def update_pending_requests_counter(self, increase=False, decrease=False) -> int:
-        if not self.redis_connection.exists(self.FEDML_PENDING_REQUESTS_COUNTER):
-            self.redis_connection.set(self.FEDML_PENDING_REQUESTS_COUNTER, 0)
+    def get_pending_requests_counter(self, end_point_id) -> int:
+        # If the endpoint does not exist inside the hash collection, its counter is 0.
+        if self.redis_connection.hexists(self.FEDML_PENDING_REQUESTS_COUNTER, end_point_id):
+            return int(self.redis_connection.hget(self.FEDML_PENDING_REQUESTS_COUNTER, end_point_id))
+        return 0
+
+    def update_pending_requests_counter(self, end_point_id, increase=False, decrease=False) -> int:
+        if not self.redis_connection.hexists(self.FEDML_PENDING_REQUESTS_COUNTER, end_point_id):
+            self.redis_connection.hset(self.FEDML_PENDING_REQUESTS_COUNTER, mapping={end_point_id: 0})
         if increase:
-            self.redis_connection.incr(self.FEDML_PENDING_REQUESTS_COUNTER)
+            self.redis_connection.hincrby(self.FEDML_PENDING_REQUESTS_COUNTER, end_point_id, 1)
         if decrease:
+            # Careful on the decrement: Redis has no native hash-decrement command, so HINCRBY with -1 is used.
+            self.redis_connection.hincrby(self.FEDML_PENDING_REQUESTS_COUNTER, end_point_id, -1)
             # Making sure the counter never becomes negative!
-            if self.get_pending_requests_counter() < 0:
-                self.redis_connection.set(self.FEDML_PENDING_REQUESTS_COUNTER, 0)
-            else:
-                self.redis_connection.decr(self.FEDML_PENDING_REQUESTS_COUNTER)
-        return self.get_pending_requests_counter()
+            if self.get_pending_requests_counter(end_point_id) < 0:
+                self.redis_connection.hset(self.FEDML_PENDING_REQUESTS_COUNTER, mapping={end_point_id: 0})
+        return self.get_pending_requests_counter(end_point_id)
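For context, here is a self-contained sketch of the same per-endpoint counter pattern outside the class (the key name mirrors the diff; the local Redis connection and the helper names are assumptions). HINCRBY is atomic and creates a missing field on first use, so it can stand in for both the increment and the missing decrement command:

import redis

PENDING_COUNTER_KEY = "FEDML_PENDING_REQUESTS_COUNTER"  # hash: endpoint id -> pending count
r = redis.Redis(decode_responses=True)

def get_pending(end_point_id) -> int:
    # HGET returns None for a missing field, so an unknown endpoint reads as 0.
    value = r.hget(PENDING_COUNTER_KEY, end_point_id)
    return int(value) if value is not None else 0

def update_pending(end_point_id, delta: int) -> int:
    # HINCRBY atomically adds delta (negative deltas decrement)
    # and returns the new value, so no separate read is needed.
    new_value = r.hincrby(PENDING_COUNTER_KEY, end_point_id, delta)
    if new_value < 0:
        # Clamp at zero so unbalanced decrements never leave a negative count.
        r.hset(PENDING_COUNTER_KEY, end_point_id, 0)
        return 0
    return new_value

Because HINCRBY returns the post-increment value, the clamp here can act on that return value directly, avoiding the extra read (and the read-modify-write window) of a separate check-then-set.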

python/fedml/computing/scheduler/model_scheduler/device_model_inference.py

Lines changed: 9 additions & 9 deletions
@@ -55,10 +55,10 @@ async def auth_middleware(request: Request, call_next):
                 {"error": True, "message": "Invalid JSON."},
                 status_code=status.HTTP_400_BAD_REQUEST)
 
-        # Get total pending requests.
-        pending_requests_num = FEDML_MODEL_CACHE.get_pending_requests_counter()
+        # Get endpoint's total pending requests.
+        end_point_id = request_json.get("end_point_id", None)
+        pending_requests_num = FEDML_MODEL_CACHE.get_pending_requests_counter(end_point_id)
         if pending_requests_num:
-            end_point_id = request_json.get("end_point_id", None)
             # Fetch metrics of the past k=3 requests.
             pask_k_metrics = FEDML_MODEL_CACHE.get_endpoint_metrics(
                 end_point_id=end_point_id,
@@ -173,7 +173,7 @@ async def _predict(
         header=None
 ) -> Union[MutableMapping[str, Any], Response, StreamingResponse]:
     # Always increase the pending requests counter on a new incoming request.
-    FEDML_MODEL_CACHE.update_pending_requests_counter(increase=True)
+    FEDML_MODEL_CACHE.update_pending_requests_counter(end_point_id, increase=True)
     inference_response = {}
 
     try:
@@ -205,14 +205,14 @@ async def _predict(
         if not is_endpoint_activated(in_end_point_id):
             inference_response = {"error": True, "message": "endpoint is not activated."}
             logging_inference_request(input_json, inference_response)
-            FEDML_MODEL_CACHE.update_pending_requests_counter(decrease=True)
+            FEDML_MODEL_CACHE.update_pending_requests_counter(end_point_id, decrease=True)
             return inference_response
 
         # Found idle inference device
         idle_device, end_point_id, model_id, model_name, model_version, inference_host, inference_output_url = \
             found_idle_inference_device(in_end_point_id, in_end_point_name, in_model_name, in_model_version)
         if idle_device is None or idle_device == "":
-            FEDML_MODEL_CACHE.update_pending_requests_counter(decrease=True)
+            FEDML_MODEL_CACHE.update_pending_requests_counter(end_point_id, decrease=True)
             return {"error": True, "error_code": status.HTTP_404_NOT_FOUND,
                     "message": "can not found active inference worker for this endpoint."}
 
@@ -252,18 +252,18 @@ async def _predict(
                 pass
 
             logging_inference_request(input_json, inference_response)
-            FEDML_MODEL_CACHE.update_pending_requests_counter(decrease=True)
+            FEDML_MODEL_CACHE.update_pending_requests_counter(end_point_id, decrease=True)
             return inference_response
         else:
             inference_response = {"error": True, "message": "token is not valid."}
             logging_inference_request(input_json, inference_response)
-            FEDML_MODEL_CACHE.update_pending_requests_counter(decrease=True)
+            FEDML_MODEL_CACHE.update_pending_requests_counter(end_point_id, decrease=True)
             return inference_response
 
     except Exception as e:
         logging.error("Inference Exception: {}".format(traceback.format_exc()))
         # Need to reduce the pending requests counter in whatever exception that may be raised.
-        FEDML_MODEL_CACHE.update_pending_requests_counter(decrease=True)
+        FEDML_MODEL_CACHE.update_pending_requests_counter(end_point_id, decrease=True)
 
 
 def retrieve_info_by_endpoint_id(end_point_id, in_end_point_name=None, in_model_name=None,
