Skip to content

Commit 877bd1d

Browse files
refactor(pool): reservation pattern for _size accounting (ISSUE-34/58)
Every mutation of ``ConnectionPool._size`` now goes through either the new ``_release_reservation`` helper or happens under ``_lock``, and ``_create_connection`` no longer mutates ``_size`` itself. In ``acquire()``, the reservation (``_size += 1``) is taken under the lock; the TCP / leader-discovery work happens *outside* the lock; on failure the caller releases the reservation. Two improvements: 1. ISSUE-58 — pool throughput no longer bottlenecked on the single ``_lock`` across network handshakes. Concurrent acquires under capacity can create connections in parallel. 2. ISSUE-34 — failure and drain paths (``_drain_idle``, the exception branches in ``acquire``, ``_release``) no longer decrement ``_size`` without holding the lock, so the count stays consistent against concurrent capacity checks. Add a stress test that drives 10 concurrent acquires with max_size=3 and asserts the cap is always respected even with injected slow creates; add tests for create-failure rollback and drain-idle size accounting. Update the pre-existing ``test_dead_conn_replacement`` assertion from "lock must be held during create" (now false) to "_size must never exceed max_size" (the actual invariant). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 87b4e6a commit 877bd1d

File tree

3 files changed

+195
-40
lines changed

3 files changed

+195
-40
lines changed

src/dqliteclient/pool.py

