11"""PEP 249 Connection implementation for dqlite."""
22
33import asyncio
4+ import threading
45from typing import Any
56
67from dqliteclient import DqliteConnection
@@ -31,17 +32,30 @@ def __init__(
3132 self ._async_conn : DqliteConnection | None = None
3233 self ._closed = False
3334 self ._loop : asyncio .AbstractEventLoop | None = None
35+ self ._thread : threading .Thread | None = None
3436
35- def _get_loop (self ) -> asyncio .AbstractEventLoop :
36- if self ._loop is None :
37- try :
38- self ._loop = asyncio .get_running_loop ()
39- except RuntimeError :
40- # No running loop, create a new one
41- self ._loop = asyncio .new_event_loop ()
42- asyncio .set_event_loop (self ._loop )
37+ def _ensure_loop (self ) -> asyncio .AbstractEventLoop :
38+ """Ensure a dedicated event loop is running in a background thread.
39+
40+ This allows sync methods to work even when called from within
41+ an already-running async context (e.g. uvicorn).
42+ """
43+ if self ._loop is None or self ._loop .is_closed ():
44+ self ._loop = asyncio .new_event_loop ()
45+ self ._thread = threading .Thread (target = self ._loop .run_forever , daemon = True )
46+ self ._thread .start ()
4347 return self ._loop
4448
49+ def _run_sync (self , coro : Any ) -> Any :
50+ """Run an async coroutine from sync code.
51+
52+ Submits the coroutine to the dedicated background event loop
53+ and blocks until the result is available.
54+ """
55+ loop = self ._ensure_loop ()
56+ future = asyncio .run_coroutine_threadsafe (coro , loop )
57+ return future .result ()
58+
4559 async def _get_async_connection (self ) -> DqliteConnection :
4660 """Get or create the underlying async connection."""
4761 if self ._closed :
@@ -64,9 +78,15 @@ async def _get_async_connection(self) -> DqliteConnection:
6478 def close (self ) -> None :
6579 """Close the connection."""
6680 if self ._async_conn is not None :
67- loop = self ._get_loop ()
68- loop .run_until_complete (self ._async_conn .close ())
81+ self ._run_sync (self ._async_conn .close ())
6982 self ._async_conn = None
83+ if self ._loop is not None and not self ._loop .is_closed ():
84+ self ._loop .call_soon_threadsafe (self ._loop .stop )
85+ if self ._thread is not None :
86+ self ._thread .join (timeout = 5 )
87+ self ._loop .close ()
88+ self ._loop = None
89+ self ._thread = None
7090 self ._closed = True
7191
7292 def commit (self ) -> None :
@@ -75,8 +95,7 @@ def commit(self) -> None:
7595 raise InterfaceError ("Connection is closed" )
7696
7797 if self ._async_conn is not None :
78- loop = self ._get_loop ()
79- loop .run_until_complete (self ._commit_async ())
98+ self ._run_sync (self ._commit_async ())
8099
81100 async def _commit_async (self ) -> None :
82101 """Async implementation of commit."""
@@ -89,8 +108,7 @@ def rollback(self) -> None:
89108 raise InterfaceError ("Connection is closed" )
90109
91110 if self ._async_conn is not None :
92- loop = self ._get_loop ()
93- loop .run_until_complete (self ._rollback_async ())
111+ self ._run_sync (self ._rollback_async ())
94112
95113 async def _rollback_async (self ) -> None :
96114 """Async implementation of rollback."""
0 commit comments