
Commit 10d38e3

Harden transaction + pool cancellation discipline
Coordinated fix for six cross-cutting cancellation/lifecycle bugs in DqliteConnection.transaction() and ConnectionPool. Seven related issues land together because they share the same code blocks and invariants; splitting them would produce repeated rewrites of the same methods across multiple PRs.

transaction() rewrites (connection.py):

- When the body raises AND ROLLBACK also raises, invalidate the connection (set _protocol=None) so the pool discards it instead of returning it to the queue with a live server-side transaction. The previous `finally` block cleared _in_transaction unconditionally, masking the dangling transaction from the pool's _reset_connection check.
- The body-raised ROLLBACK no longer uses suppress(BaseException). CancelledError / KeyboardInterrupt / SystemExit propagate (structured-concurrency contract — TaskGroup / asyncio.timeout() rely on it). Non-cancellation exceptions still invalidate the connection so the original body exception propagates cleanly.
- Cancellation matrix — cancel before BEGIN, in body, during COMMIT, during ROLLBACK — all four phases now produce consistent post-conditions: CancelledError propagates, and the connection state is terminal (either usable or invalidated).

ConnectionPool rewrites (pool.py):

- initialize() now uses asyncio.gather(return_exceptions=True) and closes survivors before re-raising the first failure. The previous default-exceptions path cancelled sibling tasks but did NOT close already-succeeded connections, leaking transports.
- The check-`_closed`-then-clear-`closed_event` sequence in acquire() now runs under self._lock, eliminating the window where a concurrent close() could set() the event between check and clear (which would lose the close signal and stall parked waiters until the per-poll timeout).
- acquire()'s except branch now wraps cleanup in a try/finally with asyncio.shield(self._release_reservation()). The finally runs even when a second cancellation lands mid-cleanup; shield keeps the _size decrement uninterruptible so the pool never loses a reservation under rapid-fire cancellation.
- CRITICAL transport leak fix (flagged by concurrency review on the initial diff): every `_pool_released = True` now comes AFTER `await conn.close()`. DqliteConnection.close() has an early-return guard on _pool_released; setting the flag first made every pool-side close() a silent no-op, leaking the transport. Applies to all 8 cleanup branches across acquire() and _release().

connect() failure path:

- New _abort_protocol() helper uses asyncio.wait_for(protocol.wait_closed(), timeout=0.5) so the transport drains under normal conditions but never hangs on an unresponsive peer. Different call site than the leader-query finally block (where unbounded wait_closed was correctly rejected for hang reasons); connect() is a retry-loop path that benefits from bounded draining.

Tests (all RED before this commit, GREEN after):

- tests/test_transaction_cancellation.py (7 tests):
  - TestRollbackFailureInvalidatesConnection
  - TestRollbackHappyPath: regression for body-raise + clean-rollback
  - TestRollbackCancellationPropagates
  - TestTransactionCancellationPhases: 4-phase matrix
- tests/test_pool_cancellation.py (4 tests):
  - TestInitializePartialFailureClosesSurvivors
  - TestAcquireCancellationRestoresSize
  - TestCloseWakesAllWaiters: message-text assertion (CI-latency robust) rather than timing

The _FakeConn test double replicates the real DqliteConnection's _pool_released close-guard via close_effective. Without this, the transport-leak bug (the critical finding on the initial diff) would have been invisible to tests.

Two concurrency reviews and one test-quality review ran on the implementation. The first review caught the _pool_released ordering bug; the fix applies to every cleanup branch. A follow-up review verified all 8 branches close-before-flag.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent b4904f0 commit 10d38e3

4 files changed: +817 -47 lines changed

src/dqliteclient/connection.py

Lines changed: 69 additions & 8 deletions

@@ -176,8 +176,7 @@ async def connect(self) -> None:
             await self._protocol.handshake()
             self._db_id = await self._protocol.open_database(self._database)
         except OperationalError as e:
-            self._protocol.close()
-            self._protocol = None
+            await self._abort_protocol()
             if e.code in _LEADER_ERROR_CODES:
                 # Leader-change errors during OPEN are transport-level
                 # problems — the caller needs to reconnect elsewhere, not
@@ -187,8 +186,7 @@ async def connect(self) -> None:
             ) from e
             raise
         except BaseException:
-            self._protocol.close()
-            self._protocol = None
+            await self._abort_protocol()
             raise
         finally:
             self._in_use = False
@@ -210,6 +208,25 @@ async def close(self) -> None:
         protocol.close()
         await protocol.wait_closed()
 
+    async def _abort_protocol(self) -> None:
+        """Tear down a half-open protocol during a connect failure path.
+
+        Close the writer, then give ``wait_closed`` a bounded budget so
+        the transport drains under normal conditions but never hangs on
+        an unresponsive peer (ISSUE-72). Cycle-2 ISSUE-38 rejected an
+        unbounded ``wait_closed`` in the leader-query finally block
+        because leader-query is a discovery path that runs against
+        arbitrary nodes; connect is a retry-loop path that benefits
+        from draining, so the two sites take different decisions.
+        """
+        protocol = self._protocol
+        if protocol is None:
+            return
+        self._protocol = None
+        protocol.close()
+        with contextlib.suppress(BaseException):
+            await asyncio.wait_for(protocol.wait_closed(), timeout=0.5)
+
     async def __aenter__(self) -> "DqliteConnection":
         await self.connect()
         return self
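The bounded-drain idea in `_abort_protocol` can be sketched independently of the dqlite wire protocol. The names below (`unresponsive_wait_closed`, `abort_with_budget`) are illustrative stand-ins, not the real dqliteclient API:

```python
import asyncio

async def unresponsive_wait_closed() -> None:
    # Stand-in for a peer whose transport never finishes closing.
    await asyncio.Event().wait()

async def abort_with_budget() -> str:
    # Bounded drain: a healthy transport closes well inside the budget;
    # an unresponsive peer costs at most the timeout instead of a hang.
    try:
        await asyncio.wait_for(unresponsive_wait_closed(), timeout=0.1)
        return "drained"
    except asyncio.TimeoutError:
        return "gave up after budget"

print(asyncio.run(abort_with_budget()))  # → gave up after budget
```

The real helper suppresses the timeout entirely (the protocol reference is already dropped, so the outcome of the drain no longer matters); the sketch surfaces it only to show both paths.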
@@ -383,7 +400,26 @@ async def fetchval(self, sql: str, params: Sequence[Any] | None = None) -> Any:
 
     @asynccontextmanager
     async def transaction(self) -> AsyncIterator[None]:
-        """Context manager for transactions."""
+        """Context manager for transactions.
+
+        Cancellation contract (ISSUE-75 / ISSUE-79):
+        - Cancellation during BEGIN: state cleared, CancelledError
+          propagates.
+        - Cancellation during the body: ROLLBACK is attempted. If
+          ROLLBACK itself is cancelled, the connection is invalidated
+          and CancelledError propagates (structured-concurrency
+          contract — TaskGroup / asyncio.timeout() require this).
+        - Cancellation during COMMIT: connection invalidated
+          (server-side state ambiguous), CancelledError propagates.
+        - Cancellation during ROLLBACK (body already raised): connection
+          invalidated, CancelledError propagates and supersedes the
+          body exception (Python chains it via ``__context__``).
+
+        Non-cancellation ROLLBACK failure (ISSUE-73): connection is
+        invalidated so the pool discards it instead of reusing a
+        Python-side "_in_transaction=False" connection with live
+        server-side transaction state.
+        """
         if self._in_transaction:
             raise InterfaceError("Nested transactions are not supported; use SAVEPOINT directly")
 
@@ -409,10 +445,35 @@ async def transaction(self) -> AsyncIterator[None]:
                 # pool discards it instead of recycling an unknown state.
                 self._invalidate()
             else:
-                # Body raised before COMMIT; try to roll back. Swallow
-                # rollback errors; the original exception is more important.
-                with contextlib.suppress(BaseException):
+                # Body raised before COMMIT; try to roll back.
+                #
+                # Narrow suppression to Exception (NOT BaseException):
+                # CancelledError / KeyboardInterrupt / SystemExit must
+                # propagate. Previously ``suppress(BaseException)``
+                # swallowed cancellation, breaking structured-concurrency
+                # contracts (ISSUE-75).
+                #
+                # If ROLLBACK fails for any reason (including the narrow
+                # cancellation catch below), the connection's transaction
+                # state is unknowable from our side and the connection
+                # must be invalidated so the pool discards it on return
+                # (ISSUE-73). The original body exception is still the
+                # one that propagates, except for cancellation which
+                # takes precedence.
+                try:
                     await self.execute("ROLLBACK")
+                except (asyncio.CancelledError, KeyboardInterrupt, SystemExit):
+                    # Rollback interrupted mid-flight. Server-side tx is
+                    # in an unknown state; invalidate and propagate the
+                    # higher-priority signal.
+                    self._invalidate()
+                    raise
+                except Exception:
+                    # Rollback failed for a non-cancellation reason.
+                    # Invalidate so the pool discards on return, then
+                    # re-raise the ORIGINAL body exception (below)
+                    # — rollback failure is a secondary concern.
+                    self._invalidate()
                 raise
         finally:
             self._tx_owner = None

src/dqliteclient/pool.py

Lines changed: 97 additions & 39 deletions

@@ -154,6 +154,13 @@ async def initialize(self) -> None:
 
         Idempotent: concurrent callers share the same initialization —
         only one performs the TCP work, the others await its result.
+
+        Partial-failure behavior (ISSUE-77): ``asyncio.gather`` with the
+        default ``return_exceptions=False`` cancels sibling tasks on
+        first failure but does NOT close connections that already
+        succeeded — they leak as orphaned transports. Use
+        ``return_exceptions=True`` so every task resolves, then close
+        survivors explicitly before re-raising the first failure.
         """
         # Hold the lock across the gather so a second concurrent
         # initialize() call observes _initialized=True after the first
@@ -163,17 +170,31 @@ async def initialize(self) -> None:
             return
         if self._min_size > 0:
             self._size += self._min_size
-            try:
-                # Create min_size connections concurrently so startup
-                # latency doesn't scale with min_size × per-connect RTT.
-                conns = await asyncio.gather(
-                    *(self._create_connection() for _ in range(self._min_size))
-                )
-            except BaseException:
+            # Create min_size connections concurrently so startup
+            # latency doesn't scale with min_size × per-connect RTT.
+            results = await asyncio.gather(
+                *(self._create_connection() for _ in range(self._min_size)),
+                return_exceptions=True,
+            )
+            successes: list[DqliteConnection] = []
+            failures: list[BaseException] = []
+            for r in results:
+                if isinstance(r, BaseException):
+                    failures.append(r)
+                else:
+                    successes.append(r)
+            if failures:
+                # Close the connections that did succeed — they are
+                # unowned now that initialize is aborting.
+                for conn in successes:
+                    with contextlib.suppress(BaseException):
+                        await conn.close()
                 self._size -= self._min_size
                 self._signal_state_change()
-                raise
-            for conn in conns:
+                # Re-raise the first observed failure as the root
+                # cause; additional failures chain as context.
+                raise failures[0]
+            for conn in successes:
                 await self._pool.put(conn)
             self._initialized = True

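The partial-failure cleanup that `initialize()` performs can be shown in isolation. `FakeConn` and `create` below are illustrative stand-ins for `DqliteConnection` and `_create_connection`:

```python
import asyncio

class FakeConn:
    """Illustrative stand-in for a pooled connection, not the real class."""
    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True

async def main() -> tuple[str, bool]:
    created: list[FakeConn] = []

    async def create(i: int) -> FakeConn:
        if i == 2:
            raise OSError(f"connect failed for node {i}")
        conn = FakeConn()
        created.append(conn)
        return conn

    # return_exceptions=True lets every sibling task resolve; the default
    # cancels siblings on first failure and orphans the successes.
    results = await asyncio.gather(
        *(create(i) for i in range(4)), return_exceptions=True
    )
    successes = [r for r in results if isinstance(r, FakeConn)]
    failures = [r for r in results if isinstance(r, BaseException)]
    try:
        if failures:
            for conn in successes:      # close survivors before re-raising
                await conn.close()
            raise failures[0]
    except OSError as e:
        return str(e), all(c.closed for c in created)
    return "no failure", False

print(asyncio.run(main()))  # → ('connect failed for node 2', True)
```

Every connection that succeeded is closed before the first failure propagates — nothing leaks.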
@@ -292,10 +313,13 @@ async def acquire(self) -> AsyncIterator[DqliteConnection]:
             )
             # Race the queue against the state-change event so any pool
             # state change (close, size decrement, drain) wakes waiters
-            # promptly. Clear right before parking so we only wake on
-            # subsequent signals; the loop top re-checks _closed.
-            closed_event = self._get_closed_event()
-            if not self._closed:
+            # promptly. The check-_closed-then-clear pair runs under
+            # the lock so a concurrent close() can't set() the event
+            # between our read and our clear (ISSUE-74).
+            async with self._lock:
+                closed_event = self._get_closed_event()
+                if self._closed:
+                    raise DqliteConnectionError("Pool is closed")
                 closed_event.clear()
             get_task: asyncio.Task[DqliteConnection] = asyncio.create_task(self._pool.get())
             closed_task = asyncio.create_task(closed_event.wait())
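The lost-wakeup window that the lock closes can be sketched with a bare `asyncio.Event`. The names here are illustrative; the key point is that the flag check and `event.clear()` are atomic with respect to the closer's flag-set and `event.set()`:

```python
import asyncio

async def main() -> str:
    lock = asyncio.Lock()
    closed = False
    event = asyncio.Event()

    async def close() -> None:
        nonlocal closed
        async with lock:        # same lock as the waiter's check-then-clear
            closed = True
            event.set()         # wake any parked waiter

    async def wait_for_close() -> str:
        # Check-then-clear runs atomically under the lock, so close()
        # cannot set() the event between our read and our clear.
        async with lock:
            if closed:
                return "already closed"
            event.clear()
        await event.wait()      # a set() after our clear is never lost
        return "woken by close"

    waiter = asyncio.create_task(wait_for_close())
    await asyncio.sleep(0)      # let the waiter park on event.wait()
    await close()
    return await waiter

print(asyncio.run(main()))  # → woken by close
```

Without the lock, close() could run between the waiter's `if closed` read and its `clear()`, wiping out the only wake-up signal and leaving the waiter parked.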
@@ -337,29 +361,51 @@ async def acquire(self) -> AsyncIterator[DqliteConnection]:
         try:
             yield conn
         except BaseException:
-            if conn.is_connected and not self._closed:
-                # Connection is healthy — user code raised a non-connection error.
-                # Roll back any open transaction, then return to pool.
-                if not await self._reset_connection(conn):
-                    conn._pool_released = True
-                    with contextlib.suppress(Exception):
+            # Cleanup must complete even if a second cancellation lands
+            # mid-await (ISSUE-76). ``returned_to_queue`` tracks whether
+            # the reservation was transferred to an in-queue connection;
+            # if not, the ``finally`` below releases it. ``asyncio.shield``
+            # around ``_release_reservation`` makes the decrement itself
+            # uninterruptible so ``_size`` stays consistent under rapid-
+            # fire cancellation.
+            #
+            # Order note: ``conn.close()`` has a guard that returns early
+            # when ``_pool_released`` is True (so user code can't
+            # accidentally close a connection they returned to the
+            # pool). The pool itself MUST therefore close BEFORE setting
+            # the flag, or the transport leaks.
+            returned_to_queue = False
+            try:
+                if conn.is_connected and not self._closed:
+                    # Connection is healthy — user code raised a non-connection
+                    # error. Roll back any open transaction, then return to pool.
+                    if await self._reset_connection(conn):
+                        try:
+                            self._pool.put_nowait(conn)
+                        except asyncio.QueueFull:
+                            with contextlib.suppress(Exception):
+                                await conn.close()
+                            conn._pool_released = True
+                        else:
+                            conn._pool_released = True
+                            returned_to_queue = True
+                    else:
+                        with contextlib.suppress(Exception):
+                            await conn.close()
+                        conn._pool_released = True
+                else:
+                    # Connection is broken (invalidated by execute/fetch error
+                    # handlers). Drain other idle connections — they likely
+                    # point to the same dead server.
+                    with contextlib.suppress(BaseException):
+                        await self._drain_idle()
+                    with contextlib.suppress(BaseException):
                         await conn.close()
-                    await self._release_reservation()
-                    raise
-                conn._pool_released = True
-                try:
-                    self._pool.put_nowait(conn)
-                except asyncio.QueueFull:
-                    await conn.close()
-                    await self._release_reservation()
-            else:
-                # Connection is broken (invalidated by execute/fetch error handlers).
-                # Drain other idle connections — they likely point to the same dead server.
-                conn._pool_released = True
-                await self._drain_idle()
-                with contextlib.suppress(BaseException):
-                    await conn.close()
-                await self._release_reservation()
+                    conn._pool_released = True
+            finally:
+                if not returned_to_queue:
+                    with contextlib.suppress(BaseException):
+                        await asyncio.shield(self._release_reservation())
             raise
         else:
             await self._release(conn)
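The shield-in-finally pattern — making the reservation decrement survive cancellation of the surrounding cleanup — can be reduced to a few lines. `Pool` here is a hypothetical sketch, not the real ConnectionPool:

```python
import asyncio

class Pool:
    """Hypothetical sketch: one outstanding reservation to give back."""
    def __init__(self) -> None:
        self.size = 1

    async def _release_reservation(self) -> None:
        await asyncio.sleep(0.05)   # an await mid-decrement (lock, signal, …)
        self.size -= 1

async def cleanup(pool: Pool) -> None:
    try:
        await asyncio.sleep(1)      # cleanup work that gets cancelled
    finally:
        # shield() keeps the inner coroutine running to completion even
        # if the outer task is (re-)cancelled, so the reservation is
        # never half-released.
        await asyncio.shield(pool._release_reservation())

async def main() -> int:
    pool = Pool()
    task = asyncio.create_task(cleanup(pool))
    await asyncio.sleep(0.01)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass                        # cancellation propagated as expected
    return pool.size

print(asyncio.run(main()))  # → 0
```

Note that the `finally` runs after the CancelledError is raised inside `cleanup`; the shield matters most when a second cancellation arrives during that final await.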
@@ -402,26 +448,38 @@ async def _reset_connection(self, conn: DqliteConnection) -> bool:
             return True
 
     async def _release(self, conn: DqliteConnection) -> None:
-        """Return a connection to the pool or close it."""
+        """Return a connection to the pool or close it.
+
+        ``conn.close()`` has an early-return guard against
+        ``_pool_released=True``, so close MUST run before the flag is
+        set — otherwise the transport leaks (ISSUE-76 review found
+        this bug across every branch that closes a pool-owned
+        connection).
+        """
         if self._closed:
-            conn._pool_released = True
             await conn.close()
+            conn._pool_released = True
             await self._release_reservation()
             return
 
         if not await self._reset_connection(conn):
-            conn._pool_released = True
             with contextlib.suppress(Exception):
                 await conn.close()
+            conn._pool_released = True
             await self._release_reservation()
             return
 
-        conn._pool_released = True
+        # Healthy connection returning to queue: no close; just flip
+        # the flag and enqueue. If the queue is full we must close
+        # before setting the flag.
         try:
             self._pool.put_nowait(conn)
         except asyncio.QueueFull:
             await conn.close()
+            conn._pool_released = True
             await self._release_reservation()
+        else:
+            conn._pool_released = True
 
     async def execute(self, sql: str, params: Sequence[Any] | None = None) -> tuple[int, int]:
         """Execute a SQL statement using a pooled connection."""
