Skip to content

Commit a95d15e

Browse files
Pin acquire-side post-reset _closed re-check
The _release-site post-reset _closed re-check is already pinned. The structurally-identical block inside acquire()'s exception arm is its own code with its own line range and was untested. Path: a user's async-with body raises → acquire() runs _reset_connection (ROLLBACK), and on success would put_nowait the conn back. pool.close() does not take _lock, so a concurrent close may flip _closed=True and drain the queue while ROLLBACK is in flight — without the post-reset re-check the conn is queued into a drained pool and orphaned. Race the close against a controllable _slow_reset event: assert the conn was close()'d via the close+release branch and not put back into the drained queue. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 447b3f9 commit a95d15e

1 file changed

Lines changed: 116 additions & 0 deletions

File tree

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
"""Pin: ``acquire()`` exception-arm re-checks ``_closed`` after the
2+
``_reset_connection`` yield.
3+
4+
The structurally-identical ``_release`` post-reset re-check is already
5+
pinned by ``test_pool_release_post_reset_closed_recheck.py``. The
6+
``acquire()`` exception-arm path is its own code with its own line
7+
range and was previously untested:
8+
9+
```python
10+
async with pool.acquire() as conn:
11+
raise <user-exception> # falls into the BaseException arm
12+
```
13+
14+
When the user's body raises, ``acquire()``'s ``except BaseException``
15+
runs ``_reset_connection`` (ROLLBACK) and on success would
16+
``put_nowait`` the conn back. ``pool.close()`` does not take
17+
``_lock``, so a concurrent close may flip ``_closed=True`` and drain
18+
the queue while the ROLLBACK is in flight. Without the post-reset
19+
re-check the conn would be queued into a drained pool and orphaned.
20+
21+
A future refactor that drops the re-check on the acquire-side path
22+
would silently regress.
23+
"""
24+
25+
from __future__ import annotations
26+
27+
import asyncio
28+
from typing import Any
29+
from unittest.mock import MagicMock
30+
31+
import pytest
32+
33+
from dqliteclient.cluster import ClusterClient
34+
from dqliteclient.pool import ConnectionPool
35+
36+
37+
class _FakeConn:
38+
def __init__(self) -> None:
39+
self._address = "localhost:9001"
40+
self._in_transaction = False
41+
self._tx_owner = None
42+
self._pool_released = False
43+
self._protocol = MagicMock()
44+
self._protocol._writer = MagicMock()
45+
self._protocol._writer.transport = MagicMock()
46+
self._protocol._writer.transport.is_closing = lambda: False
47+
self._protocol._reader = MagicMock()
48+
self._protocol._reader.at_eof = lambda: False
49+
self._protocol.is_wire_coherent = True
50+
self.close_called = False
51+
52+
@property
53+
def is_connected(self) -> bool:
54+
return self._protocol is not None
55+
56+
async def close(self) -> None:
57+
self.close_called = True
58+
self._protocol = None # type: ignore[assignment]
59+
60+
61+
def _make_pool() -> ConnectionPool:
62+
cluster = MagicMock(spec=ClusterClient)
63+
return ConnectionPool(
64+
addresses=["localhost:9001"],
65+
min_size=0,
66+
max_size=1,
67+
timeout=1.0,
68+
cluster=cluster,
69+
)
70+
71+
72+
@pytest.mark.asyncio
73+
async def test_acquire_exception_post_reset_closes_when_pool_closed_during_reset() -> None:
74+
"""``acquire()``'s exception arm must observe a concurrent
75+
``pool.close()`` that lands during the ROLLBACK yield, route the
76+
conn through close + reservation release, and not orphan it in
77+
a drained queue."""
78+
pool = _make_pool()
79+
pool._size = 1
80+
conn = _FakeConn()
81+
pool._pool.put_nowait(conn) # type: ignore[arg-type]
82+
83+
reset_can_finish = asyncio.Event()
84+
reset_entered = asyncio.Event()
85+
86+
async def _slow_reset(c: Any) -> bool:
87+
reset_entered.set()
88+
await reset_can_finish.wait()
89+
return True
90+
91+
pool._reset_connection = _slow_reset # type: ignore[assignment]
92+
93+
async def _user() -> None:
94+
async with pool.acquire():
95+
raise RuntimeError("user error")
96+
97+
user_task = asyncio.create_task(_user())
98+
await reset_entered.wait()
99+
assert not user_task.done()
100+
101+
close_task = asyncio.create_task(pool.close())
102+
for _ in range(10):
103+
await asyncio.sleep(0)
104+
if pool._closed:
105+
break
106+
assert pool._closed, "expected pool.close() to mark _closed=True"
107+
108+
reset_can_finish.set()
109+
with pytest.raises(RuntimeError, match="user error"):
110+
await user_task
111+
await close_task
112+
113+
# The conn must have been close()'d via the post-reset re-check
114+
# branch on the acquire-side path, not orphaned in a drained queue.
115+
assert conn.close_called is True
116+
assert conn._pool_released is True

0 commit comments

Comments
 (0)