Lines changed: 67 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -118,29 +118,57 @@ def __init__(
118118
self._initialized = False
119119

120120
async def initialize(self) -> None:
121-
"""Initialize the pool with minimum connections."""
121+
"""Initialize the pool with minimum connections.
122+
123+
Idempotent: concurrent callers share the same initialization —
124+
only one performs the TCP work, the others await its result.
125+
"""
126+
# Hold the lock across the gather so a second concurrent
127+
# initialize() call observes _initialized=True after the first
128+
# completes and returns without re-creating.
122129
async with self._lock:
123130
if self._initialized:
124131
return
125-
# Create min_size connections concurrently so pool startup
126-
# latency doesn't scale with min_size × per-connect RTT. If any
127-
# one fails, cancel the rest and propagate; leave _initialized
128-
# False so the caller can retry.
129132
if self._min_size > 0:
130-
conns = await asyncio.gather(
131-
*(self._create_connection() for _ in range(self._min_size))
132-
)
133+
self._size += self._min_size
134+
try:
135+
# Create min_size connections concurrently so startup
136+
# latency doesn't scale with min_size × per-connect RTT.
137+
conns = await asyncio.gather(
138+
*(self._create_connection() for _ in range(self._min_size))
139+
)
140+
except BaseException:
141+
self._size -= self._min_size
142+
self._signal_state_change()
143+
raise
133144
for conn in conns:
134145
await self._pool.put(conn)
135146
self._initialized = True
136147

137148
async def _create_connection(self) -> DqliteConnection:
138-
"""Create a new connection to the leader."""
139-
conn = await self._cluster.connect(
149+
"""Create a new connection to the leader.
150+
151+
Does NOT mutate ``self._size`` — callers must have incremented
152+
``_size`` under ``_lock`` as a reservation before calling this
153+
method (see ``acquire`` / ``initialize``) so that concurrent
154+
callers cannot collectively exceed ``_max_size``. On failure,
155+
the caller is responsible for decrementing to release the
156+
reservation.
157+
"""
158+
return await self._cluster.connect(
140159
database=self._database, max_total_rows=self._max_total_rows
141160
)
142-
self._size += 1
143-
return conn
161+
162+
async def _release_reservation(self) -> None:
163+
"""Decrement ``_size`` under the lock, waking waiters.
164+
165+
Every ``_size -= 1`` call in the pool must go through this
166+
helper so the counter stays consistent against concurrent
167+
capacity checks in ``acquire``.
168+
"""
169+
async with self._lock:
170+
self._size -= 1
171+
self._signal_state_change()
144172

145173
def _get_closed_event(self) -> asyncio.Event:
146174
"""Lazily create the closed Event bound to the running loop."""
@@ -178,8 +206,7 @@ async def _drain_idle(self) -> None:
178206
except BaseException:
179207
pass
180208
finally:
181-
self._size -= 1
182-
self._signal_state_change()
209+
await self._release_reservation()
183210

184211
@asynccontextmanager
185212
async def acquire(self) -> AsyncIterator[DqliteConnection]:
@@ -202,13 +229,23 @@ async def acquire(self) -> AsyncIterator[DqliteConnection]:
202229
except asyncio.QueueEmpty:
203230
pass
204231

205-
# Try to create a new connection if under max
232+
# Try to reserve a new-connection slot under the lock, then
233+
# drop the lock before the TCP handshake so concurrent
234+
# pool users aren't serialized on network latency.
235+
reserved = False
206236
async with self._lock:
207237
if self._closed:
208238
raise DqliteConnectionError("Pool is closed")
209239
if self._size < self._max_size:
240+
self._size += 1
241+
reserved = True
242+
if reserved:
243+
try:
210244
conn = await self._create_connection()
211-
break
245+
except BaseException:
246+
await self._release_reservation()
247+
raise
248+
break
212249

213250
# At capacity — wait briefly on the queue, then loop back to
214251
# re-check capacity (another coroutine may have freed a slot)
@@ -250,9 +287,16 @@ async def acquire(self) -> AsyncIterator[DqliteConnection]:
250287
# Also drain other idle connections — they likely point to the same dead server.
251288
if not conn.is_connected:
252289
await self._drain_idle()
290+
# Release the dead reservation, reserve a new slot, then
291+
# do the network work outside the lock.
253292
async with self._lock:
254-
self._size -= 1
293+
self._size -= 1 # dead conn's reservation
294+
self._size += 1 # fresh reservation
295+
try:
255296
conn = await self._create_connection()
297+
except BaseException:
298+
await self._release_reservation()
299+
raise
256300

257301
conn._pool_released = False
258302
try:
@@ -265,25 +309,22 @@ async def acquire(self) -> AsyncIterator[DqliteConnection]:
265309
conn._pool_released = True
266310
with contextlib.suppress(Exception):
267311
await conn.close()
268-
self._size -= 1
269-
self._signal_state_change()
312+
await self._release_reservation()
270313
raise
271314
conn._pool_released = True
272315
try:
273316
self._pool.put_nowait(conn)
274317
except asyncio.QueueFull:
275318
await conn.close()
276-
self._size -= 1
277-
self._signal_state_change()
319+
await self._release_reservation()
278320
else:
279321
# Connection is broken (invalidated by execute/fetch error handlers).
280322
# Drain other idle connections — they likely point to the same dead server.
281323
conn._pool_released = True
282324
await self._drain_idle()
283325
with contextlib.suppress(BaseException):
284326
await conn.close()
285-
self._size -= 1
286-
self._signal_state_change()
327+
await self._release_reservation()
287328
raise
288329
else:
289330
await self._release(conn)
@@ -330,25 +371,22 @@ async def _release(self, conn: DqliteConnection) -> None:
330371
if self._closed:
331372
conn._pool_released = True
332373
await conn.close()
333-
self._size -= 1
334-
self._signal_state_change()
374+
await self._release_reservation()
335375
return
336376

337377
if not await self._reset_connection(conn):
338378
conn._pool_released = True
339379
with contextlib.suppress(Exception):
340380
await conn.close()
341-
self._size -= 1
342-
self._signal_state_change()
381+
await self._release_reservation()
343382
return
344383

345384
conn._pool_released = True
346385
try:
347386
self._pool.put_nowait(conn)
348387
except asyncio.QueueFull:
349388
await conn.close()
350-
self._size -= 1
351-
self._signal_state_change()
389+
await self._release_reservation()
352390

353391
async def execute(self, sql: str, params: Sequence[Any] | None = None) -> tuple[int, int]:
354392
"""Execute a SQL statement using a pooled connection."""

tests/test_pool.py

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,15 @@ async def waiter():
300300
await pool.close()
301301

302302
async def test_dead_conn_replacement_respects_max_size(self) -> None:
303-
"""Dead-connection replacement must not allow _size to exceed max_size."""
303+
"""Dead-connection replacement must not allow _size to exceed max_size.
304+
305+
ISSUE-34/58 changed the pool to a reservation pattern: the
306+
lock is released before the TCP handshake. The invariant we
307+
assert here is the one that actually matters — that
308+
``_size`` never exceeds ``_max_size`` at any observation
309+
point — rather than the specific implementation detail of
310+
"lock held during create".
311+
"""
304312
pool = ConnectionPool(["localhost:9001"], min_size=1, max_size=2)
305313

306314
dead_conn = MagicMock()
@@ -313,16 +321,16 @@ async def test_dead_conn_replacement_respects_max_size(self) -> None:
313321

314322
assert pool._size == 1
315323

316-
# Track whether the lock is held during dead-conn replacement.
317-
# If _create_connection is called while the lock is NOT held,
318-
# another coroutine could race and exceed max_size.
319-
lock_was_held_during_create = False
324+
# Track _size at every create call. It must never exceed
325+
# max_size, even transiently, because creates happen outside
326+
# the lock under the new pattern and concurrent acquires
327+
# would otherwise race past the cap.
328+
peak_size_during_create = 0
320329
original_create = pool._create_connection
321330

322331
async def tracking_create():
323-
nonlocal lock_was_held_during_create
324-
if pool._lock.locked():
325-
lock_was_held_during_create = True
332+
nonlocal peak_size_during_create
333+
peak_size_during_create = max(peak_size_during_create, pool._size)
326334
return await original_create()
327335

328336
new_conn = MagicMock()
@@ -335,9 +343,9 @@ async def tracking_create():
335343
async with pool.acquire() as conn:
336344
assert conn is new_conn
337345

338-
assert lock_was_held_during_create, (
339-
"Dead-connection replacement must hold the lock to prevent "
340-
"_size race with concurrent acquire() calls"
346+
assert peak_size_during_create <= pool._max_size, (
347+
f"_size exceeded max_size during dead-conn replacement "
348+
f"(peak={peak_size_during_create}, max={pool._max_size})"
341349
)
342350

343351
await pool.close()

tests/test_pool_size_invariants.py

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
"""ISSUE-34 / ISSUE-58: concurrent-access invariants of ConnectionPool.
2+
3+
Verifies that _size never exceeds max_size under concurrent acquires,
4+
even when _create_connection is slow (simulating TCP handshake
5+
latency) — a regression would be the reservation pattern losing
6+
ordering between check and increment."""
7+
8+
import asyncio
9+
from unittest.mock import AsyncMock, MagicMock, patch
10+
11+
from dqliteclient.pool import ConnectionPool
12+
13+
14+
async def _make_conn() -> MagicMock:
15+
mock = MagicMock()
16+
mock.is_connected = True
17+
mock.connect = AsyncMock()
18+
mock.close = AsyncMock()
19+
mock._in_transaction = False
20+
mock._pool_released = False
21+
mock._tx_owner = None
22+
mock.execute = AsyncMock()
23+
return mock
24+
25+
26+
class TestPoolSizeInvariants:
27+
async def test_concurrent_acquires_respect_max_size(self) -> None:
28+
"""10 concurrent acquires with max_size=3 must create at most 3.
29+
30+
With the pre-fix implementation that held the lock across the
31+
network call, this would pass — but slowly. The new
32+
reservation pattern drops the lock for the create; this test
33+
ensures it still respects the cap.
34+
"""
35+
pool = ConnectionPool(["localhost:9001"], min_size=0, max_size=3, timeout=5.0)
36+
37+
create_count = 0
38+
peak_size = 0
39+
40+
async def slow_create(**kwargs):
41+
nonlocal create_count, peak_size
42+
create_count += 1
43+
peak_size = max(peak_size, pool._size)
44+
# Simulate TCP handshake latency; allow other tasks to run.
45+
for _ in range(5):
46+
await asyncio.sleep(0)
47+
return await _make_conn()
48+
49+
with patch.object(pool._cluster, "connect", side_effect=slow_create):
50+
51+
async def borrow() -> None:
52+
async with pool.acquire() as c:
53+
assert c is not None
54+
55+
await asyncio.gather(*(borrow() for _ in range(10)))
56+
57+
assert create_count == 3, f"expected 3 creates (max_size), got {create_count}"
58+
assert peak_size <= pool._max_size, (
59+
f"_size exceeded max_size: peak={peak_size}, max={pool._max_size}"
60+
)
61+
await pool.close()
62+
63+
async def test_create_failure_rolls_back_reservation(self) -> None:
64+
"""If _create_connection raises, the reservation must be released
65+
so a subsequent acquire can succeed."""
66+
pool = ConnectionPool(["localhost:9001"], min_size=0, max_size=1, timeout=1.0)
67+
68+
call_count = 0
69+
70+
async def flaky_create(**kwargs):
71+
nonlocal call_count
72+
call_count += 1
73+
if call_count == 1:
74+
raise OSError("boom")
75+
return await _make_conn()
76+
77+
with patch.object(pool._cluster, "connect", side_effect=flaky_create):
78+
79+
async def borrow() -> None:
80+
async with pool.acquire() as c:
81+
assert c is not None
82+
83+
# First call fails — reservation must be released.
84+
import pytest
85+
86+
with pytest.raises(OSError):
87+
await borrow()
88+
assert pool._size == 0, f"reservation leaked after create failure; _size={pool._size}"
89+
90+
# Second call should succeed.
91+
await borrow()
92+
93+
await pool.close()
94+
95+
async def test_drained_idle_releases_size(self) -> None:
96+
"""_drain_idle must decrement _size by exactly the number of drained connections."""
97+
pool = ConnectionPool(["localhost:9001"], min_size=3, max_size=5, timeout=1.0)
98+
99+
async def ok_create(**kwargs):
100+
return await _make_conn()
101+
102+
with patch.object(pool._cluster, "connect", side_effect=ok_create):
103+
await pool.initialize()
104+
assert pool._size == 3
105+
106+
await pool._drain_idle()
107+
assert pool._size == 0, f"drain_idle left _size={pool._size}"
108+
109+
await pool.close()

0 commit comments

Comments (0)