Skip to content

Commit 7f0bfc2

Browse files
Walk gather's child tasks in initialize's finally so cancel-mid-gather doesn't leak completed conns
``ConnectionPool.initialize`` opened ``min_size`` connections concurrently via ``asyncio.gather(...)``. The post-gather result-iteration loop populated the ``successes`` list and mirrored it into ``unqueued_survivors`` so the finally could walk and close any conn that didn't make it into the pool queue. If the calling task was cancelled WHILE gather was still running (outer ``asyncio.timeout``, TaskGroup sibling failure, ``task.cancel()``), gather raised CancelledError out of the await. Control jumped from the await line directly to the finally — skipping the result-iteration loop AND the ``unqueued_survivors = list(successes)`` assignment. The finally's close sweep then iterated the default empty list, leaking every conn that had completed before the cancel. Each leaked conn carried a live TCP socket bound to the leader plus an asyncio StreamReaderProtocol task pinned by the conn's _protocol ref — a leak that surfaces only through eventual GC warnings, with no exception path to expose it in tests. The ``_size`` accounting was already correct (ISSUE-240 moved that into the finally). The connection-close sweep now gets the same treatment: build the child tasks explicitly with ``asyncio.create_task`` so the finally has a handle on every child regardless of where control left. The task-walk recovery is gated on ``gather_returned``: when gather completed normally, the existing pop-based discipline already tracked the un-queued tail precisely; walking tasks again would re-add already-queued conns and double-close them. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent b0a6ef2 commit 7f0bfc2

2 files changed

Lines changed: 125 additions & 1 deletion

File tree

src/dqliteclient/pool.py

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -350,13 +350,44 @@ async def initialize(self) -> None:
350350
# and closes the unqueued survivors.
351351
unqueued = self._min_size
352352
unqueued_survivors: list[DqliteConnection] = []
353+
# Build child tasks explicitly (not via the ``gather`` *
354+
# expression form) so a CancelledError raised out of
355+
# ``gather`` itself — outer ``asyncio.timeout`` / a
356+
# TaskGroup sibling failure / ``task.cancel()`` —
357+
# leaves the finally with a handle on every child's
358+
# ``_result``. The expression form discards the
359+
# already-completed children's results into the
360+
# gather's local frame; once gather raises, those
361+
# results are unreachable and the live conns are
362+
# orphaned (transports leaked until GC).
363+
#
364+
# This is the symmetric resource-discipline fix to
365+
# the ``_size`` accounting fix (ISSUE-240) — that fix
366+
# moved the bookkeeping into the finally; the
367+
# connection-close sweep now does the same by
368+
# walking the explicit task list.
369+
create_tasks: list[asyncio.Task[DqliteConnection]] = [
370+
asyncio.create_task(self._create_connection())
371+
for _ in range(self._min_size)
372+
]
373+
# Track whether ``gather`` returned normally; if it
374+
# raised CancelledError, the post-gather assignment
375+
# to ``unqueued_survivors`` (and the put-loop's pop)
376+
# never ran, so the finally must walk the explicit
377+
# task list to recover completed children's
378+
# results. When gather DID return normally, the
379+
# existing pop-based discipline already tracked the
380+
# un-queued tail precisely; walking tasks again
381+
# would re-add already-queued conns and double-close.
382+
gather_returned = False
353383
try:
354384
# Create min_size connections concurrently so startup
355385
# latency doesn't scale with min_size × per-connect RTT.
356386
results = await asyncio.gather(
357-
*(self._create_connection() for _ in range(self._min_size)),
387+
*create_tasks,
358388
return_exceptions=True,
359389
)
390+
gather_returned = True
360391
successes: list[DqliteConnection] = []
361392
failures: list[BaseException] = []
362393
for r in results:
@@ -447,6 +478,32 @@ async def initialize(self) -> None:
447478
# but never into the queue. Under a clean success
448479
# this list is empty; on partial put-loop cancel it
449480
# holds the unqueued tail.
481+
#
482+
# Cancel-during-gather recovery: ``gather``
483+
# propagates CancelledError when its caller is
484+
# cancelled, but children that already completed
485+
# have their results sitting in their tasks'
486+
# ``_result`` slot. The post-gather assignment to
487+
# ``unqueued_survivors`` did not run, so this
488+
# loop iterates an empty list. Walk
489+
# ``create_tasks`` to recover those results.
490+
# Already-failed / not-yet-done / cancelled tasks
491+
# are skipped — only successful results need
492+
# closing here. Only walks the task list if
493+
# ``gather`` raised before returning; once the
494+
# post-gather code ran, the pop-based
495+
# discipline already tracked the un-queued tail
496+
# precisely (walking again would double-close
497+
# conns already in the queue).
498+
if not gather_returned:
499+
for t in create_tasks:
500+
if not t.done() or t.cancelled():
501+
continue
502+
try:
503+
r = t.result()
504+
except BaseException:
505+
continue
506+
unqueued_survivors.append(r)
450507
for conn in unqueued_survivors:
451508
try:
452509
await conn.close()

