Skip to content

Commit add75e5

Browse files
Pin AsyncConnection to the loop that first created its locks
The module docstring already warned that each AsyncConnection is bound to the event loop it was first used on, but nothing enforced it. Running the same instance on a second loop silently produced asyncio's internal "got Future attached to a different loop" RuntimeError. Capture a weakref to the loop on first _ensure_locks(), raise ProgrammingError up front if a different loop later calls in, and clear the pin on close() so a legitimate fixture-reuse cycle still works. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 1bf1d1b commit add75e5

File tree

2 files changed

+96
-2
lines changed

2 files changed

+96
-2
lines changed

src/dqlitedbapi/aio/connection.py

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import asyncio
44
import contextlib
55
import math
6+
import weakref
67
from types import TracebackType
78
from typing import Any
89

@@ -62,15 +63,41 @@ def __init__(
6263
# glue code before any loop exists).
6364
self._connect_lock: asyncio.Lock | None = None
6465
self._op_lock: asyncio.Lock | None = None
66+
# Weak reference to the loop the locks were first bound to.
67+
# Captured at first ``_ensure_locks()`` so subsequent use from a
68+
# different event loop raises a clean ProgrammingError instead
69+
# of asyncio's internal "got Future attached to a different
70+
# loop" RuntimeError. Weakref avoids pinning a closed loop
71+
# alive once the caller has moved on.
72+
self._loop_ref: weakref.ref[asyncio.AbstractEventLoop] | None = None
6573
# PEP 249 optional extension; see Connection.messages.
6674
self.messages: list[tuple[type, Any]] = []
6775

6876
def _ensure_locks(self) -> tuple[asyncio.Lock, asyncio.Lock]:
69-
"""Lazy-create the asyncio locks on the currently-running loop."""
77+
"""Lazy-create the asyncio locks on the currently-running loop.
78+
79+
Also pins the connection to that loop: subsequent calls from a
80+
different loop raise ``ProgrammingError`` up front. The
81+
underlying ``DqliteConnection`` protocol's StreamReader/Writer
82+
is also loop-bound, so transparently rebinding is not safe;
83+
fail fast with a clear message instead.
84+
"""
85+
loop = asyncio.get_running_loop()
7086
if self._connect_lock is None:
87+
self._loop_ref = weakref.ref(loop)
7188
self._connect_lock = asyncio.Lock()
72-
if self._op_lock is None:
7389
self._op_lock = asyncio.Lock()
90+
else:
91+
bound = self._loop_ref() if self._loop_ref is not None else None
92+
if bound is not loop:
93+
raise ProgrammingError(
94+
"AsyncConnection was first used on a different event loop; "
95+
"AsyncConnection instances are loop-bound and cannot be "
96+
"reused across asyncio.run() invocations."
97+
)
98+
# ``_op_lock`` is created together with ``_connect_lock`` above;
99+
# the assertion keeps mypy narrow without a runtime cost.
100+
assert self._op_lock is not None
74101
return self._connect_lock, self._op_lock
75102

76103
async def _ensure_connection(self) -> DqliteConnection:
@@ -130,6 +157,7 @@ async def close(self) -> None:
130157
# connect_lock.
131158
self._connect_lock = None
132159
self._op_lock = None
160+
self._loop_ref = None
133161
return
134162
_, op_lock = self._ensure_locks()
135163
async with op_lock:
@@ -141,6 +169,7 @@ async def close(self) -> None:
141169
# re-check before it touches the now-None primitive.
142170
self._connect_lock = None
143171
self._op_lock = None
172+
self._loop_id = None
144173

145174
async def commit(self) -> None:
146175
"""Commit any pending transaction.

tests/test_async_lock_binding.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,3 +65,68 @@ async def scenario() -> None:
6565
assert conn._op_lock is None
6666

6767
asyncio.run(scenario())
68+
69+
70+
class TestLoopAffinityEnforcement:
71+
"""After the first _ensure_locks() call, the AsyncConnection is
72+
pinned to that loop. Any subsequent use from a different loop must
73+
raise a clean ProgrammingError instead of asyncio's internal
74+
"got Future attached to a different loop" RuntimeError.
75+
"""
76+
77+
def test_cross_loop_use_raises_programming_error(self) -> None:
78+
import asyncio
79+
80+
from dqlitedbapi.aio.connection import AsyncConnection
81+
from dqlitedbapi.exceptions import ProgrammingError
82+
83+
conn = AsyncConnection("localhost:19001", database="x")
84+
85+
async def touch() -> None:
86+
conn._ensure_locks()
87+
88+
asyncio.run(touch())
89+
90+
loop2 = asyncio.new_event_loop()
91+
try:
92+
with pytest.raises(ProgrammingError, match="loop"):
93+
loop2.run_until_complete(touch())
94+
finally:
95+
loop2.close()
96+
97+
def test_same_loop_reuse_is_fine(self) -> None:
98+
import asyncio
99+
100+
from dqlitedbapi.aio.connection import AsyncConnection
101+
102+
conn = AsyncConnection("localhost:19001", database="x")
103+
104+
async def touch() -> tuple[asyncio.Lock, asyncio.Lock]:
105+
a1, b1 = conn._ensure_locks()
106+
a2, b2 = conn._ensure_locks()
107+
assert a1 is a2
108+
assert b1 is b2
109+
return a1, b1
110+
111+
asyncio.run(touch())
112+
113+
def test_close_clears_loop_pin(self) -> None:
114+
"""Close resets the pin so a subsequent asyncio.run on the
115+
same object (e.g. test fixture reuse) works without tripping
116+
the cross-loop guard.
117+
"""
118+
import asyncio
119+
120+
from dqlitedbapi.aio.connection import AsyncConnection
121+
122+
conn = AsyncConnection("localhost:19001", database="x")
123+
124+
async def _scenario() -> None:
125+
conn._ensure_locks()
126+
await conn.close()
127+
128+
asyncio.run(_scenario())
129+
assert conn._loop_ref is None
130+
131+
132+
import pytest # noqa: E402

0 commit comments

Comments
 (0)