Skip to content

Commit e399121

Browse files
authored
Even more efficient httpx (#206)
* Make async methods and blob_client use httpx client objects more efficiently. --------- Co-authored-by: Raymond Wiker <rayw@equinor.com>
1 parent 5dde6f6 commit e399121

2 files changed

Lines changed: 83 additions & 51 deletions

File tree

src/sumo/wrapper/_blob_client.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,16 @@
1-
import httpx
2-
31
from ._decorators import (
42
raise_for_status,
53
raise_for_status_async,
64
)
75

8-
from ._retry_strategy import RetryStrategy
9-
106

117
class BlobClient:
128
"""Upload blobs to blob store using pre-authorized URLs"""
139

14-
def __init__(self, retry_strategy=RetryStrategy()):
10+
def __init__(self, client, async_client, timeout, retry_strategy):
11+
self._client = client
12+
self._async_client = client
13+
self._timeout = timeout
1514
self._retry_strategy = retry_strategy
1615
return
1716

@@ -30,7 +29,9 @@ def upload_blob(self, blob: bytes, url: str):
3029
}
3130

3231
def _put():
33-
return httpx.put(url, content=blob, headers=headers)
32+
return self._client.put(
33+
url, content=blob, headers=headers, timeout=self._timeout
34+
)
3435

3536
retryer = self._retry_strategy.make_retryer()
3637

@@ -51,8 +52,9 @@ async def upload_blob_async(self, blob: bytes, url: str):
5152
}
5253

5354
async def _put():
54-
async with httpx.AsyncClient() as client:
55-
return await client.put(url=url, content=blob, headers=headers)
55+
return await self._async_client.put(
56+
url=url, content=blob, headers=headers, timeout=self._timeout
57+
)
5658

5759
retryer = self._retry_strategy.make_retryer_async()
5860

src/sumo/wrapper/sumo_client.py

Lines changed: 73 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import logging
2-
2+
import asyncio
33
import httpx
4-
54
import jwt
65

76
from ._blob_client import BlobClient
@@ -18,7 +17,7 @@
1817

1918
logger = logging.getLogger("sumo.wrapper")
2019

21-
DEFAULT_TIMEOUT = httpx.Timeout(20.0)
20+
DEFAULT_TIMEOUT = httpx.Timeout(30.0)
2221

2322

2423
class SumoClient:
@@ -32,6 +31,7 @@ def __init__(
3231
devicecode: bool = False,
3332
verbosity: str = "CRITICAL",
3433
retry_strategy=RetryStrategy(),
34+
timeout=DEFAULT_TIMEOUT,
3535
):
3636
"""Initialize a new Sumo object
3737
@@ -49,8 +49,10 @@ def __init__(
4949
raise ValueError(f"Invalid environment: {env}")
5050

5151
self._retry_strategy = retry_strategy
52-
self._client = httpx
53-
self._blob_client = BlobClient(retry_strategy)
52+
self._client = httpx.Client(follow_redirects=True)
53+
self._async_client = httpx.AsyncClient(follow_redirects=True)
54+
55+
self._timeout = timeout
5456

5557
access_token = None
5658
refresh_token = None
@@ -94,12 +96,39 @@ def __init__(
9496
return
9597

9698
def __enter__(self):
97-
self._client = httpx.Client()
9899
return self
99100

100101
def __exit__(self, exc_type, exc_value, traceback):
101102
self._client.close()
102-
return
103+
self._client = None
104+
return False
105+
106+
async def __aenter__(self):
107+
return self
108+
109+
async def __aexit__(self, exc_type, exc_value, traceback):
110+
await self._async_client.aclose()
111+
self._async_client = None
112+
return False
113+
114+
def __del__(self):
115+
if self._client is not None:
116+
self._client.close()
117+
self._client = None
118+
pass
119+
if self._async_client is not None:
120+
121+
async def closeit(client):
122+
await client.aclose()
123+
return
124+
125+
try:
126+
loop = asyncio.get_running_loop()
127+
loop.create_task(closeit(self._async_client))
128+
except RuntimeError:
129+
pass
130+
self._async_client = None
131+
pass
103132

104133
def authenticate(self):
105134
if self.auth is None:
@@ -126,7 +155,12 @@ def blob_client(self) -> BlobClient:
126155
await sumo.blob_client.upload_blob_async(blob, blob_url)
127156
"""
128157