tests/test_pool_cancellation.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,73 @@ async def _flaky_connect(**kwargs: Any) -> _FakeConn:
169169
)
170170

171171

172+
class TestInitializeCancelDuringGatherDoesNotLeakCompletedConns:
173+
"""An outer cancel landing while ``asyncio.gather`` inside
174+
``initialize`` still has children pending must close the
175+
children that already completed. Today the
176+
``unqueued_survivors`` list is populated AFTER gather returns,
177+
so a CancelledError raised out of gather skips the assignment
178+
and the finally iterates an empty list — leaking every
179+
completed conn's transport.
180+
"""
181+
182+
async def test_initialize_cancel_during_gather_closes_completed_conns(self) -> None:
183+
completed: list[_FakeConn] = []
184+
call_idx = [0]
185+
completed_event = asyncio.Event()
186+
187+
async def _half_fast_half_slow(**kwargs: Any) -> _FakeConn:
188+
i = call_idx[0]
189+
call_idx[0] += 1
190+
if i < 5:
191+
# Fast path: completes before the timeout fires.
192+
c = _FakeConn(name=f"c{i}")
193+
completed.append(c)
194+
if len(completed) == 5:
195+
completed_event.set()
196+
return c
197+
# Slow path: hangs forever (cancelled by the timeout).
198+
await asyncio.sleep(60)
199+
raise RuntimeError("unreachable")
200+
201+
cluster = MagicMock(spec=ClusterClient)
202+
cluster.connect = _half_fast_half_slow
203+
pool = ConnectionPool(
204+
addresses=["localhost:9001"],
205+
min_size=10,
206+
max_size=10,
207+
timeout=5.0,
208+
cluster=cluster,
209+
)
210+
211+
async def _wrapped() -> None:
212+
await pool.initialize()
213+
214+
# Drive the timeout / cancel while the slow children are
215+
# still pending. Wait for the fast 5 to complete first so
216+
# the leak is deterministic.
217+
init_task = asyncio.create_task(_wrapped())
218+
await completed_event.wait()
219+
await asyncio.sleep(0.01) # let gather observe the 5 results
220+
init_task.cancel()
221+
with pytest.raises(asyncio.CancelledError):
222+
await init_task
223+
224+
# Every conn that completed before the cancel must have
225+
# been effectively closed. Today (without the fix), the
226+
# unqueued_survivors list is empty in the finally because
227+
# the assignment happens AFTER gather returns.
228+
unclosed = [c for c in completed if not c.close_effective]
229+
assert not unclosed, (
230+
f"Cancel-during-gather leaked {len(unclosed)} of {len(completed)} "
231+
f"completed conns: {[c.name for c in unclosed]}. The pool's "
232+
"finally must walk gather's child tasks (not just the post-gather "
233+
"successes list) to reach completed-but-unqueued conns."
234+
)
235+
# _size must also be drained (the existing ISSUE-240 fix).
236+
assert pool._size == 0
237+
238+
172239
class TestAcquireCancellationRestoresSize:
173240
"""A caller cancelled while parked in acquire() must not leak the
174241
pool reservation. If a connection was pulled off the queue and

0 commit comments

Comments
 (0)