Skip to content

Commit 5ea8dc7

Browse files
fix: prevent pool starvation by looping acquire with capacity re-check
Previously, acquire() made a one-shot decision: if the pool was at max capacity, it committed to waiting on the queue for a returned connection. If connections were dying (not returned) in error paths, the _size counter decremented but no one notified the blocked waiter. The waiter stayed stuck on the queue until timeout, even though the pool had capacity to create a new connection. Restructure acquire() as a loop with short sub-timeouts (0.5s). After each sub-timeout, the loop re-checks whether _size < max_size and creates a new connection if capacity is available. This ensures the pool self-heals during cluster failover or connection failures. Closes #081 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 3a1a6db commit 5ea8dc7

File tree

2 files changed

+93
-12
lines changed

2 files changed

+93
-12
lines changed

src/dqliteclient/pool.py

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -79,28 +79,39 @@ async def acquire(self) -> AsyncIterator[DqliteConnection]:
7979
if self._closed:
8080
raise DqliteConnectionError("Pool is closed")
8181

82+
loop = asyncio.get_running_loop()
83+
deadline = loop.time() + self._timeout
8284
conn: DqliteConnection | None = None
8385

84-
# Try to get from pool
85-
try:
86-
conn = self._pool.get_nowait()
87-
except asyncio.QueueEmpty:
88-
# Create new if under max
86+
while conn is None:
87+
# Try to get an idle connection from the queue
88+
try:
89+
conn = self._pool.get_nowait()
90+
break
91+
except asyncio.QueueEmpty:
92+
pass
93+
94+
# Try to create a new connection if under max
8995
async with self._lock:
9096
if self._size < self._max_size:
9197
conn = await self._create_connection()
98+
break
9299

93-
# Wait for one if at max
94-
if conn is None:
100+
# At capacity — wait briefly on the queue, then loop back to
101+
# re-check capacity (another coroutine may have freed a slot)
102+
remaining = deadline - loop.time()
103+
if remaining <= 0:
104+
raise DqliteConnectionError(
105+
f"Timed out waiting for a connection from the pool "
106+
f"(max_size={self._max_size}, timeout={self._timeout}s)"
107+
)
95108
try:
96109
conn = await asyncio.wait_for(
97-
self._pool.get(), timeout=self._timeout
110+
self._pool.get(), timeout=min(remaining, 0.5)
98111
)
99112
except TimeoutError:
100-
raise DqliteConnectionError(
101-
f"Timed out waiting for a connection from the pool "
102-
f"(max_size={self._max_size}, timeout={self._timeout}s)"
103-
) from None
113+
# Don't fail yet — loop back to re-check _size
114+
continue
104115

105116
# If connection is dead, discard and create a fresh one with leader discovery
106117
if not conn.is_connected:

tests/test_pool.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,76 @@ async def hold_connection():
224224
# _size must never go negative
225225
assert pool._size >= 0, f"_size went negative: {pool._size}"
226226

227+
async def test_acquire_recovers_after_connection_failure(self) -> None:
228+
"""A waiter blocked on the queue should recover when capacity frees up."""
229+
import asyncio
230+
231+
pool = ConnectionPool(["localhost:9001"], min_size=0, max_size=1, timeout=2.0)
232+
233+
mock_conn1 = MagicMock()
234+
mock_conn1.is_connected = True
235+
mock_conn1.connect = AsyncMock()
236+
mock_conn1.close = AsyncMock()
237+
238+
mock_conn2 = MagicMock()
239+
mock_conn2.is_connected = True
240+
mock_conn2.connect = AsyncMock()
241+
mock_conn2.close = AsyncMock()
242+
243+
conns = iter([mock_conn1, mock_conn2])
244+
245+
with patch.object(pool._cluster, "connect", side_effect=lambda **kw: next(conns)):
246+
# Fill the pool to max_size and keep the connection checked out
247+
holder_acquired = asyncio.Event()
248+
holder_release = asyncio.Event()
249+
250+
async def holder():
251+
async with pool.acquire() as conn1:
252+
assert conn1 is mock_conn1
253+
holder_acquired.set()
254+
await holder_release.wait()
255+
raise RuntimeError("simulated failure")
256+
257+
holder_task = asyncio.create_task(holder())
258+
await holder_acquired.wait()
259+
260+
# Now start the waiter — it MUST enter the queue.get() wait
261+
# because _size == max_size and the queue is empty
262+
waiter_done = asyncio.Event()
263+
waiter_result: MagicMock | None = None
264+
265+
async def waiter():
266+
nonlocal waiter_result
267+
async with pool.acquire() as c:
268+
waiter_result = c
269+
waiter_done.set()
270+
271+
waiter_task = asyncio.create_task(waiter())
272+
# Give the waiter time to pass get_nowait (empty) and the lock
273+
# check (_size == max_size) and enter the queue.get() wait
274+
await asyncio.sleep(0.1)
275+
276+
# Now release the holder — it fails, closing conn and decrementing _size
277+
holder_release.set()
278+
with contextlib.suppress(RuntimeError):
279+
await holder_task
280+
# At this point _size == 0. The waiter is blocked on queue.get().
281+
# With the fix, the waiter should wake up and create a new connection.
282+
283+
try:
284+
await asyncio.wait_for(waiter_done.wait(), timeout=1.5)
285+
except TimeoutError:
286+
waiter_task.cancel()
287+
with contextlib.suppress(asyncio.CancelledError):
288+
await waiter_task
289+
pytest.fail(
290+
"Waiter timed out — pool starvation: waiter stuck on queue "
291+
"even though _size dropped below max_size"
292+
)
293+
294+
assert waiter_result is mock_conn2
295+
await pool.close()
296+
227297
async def test_dead_conn_replacement_respects_max_size(self) -> None:
228298
"""Dead-connection replacement must not allow _size to exceed max_size."""
229299
pool = ConnectionPool(["localhost:9001"], min_size=1, max_size=2)

0 commit comments

Comments (0)