Skip to content

Commit 9d39016

Browse files
authored
fix: stop destroying connection pool after every sync request (#270)
* fix: stop destroying connection pool after every sync request * fix: ruff lint * fix: address comments * fix: streaming preloadcontent
1 parent c4dab0b commit 9d39016

File tree

7 files changed

+350
-23
lines changed

7 files changed

+350
-23
lines changed

openfga_sdk/rest.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import io
22
import json
33
import logging
4-
import re
54
import ssl
65
import urllib
76

@@ -221,7 +220,7 @@ async def build_request(
221220
args["url"] = f"{url}?{encoded_qs}"
222221

223222
if method in ["POST", "PUT", "PATCH", "OPTIONS", "DELETE"]:
224-
if re.search("json", headers["Content-Type"], re.IGNORECASE):
223+
if "json" in headers["Content-Type"].lower():
225224
if body is not None:
226225
body = json.dumps(body)
227226
args["data"] = body
@@ -397,11 +396,15 @@ async def stream(
397396
if isinstance(response, aiohttp.ClientResponse):
398397
logger.debug("response body: %s", buffer.decode("utf-8"))
399398

400-
# Handle any HTTP errors that may have occurred
401-
await self.handle_response_exception(response)
402-
403-
# Release the response object (required!)
404-
response.release()
399+
try:
400+
# Handle any HTTP errors that may have occurred
401+
await self.handle_response_exception(response)
402+
finally:
403+
# Release the response object back to the connection pool.
404+
# This must always run, even if handle_response_exception raises,
405+
# to avoid leaking the connection (preload_content=False means
406+
# the connection is not auto-released).
407+
response.release()
405408

406409
async def request(
407410
self,

openfga_sdk/sync/api_client.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ def __exit__(self, exc_type, exc_value, traceback):
109109
self.close()
110110

111111
def close(self):
112+
self.rest_client.close()
112113
if self._pool:
113114
self._pool.close()
114115
self._pool.join()

openfga_sdk/sync/rest.py

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import io
22
import json
33
import logging
4-
import re
54
import ssl
65
import urllib
76

@@ -279,7 +278,7 @@ def build_request(
279278

280279
# Handle body/post_params for methods that send payloads
281280
if method in ["POST", "PUT", "PATCH", "OPTIONS", "DELETE"]:
282-
if re.search("json", headers["Content-Type"], re.IGNORECASE):
281+
if "json" in headers["Content-Type"].lower():
283282
if body is not None:
284283
body = json.dumps(body)
285284
args["body"] = body
@@ -437,11 +436,15 @@ def stream(
437436
except json.JSONDecodeError:
438437
logger.debug("Incomplete leftover data at end of stream.")
439438

440-
# Handle any HTTP errors that may have occurred
441-
self.handle_response_exception(response)
442-
443-
# Release the response object (required!)
444-
response.release_conn()
439+
try:
440+
# Handle any HTTP errors that may have occurred
441+
self.handle_response_exception(response)
442+
finally:
443+
# Release the response object back to the connection pool.
444+
# This must always run, even if handle_response_exception raises,
445+
# to avoid leaking the connection (preload_content=False means
446+
# urllib3 does not auto-release).
447+
response.release_conn()
445448

446449
def request(
447450
self,
@@ -494,10 +497,13 @@ def request(
494497
# Log the response body
495498
logger.debug("response body: %s", wrapped_response.data.decode("utf-8"))
496499

497-
# Handle any errors that may have occurred
498-
self.handle_response_exception(raw_response)
499-
500-
# Release the connection back to the pool
501-
self.close()
500+
# Handle any errors that may have occurred. If an exception is raised,
501+
# ensure the underlying response is closed so the connection is not
502+
# leaked from the pool.
503+
try:
504+
self.handle_response_exception(raw_response)
505+
except Exception:
506+
raw_response.close()
507+
raise
502508

503509
return wrapped_response or raw_response

openfga_sdk/telemetry/attributes.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -241,8 +241,9 @@ def fromRequest(
241241
_attributes[TelemetryAttributes.http_request_method] = http_method
242242

243243
if url is not None:
244-
_hostname = urllib.parse.urlparse(url).hostname
245-
_scheme = urllib.parse.urlparse(url).scheme
244+
_parsed_url = urllib.parse.urlparse(url)
245+
_hostname = _parsed_url.hostname
246+
_scheme = _parsed_url.scheme
246247

247248
if type(_hostname) is str:
248249
_attributes[TelemetryAttributes.http_host] = _hostname

openfga_sdk/validation.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
import re
22

33

4+
_ULID_REGEX = re.compile("^[0-7][0-9A-HJKMNP-TV-Z]{25}$")
5+
6+
47
def is_well_formed_ulid_string(ulid):
5-
regex = re.compile("^[0-7][0-9A-HJKMNP-TV-Z]{25}$")
68
if not isinstance(ulid, str):
79
return False
8-
match = regex.match(ulid)
10+
match = _ULID_REGEX.match(ulid)
911
if match is None:
1012
return False
1113
return True

test/rest_test.py

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -428,3 +428,98 @@ async def iter_chunks(self):
428428

429429
client.handle_response_exception.assert_awaited_once()
430430
mock_response.release.assert_called_once()
431+
432+
433+
@pytest.mark.asyncio
434+
async def test_stream_releases_conn_on_error_status():
435+
"""Ensure release() is called even when handle_response_exception raises,
436+
so the connection is returned to the pool and not leaked."""
437+
mock_config = MagicMock()
438+
mock_config.ssl_ca_cert = None
439+
mock_config.cert_file = None
440+
mock_config.key_file = None
441+
mock_config.verify_ssl = True
442+
mock_config.connection_pool_maxsize = 4
443+
mock_config.proxy = None
444+
mock_config.proxy_headers = None
445+
mock_config.timeout_millisec = 5000
446+
447+
client = RESTClientObject(configuration=mock_config)
448+
mock_session = MagicMock()
449+
client.pool_manager = mock_session
450+
451+
class FakeContent:
452+
async def iter_chunks(self):
453+
yield (b'{"ok":true}\n', None)
454+
455+
mock_response = MagicMock()
456+
mock_response.status = 500
457+
mock_response.reason = "Internal Server Error"
458+
mock_response.data = None
459+
mock_response.content = FakeContent()
460+
461+
mock_context_manager = AsyncMock()
462+
mock_context_manager.__aenter__.return_value = mock_response
463+
mock_context_manager.__aexit__.return_value = None
464+
465+
mock_session.request.return_value = mock_context_manager
466+
467+
# Make handle_response_exception raise an exception
468+
client.handle_response_exception = AsyncMock(
469+
side_effect=ServiceException(status=500, reason="Internal Server Error")
470+
)
471+
client.close = AsyncMock()
472+
473+
results = []
474+
with pytest.raises(ServiceException):
475+
async for item in client.stream("GET", "http://example.com"):
476+
results.append(item)
477+
478+
# The critical assertion: release() must be called even though
479+
# handle_response_exception raised ServiceException
480+
mock_response.release.assert_called_once()
481+
482+
483+
@pytest.mark.asyncio
484+
async def test_stream_releases_conn_on_success():
485+
"""Ensure release() is called on successful stream completion."""
486+
mock_config = MagicMock()
487+
mock_config.ssl_ca_cert = None
488+
mock_config.cert_file = None
489+
mock_config.key_file = None
490+
mock_config.verify_ssl = True
491+
mock_config.connection_pool_maxsize = 4
492+
mock_config.proxy = None
493+
mock_config.proxy_headers = None
494+
mock_config.timeout_millisec = 5000
495+
496+
client = RESTClientObject(configuration=mock_config)
497+
mock_session = MagicMock()
498+
client.pool_manager = mock_session
499+
500+
class FakeContent:
501+
async def iter_chunks(self):
502+
yield (b'{"ok":true}\n', None)
503+
504+
mock_response = MagicMock()
505+
mock_response.status = 200
506+
mock_response.reason = "OK"
507+
mock_response.data = None
508+
mock_response.content = FakeContent()
509+
510+
mock_context_manager = AsyncMock()
511+
mock_context_manager.__aenter__.return_value = mock_response
512+
mock_context_manager.__aexit__.return_value = None
513+
514+
mock_session.request.return_value = mock_context_manager
515+
516+
client.handle_response_exception = AsyncMock()
517+
client.close = AsyncMock()
518+
519+
results = []
520+
async for item in client.stream("GET", "http://example.com"):
521+
results.append(item)
522+
523+
assert results == [{"ok": True}]
524+
client.handle_response_exception.assert_awaited_once()
525+
mock_response.release.assert_called_once()

0 commit comments

Comments
 (0)