Skip to content

Commit 048c070

Browse files
Let cancellation escape pool._drain_idle
The helper caught ``BaseException`` and silently discarded it. That included ``asyncio.CancelledError``, so ``pool.close()`` invoked inside ``asyncio.timeout()`` or a ``TaskGroup`` would hang past its deadline while drain kept running — the cancellation was absorbed. Narrow the catch to ``Exception`` and log the address+error at DEBUG. Transport-level close failures (``BrokenPipeError``, ``OSError``, our own ``DqliteConnectionError``) are still absorbed so drain finishes the remaining idle connections; cancellation and interpreter-exit signals now propagate. ``_release_reservation`` stays inside the ``finally``, so the per-iteration reservation invariant is preserved under both paths. Add regression tests for both angles: ``CancelledError`` from a connection's close escapes ``_drain_idle`` and still decrements ``_size``; an ``OSError`` is absorbed, subsequent connections are drained, and a DEBUG log record names the failure. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 7341924 commit 048c070

File tree

2 files changed

+72
-2
lines changed

2 files changed

+72
-2
lines changed

src/dqliteclient/pool.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -264,8 +264,18 @@ async def _drain_idle(self) -> None:
264264
break
265265
try:
266266
await conn.close()
267-
except BaseException:
268-
pass
267+
except Exception as exc:
268+
# Transport-level failures (BrokenPipeError, OSError, our
269+
# own DqliteConnectionError) are absorbed so drain can
270+
# finish the remaining connections. CancelledError /
271+
# KeyboardInterrupt / SystemExit propagate — swallowing
272+
# them used to break structured concurrency (``asyncio.
273+
# timeout`` around ``pool.close()`` would silently hang).
274+
logger.debug(
275+
"pool: close() on idle connection %r failed: %r",
276+
getattr(conn, "_address", "?"),
277+
exc,
278+
)
269279
finally:
270280
await self._release_reservation()
271281

tests/test_pool.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Tests for connection pooling."""
22

3+
import asyncio
34
import contextlib
45
from unittest.mock import AsyncMock, MagicMock, patch
56

@@ -1243,3 +1244,62 @@ def test_unexpected_exception_is_not_swallowed(self) -> None:
12431244
conn = self._make_conn(transport=transport, reader=reader)
12441245
with pytest.raises(ValueError, match="broken mock"):
12451246
_socket_looks_dead(conn)
1247+
1248+
1249+
class TestDrainIdleCancellation:
1250+
"""``_drain_idle`` must not swallow ``CancelledError`` or any other
1251+
``BaseException``. The reservation release on each iteration still runs,
1252+
so cancellation is clean — but the cancel signal itself must propagate.
1253+
1254+
A close() failure for a non-cancellation reason should be absorbed so
1255+
drain can finish the remaining connections, but logged for diagnostics.
1256+
"""
1257+
1258+
async def test_cancelled_error_in_close_propagates(self) -> None:
1259+
"""An async close() raising ``CancelledError`` must escape ``_drain_idle``.
1260+
Previously the helper's ``except BaseException: pass`` swallowed it,
1261+
breaking structured concurrency (``asyncio.timeout`` / ``TaskGroup``).
1262+
"""
1263+
pool = ConnectionPool(["localhost:9001"])
1264+
1265+
bad_conn = MagicMock()
1266+
1267+
async def cancel_close() -> None:
1268+
raise asyncio.CancelledError("outer timeout")
1269+
1270+
bad_conn.close = cancel_close
1271+
await pool._pool.put(bad_conn)
1272+
pool._size = 1
1273+
1274+
with pytest.raises(asyncio.CancelledError, match="outer timeout"):
1275+
await pool._drain_idle()
1276+
1277+
# Reservation must still be released on the cancellation path so
1278+
# the pool is recoverable after the cancel.
1279+
assert pool._size == 0
1280+
1281+
async def test_ordinary_exception_is_absorbed_and_logged(
1282+
self, caplog: pytest.LogCaptureFixture
1283+
) -> None:
1284+
"""An ``OSError`` from close() should be absorbed so drain continues.
1285+
Behaviour is logged at DEBUG for operator diagnostics.
1286+
"""
1287+
import logging
1288+
1289+
pool = ConnectionPool(["localhost:9001"])
1290+
1291+
c1, c2 = MagicMock(), MagicMock()
1292+
c1._address = "n1:9001"
1293+
c2._address = "n2:9001"
1294+
c1.close = AsyncMock(side_effect=OSError("boom"))
1295+
c2.close = AsyncMock()
1296+
await pool._pool.put(c1)
1297+
await pool._pool.put(c2)
1298+
pool._size = 2
1299+
1300+
with caplog.at_level(logging.DEBUG, logger="dqliteclient.pool"):
1301+
await pool._drain_idle()
1302+
1303+
c2.close.assert_awaited_once()
1304+
assert pool._size == 0
1305+
assert any("boom" in rec.getMessage() for rec in caplog.records)

0 commit comments

Comments
 (0)