Skip to content

Commit 0b0f6dd

Browse files
Reap pending-drain task created by racing _invalidate during close_impl
DqliteConnection._close_impl snapshotted self._pending_drain, nulled the slot, and awaited the snapshot task. During that await the loop runs other ready callbacks — including ``loop.call_soon_threadsafe(self._invalidate, ...)`` callbacks queued by the dbapi sync wrapper's timeout / KI arms. The racing _invalidate found self._protocol still non-None (close_impl had not yet reached the proto null-out), took the live-protocol branch, and created a NEW _pending_drain task. After the original await pending resolved, close_impl read self._protocol — now None — and short-circuited via the early-return, leaving the new drain task orphaned. asyncio emitted "Task was destroyed but it is pending" at interpreter shutdown. Wrap the snapshot-and-await in a bounded re-snapshot loop (cap 3) so a fresh _pending_drain created during the await is reaped on the next iteration. The loop terminates because the racing _invalidate also clears _protocol, so the next reachable _invalidate (if any) takes the no-protocol short-circuit and does not create another pending_drain. Cap 3 fails loudly on a pathological feedback loop rather than spin. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 3082c91 commit 0b0f6dd

2 files changed

Lines changed: 127 additions & 12 deletions

File tree

src/dqliteclient/connection.py

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1216,18 +1216,22 @@ async def _close_impl(self) -> None:
12161216
# only inside the ``_protocol is None`` branch left a done-task
12171217
# reference on the happy path indefinitely, inconsistent with
12181218
# ``connect()``'s own ``_pending_drain = None`` symmetry.
1219-
pending = self._pending_drain
1220-
self._pending_drain = None
1221-
if pending is not None and not pending.done(): # pragma: no cover
1222-
# Defensive: the pending bounded-drain task is set only
1223-
# by ``_invalidate`` and is normally already-done by
1224-
# the time ``close()`` runs (the drain is bounded by
1225-
# ``close_timeout`` and the close path is the second
1226-
# caller). Reaching here requires the rare race where
1227-
# ``_invalidate`` fires between the snapshot read and
1228-
# the second caller's resumption — verified by code
1229-
# review, not coverage.
1230-
#
1219+
# Bounded re-snapshot loop: a concurrent ``_invalidate``
1220+
# callback queued via ``loop.call_soon_threadsafe`` (the
1221+
# dbapi sync wrapper's timeout / KI arms do this) can run
1222+
# during ``await pending`` and CREATE a new ``_pending_drain``
1223+
# task on this same connection. Without re-snapshotting,
1224+
# that fresh drain task is orphaned — surfacing as
1225+
# "Task was destroyed but it is pending" at GC. The loop is
1226+
# bounded (each iteration retires one drain task; after a
1227+
# few rounds the racing _invalidate sees ``_protocol is None``
1228+
# and the cycle terminates). Cap at 3 to fail loudly on a
1229+
# pathological feedback loop rather than spin.
1230+
for _attempt in range(3):
1231+
pending = self._pending_drain
1232+
self._pending_drain = None
1233+
if pending is None or pending.done():
1234+
break
12311235
# Narrow suppress to ``(Exception, asyncio.CancelledError)``
12321236
# so KeyboardInterrupt and SystemExit propagate. Cycle 23
12331237
# noted that the previous ``BaseException`` suppress was
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
"""Pin: ``DqliteConnection._close_impl`` reaps a fresh
2+
``_pending_drain`` task that a concurrent ``_invalidate`` callback
3+
created during the prior drain's ``await pending``.
4+
5+
Without the bounded re-snapshot loop, a
6+
``loop.call_soon_threadsafe(self._invalidate, ...)`` callback queued
7+
by the dbapi sync wrapper's timeout / KI arms can run during
8+
``await pending``, find ``self._protocol`` still non-None, and create
9+
a NEW ``_pending_drain`` task. After the original ``await pending``
10+
resolves, ``_close_impl`` reads ``self._protocol`` (now None — the
11+
racing ``_invalidate`` cleared it), short-circuits, and returns —
12+
leaving the new drain task orphaned. asyncio emits
13+
"Task was destroyed but it is pending" at interpreter shutdown.
14+
"""
15+
16+
from __future__ import annotations
17+
18+
import asyncio
19+
import warnings
20+
21+
import pytest
22+
23+
from dqliteclient.connection import DqliteConnection
24+
25+
26+
@pytest.mark.asyncio
27+
async def test_close_impl_drains_late_arriving_pending_drain() -> None:
28+
"""Drive the race shape: snapshot pending=A; during ``await A``,
29+
a callback creates pending=B (a still-running task). The bounded
30+
re-snapshot loop must observe B and await it, leaving no orphan.
31+
"""
32+
loop = asyncio.get_running_loop()
33+
34+
async def _slow_drain(label: str) -> None:
35+
# Survive long enough that close_impl observes us as not done.
36+
await asyncio.sleep(0.005)
37+
38+
pending_a = loop.create_task(_slow_drain("A"))
39+
pending_b_holder: list[asyncio.Task[None]] = []
40+
41+
def _race_invalidate_callback() -> None:
42+
# Simulates the dbapi sync wrapper's
43+
# ``loop.call_soon_threadsafe(dying._invalidate, ...)`` that
44+
# runs while close_impl is awaiting the prior drain.
45+
b = loop.create_task(_slow_drain("B"))
46+
pending_b_holder.append(b)
47+
conn._pending_drain = b
48+
49+
conn = DqliteConnection.__new__(DqliteConnection)
50+
conn._pending_drain = pending_a
51+
conn._protocol = None # short-circuit the rest of close_impl
52+
conn._in_transaction = False
53+
conn._tx_owner = None
54+
conn._savepoint_stack = []
55+
conn._savepoint_implicit_begin = False
56+
conn._has_untracked_savepoint = False
57+
conn._invalidation_cause = None
58+
conn._bound_loop = None
59+
60+
# Schedule the race callback to fire while we're inside the
61+
# await pending block. ``call_soon`` runs at the next loop
62+
# iteration which is exactly when ``await pending`` yields.
63+
loop.call_soon(_race_invalidate_callback)
64+
65+
with warnings.catch_warnings(record=True) as captured:
66+
warnings.simplefilter("always")
67+
await conn._close_impl()
68+
# Let the loop drain so any orphaned task surfaces as a
69+
# "Task was destroyed but it is pending" warning at GC.
70+
await asyncio.sleep(0.05)
71+
72+
# Both A and B must be done — the loop reaped both.
73+
assert pending_a.done()
74+
assert pending_b_holder, "race callback should have run"
75+
assert pending_b_holder[0].done(), (
76+
"fresh _pending_drain task created during await must be reaped"
77+
)
78+
# No "Task was destroyed but it is pending" warnings on the
79+
# warnings channel.
80+
asyncio_warnings = [w for w in captured if "Task was destroyed" in str(w.message)]
81+
assert not asyncio_warnings, (
82+
f"Expected no orphaned-task warnings; got {[str(w.message) for w in asyncio_warnings]}"
83+
)
84+
85+
86+
@pytest.mark.asyncio
87+
async def test_close_impl_loop_terminates_when_invalidate_clears_protocol() -> None:
88+
"""The re-snapshot loop terminates after one iteration when the
89+
racing ``_invalidate`` clears ``_protocol`` (the production
90+
callback path) — no infinite spin."""
91+
loop = asyncio.get_running_loop()
92+
93+
async def _slow(label: str) -> None:
94+
await asyncio.sleep(0.001)
95+
96+
pending_a = loop.create_task(_slow("A"))
97+
98+
conn = DqliteConnection.__new__(DqliteConnection)
99+
conn._pending_drain = pending_a
100+
conn._protocol = None
101+
conn._in_transaction = False
102+
conn._tx_owner = None
103+
conn._savepoint_stack = []
104+
conn._savepoint_implicit_begin = False
105+
conn._has_untracked_savepoint = False
106+
conn._invalidation_cause = None
107+
conn._bound_loop = None
108+
109+
await conn._close_impl()
110+
assert pending_a.done()
111+
assert conn._pending_drain is None

0 commit comments

Comments
 (0)