129-
return self._blob_client
158+
return BlobClient(
159+
self._client,
160+
self._async_client,
161+
self._timeout,
162+
self._retry_strategy,
163+
)
130164

131165
@raise_for_status
132166
def get(self, path: str, params: dict = None) -> dict:
@@ -169,7 +203,7 @@ def _get():
169203
params=params,
170204
headers=headers,
171205
follow_redirects=True,
172-
timeout=DEFAULT_TIMEOUT,
206+
timeout=self._timeout,
173207
)
174208

175209
retryer = self._retry_strategy.make_retryer()
@@ -244,7 +278,7 @@ def _post():
244278
json=json,
245279
headers=headers,
246280
params=params,
247-
timeout=DEFAULT_TIMEOUT,
281+
timeout=self._timeout,
248282
)
249283

250284
retryer = self._retry_strategy.make_retryer()
@@ -290,7 +324,7 @@ def _put():
290324
content=blob,
291325
json=json,
292326
headers=headers,
293-
timeout=DEFAULT_TIMEOUT,
327+
timeout=self._timeout,
294328
)
295329

296330
retryer = self._retry_strategy.make_retryer()
@@ -328,7 +362,7 @@ def _delete():
328362
f"{self.base_url}{path}",
329363
headers=headers,
330364
params=params,
331-
timeout=DEFAULT_TIMEOUT,
365+
timeout=self._timeout,
332366
)
333367

334368
retryer = self._retry_strategy.make_retryer()
@@ -389,13 +423,12 @@ async def get_async(self, path: str, params: dict = None):
389423
headers.update(self.auth.get_authorization())
390424

391425
async def _get():
392-
async with httpx.AsyncClient(follow_redirects=True) as client:
393-
return await client.get(
394-
f"{self.base_url}{path}",
395-
params=params,
396-
headers=headers,
397-
timeout=DEFAULT_TIMEOUT,
398-
)
426+
return await self._async_client.get(
427+
f"{self.base_url}{path}",
428+
params=params,
429+
headers=headers,
430+
timeout=self._timeout,
431+
)
399432

400433
retryer = self._retry_strategy.make_retryer_async()
401434

@@ -464,15 +497,14 @@ async def post_async(
464497
headers.update(self.auth.get_authorization())
465498

466499
async def _post():
467-
async with httpx.AsyncClient() as client:
468-
return await client.post(
469-
url=f"{self.base_url}{path}",
470-
content=blob,
471-
json=json,
472-
headers=headers,
473-
params=params,
474-
timeout=DEFAULT_TIMEOUT,
475-
)
500+
return await self._async_client.post(
501+
url=f"{self.base_url}{path}",
502+
content=blob,
503+
json=json,
504+
headers=headers,
505+
params=params,
506+
timeout=self._timeout,
507+
)
476508

477509
retryer = self._retry_strategy.make_retryer_async()
478510

@@ -512,14 +544,13 @@ async def put_async(
512544
headers.update(self.auth.get_authorization())
513545

514546
async def _put():
515-
async with httpx.AsyncClient() as client:
516-
return await client.put(
517-
url=f"{self.base_url}{path}",
518-
content=blob,
519-
json=json,
520-
headers=headers,
521-
timeout=DEFAULT_TIMEOUT,
522-
)
547+
return await self._async_client.put(
548+
url=f"{self.base_url}{path}",
549+
content=blob,
550+
json=json,
551+
headers=headers,
552+
timeout=self._timeout,
553+
)
523554

524555
retryer = self._retry_strategy.make_retryer_async()
525556

@@ -552,13 +583,12 @@ async def delete_async(self, path: str, params: dict = None) -> dict:
552583
headers.update(self.auth.get_authorization())
553584

554585
async def _delete():
555-
async with httpx.AsyncClient() as client:
556-
return await client.delete(
557-
url=f"{self.base_url}{path}",
558-
headers=headers,
559-
params=params,
560-
timeout=DEFAULT_TIMEOUT,
561-
)
586+
return await self._async_client.delete(
587+
url=f"{self.base_url}{path}",
588+
headers=headers,
589+
params=params,
590+
timeout=self._timeout,
591+
)
562592

563593
retryer = self._retry_strategy.make_retryer_async()
564594

0 commit comments

Comments
 (0)