Skip to content

Commit 1cc175f

Browse files
Pin pool _release post-reset _closed re-check
A previous commit added the defensive re-check at the _release site after the _reset_connection yield: if pool.close() landed during the ROLLBACK, the conn must be close()'d and the reservation released rather than put_nowait'd into a drained queue (which would orphan it). Existing tests covered the connect-site analogous race; the _release-site re-check was uncovered. Drive the race deterministically with an asyncio.Event-gated _reset_connection so an interleaving close() flips _closed=True before the reset returns; assert the conn is close()'d and _pool_released=True.
1 parent 1dc709b commit 1cc175f

1 file changed

Lines changed: 108 additions & 0 deletions

File tree

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
"""Pin: pool ``_release`` re-checks ``_closed`` after the
2+
``_reset_connection`` yield.
3+
4+
``pool.close()`` does not take ``_lock``, so it may flip
5+
``_closed=True`` and drain the queue while ``_release`` is
6+
suspended awaiting the ROLLBACK. Without the post-reset re-check,
7+
the conn would be put_nowait'd into a drained queue and orphaned
8+
(acquire() raises "Pool is closed", the conn is unreachable).
9+
10+
Existing tests cover the connect-site analogous race; the
11+
``_release`` site post-reset re-check was untested. A future
12+
refactor that drops the re-check would silently re-introduce the
13+
orphan-into-drained-queue bug.
14+
"""
15+
16+
from __future__ import annotations
17+
18+
import asyncio
19+
from typing import Any
20+
from unittest.mock import MagicMock
21+
22+
import pytest
23+
24+
from dqliteclient.cluster import ClusterClient
25+
from dqliteclient.pool import ConnectionPool
26+
27+
28+
class _FakeConn:
29+
def __init__(self) -> None:
30+
self._address = "localhost:9001"
31+
self._in_transaction = False
32+
self._tx_owner = None
33+
self._pool_released = False
34+
self._protocol = MagicMock()
35+
self._protocol._writer = MagicMock()
36+
self._protocol._writer.transport = MagicMock()
37+
self._protocol._writer.transport.is_closing = lambda: False
38+
self._protocol._reader = MagicMock()
39+
self._protocol._reader.at_eof = lambda: False
40+
self._protocol.is_wire_coherent = True
41+
self.close_called = False
42+
43+
@property
44+
def is_connected(self) -> bool:
45+
return self._protocol is not None
46+
47+
async def close(self) -> None:
48+
self.close_called = True
49+
self._protocol = None # type: ignore[assignment]
50+
51+
52+
def _make_pool() -> ConnectionPool:
53+
cluster = MagicMock(spec=ClusterClient)
54+
return ConnectionPool(
55+
addresses=["localhost:9001"],
56+
min_size=0,
57+
max_size=1,
58+
timeout=1.0,
59+
cluster=cluster,
60+
)
61+
62+
63+
@pytest.mark.asyncio
64+
async def test_release_post_reset_closes_conn_when_pool_closed_during_reset() -> None:
65+
"""``_release`` must observe a concurrent ``pool.close()`` that
66+
landed during the ``_reset_connection`` yield, route the conn
67+
through close + reservation release, and not orphan it in a
68+
drained queue."""
69+
pool = _make_pool()
70+
pool._size = 1
71+
conn = _FakeConn()
72+
73+
# Block _reset_connection on a controllable event so we can
74+
# interleave a pool.close() before the ROLLBACK returns.
75+
reset_can_finish = asyncio.Event()
76+
reset_entered = asyncio.Event()
77+
78+
async def _slow_reset(c: Any) -> bool:
79+
reset_entered.set()
80+
await reset_can_finish.wait()
81+
return True # ROLLBACK "succeeded"
82+
83+
pool._reset_connection = _slow_reset # type: ignore[assignment]
84+
85+
# Start _release on a background task; it parks inside _slow_reset.
86+
release_task = asyncio.create_task(pool._release(conn)) # type: ignore[arg-type]
87+
await reset_entered.wait()
88+
assert not release_task.done()
89+
90+
# Concurrently close the pool. The post-reset re-check must
91+
# observe _closed=True after we let _slow_reset return.
92+
close_task = asyncio.create_task(pool.close())
93+
# Give close() a tick to mark _closed=True before we let _slow_reset finish.
94+
for _ in range(5):
95+
await asyncio.sleep(0)
96+
if pool._closed:
97+
break
98+
assert pool._closed, "expected pool.close() to mark _closed=True"
99+
100+
reset_can_finish.set()
101+
await release_task
102+
await close_task
103+
104+
# The conn must have been close()'d (post-reset re-check
105+
# branched into the close+release path), not orphaned in the
106+
# drained queue.
107+
assert conn.close_called is True
108+
assert conn._pool_released is True

0 commit comments

Comments
 (0)