Skip to content

Commit 6348b8e

Browse files
fix: guarantee _size decrement in _drain_idle() and close() on CancelledError
contextlib.suppress(Exception) does not catch CancelledError (a BaseException since Python 3.9). If CancelledError is delivered during await conn.close() in _drain_idle() or close(), the connection has already been removed from the queue but _size -= 1 is skipped, inflating _size and eventually causing pool starvation. Replace the suppress(Exception) pattern with try/except BaseException/ finally to guarantee the decrement runs regardless of exception type. Fixes #104 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 22a482e commit 6348b8e

2 files changed

Lines changed: 92 additions & 6 deletions

File tree

src/dqliteclient/pool.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -89,11 +89,14 @@ async def _drain_idle(self) -> None:
8989
while not self._pool.empty():
9090
try:
9191
conn = self._pool.get_nowait()
92-
with contextlib.suppress(Exception):
93-
await conn.close()
94-
self._size -= 1
9592
except asyncio.QueueEmpty:
9693
break
94+
try:
95+
await conn.close()
96+
except BaseException:
97+
pass
98+
finally:
99+
self._size -= 1
97100

98101
@asynccontextmanager
99102
async def acquire(self) -> AsyncIterator[DqliteConnection]:
@@ -203,11 +206,14 @@ async def close(self) -> None:
203206
while not self._pool.empty():
204207
try:
205208
conn = self._pool.get_nowait()
206-
with contextlib.suppress(Exception):
207-
await conn.close()
208-
self._size -= 1
209209
except asyncio.QueueEmpty:
210210
break
211+
try:
212+
await conn.close()
213+
except BaseException:
214+
pass
215+
finally:
216+
self._size -= 1
211217

212218
# In-use connections are closed by acquire()'s cleanup when they
213219
# return — the else branch checks _closed and closes instead of

tests/test_pool.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,86 @@ async def test_broken_connection_discarded_on_exception(self) -> None:
481481

482482
await pool.close()
483483

484+
async def test_drain_idle_cancellation_does_not_inflate_size(self) -> None:
485+
"""If _drain_idle() is cancelled during conn.close(), _size must still decrement."""
486+
import asyncio
487+
488+
pool = ConnectionPool(["localhost:9001"], min_size=0, max_size=5)
489+
pool._initialized = True
490+
491+
# Create mock connections with a slow close that can be cancelled
492+
for _ in range(3):
493+
mock_conn = MagicMock()
494+
mock_conn.is_connected = True
495+
mock_conn._in_use = False
496+
mock_conn._in_transaction = False
497+
mock_conn._bound_loop = asyncio.get_running_loop()
498+
mock_conn._check_in_use = MagicMock()
499+
500+
async def slow_close():
501+
await asyncio.sleep(10)
502+
503+
mock_conn.close = slow_close
504+
await pool._pool.put(mock_conn)
505+
pool._size += 1
506+
507+
assert pool._size == 3
508+
assert pool._pool.qsize() == 3
509+
510+
# Start draining and cancel mid-way
511+
task = asyncio.create_task(pool._drain_idle())
512+
await asyncio.sleep(0.01) # Let drain start (first conn.close() begins)
513+
task.cancel()
514+
with contextlib.suppress(asyncio.CancelledError):
515+
await task
516+
517+
# Connections that were dequeued but not decremented due to CancelledError
518+
# during close() cause _size inflation. The invariant is:
519+
# _size == number of connections still in the queue
520+
remaining = pool._pool.qsize()
521+
assert pool._size == remaining, (
522+
f"_size ({pool._size}) != queue size ({remaining}). "
523+
f"CancelledError during conn.close() skipped _size decrement."
524+
)
525+
526+
async def test_pool_close_cancellation_does_not_inflate_size(self) -> None:
527+
"""If pool.close() is cancelled during conn.close(), _size must still decrement."""
528+
import asyncio
529+
530+
pool = ConnectionPool(["localhost:9001"], min_size=0, max_size=5)
531+
pool._initialized = True
532+
533+
for _ in range(2):
534+
mock_conn = MagicMock()
535+
mock_conn.is_connected = True
536+
mock_conn._in_use = False
537+
mock_conn._in_transaction = False
538+
mock_conn._bound_loop = asyncio.get_running_loop()
539+
mock_conn._check_in_use = MagicMock()
540+
541+
async def slow_close():
542+
await asyncio.sleep(10)
543+
544+
mock_conn.close = slow_close
545+
await pool._pool.put(mock_conn)
546+
pool._size += 1
547+
548+
assert pool._size == 2
549+
550+
task = asyncio.create_task(pool.close())
551+
await asyncio.sleep(0.01)
552+
task.cancel()
553+
with contextlib.suppress(asyncio.CancelledError):
554+
await task
555+
556+
# _size must reflect that connections were removed from queue
557+
assert pool._size >= 0
558+
# At least the first connection should have been decremented
559+
assert pool._size < 2, (
560+
f"_size should have decremented, got {pool._size}. "
561+
f"CancelledError during close() skipped _size decrement."
562+
)
563+
484564

485565
class TestConnectionPoolIntegration:
486566
"""Integration tests requiring mocked connections."""

0 commit comments

Comments
 (0)