Skip to content

Commit 2ef34df

Browse files
Pin cycle 22's _invalidate cancel-and-detach prior pending_drain (and fix the cause-shadowing bug surfaced by the pin)
Cycle 22 added two new behaviours to ``_invalidate`` that were never regression-tested: 1. Cancel-and-detach the prior ``_pending_drain`` task before overwriting the slot. A second ``_invalidate`` while the first task was still in-flight previously orphaned the first one, recreating "Task was destroyed but it is pending" at GC. 2. Try/except around ``loop.create_task`` so a ``RuntimeError("Event loop is closed")`` during ``engine.dispose()`` does not replace the original cancel/cause with a bare RuntimeError. Adding the test for #2 exposed a sibling bug: the except clause was ``as cause``, which Python deletes at end-of-block. The function parameter is also named ``cause``, so the name was unbound when the later ``if cause is not None and self._invalidation_cause is None`` check ran on the closed-loop arm — surfacing as ``UnboundLocalError`` instead of preserving the original cause as the cycle 22 commit intended. Rename the except binding to ``schedule_err`` so the parameter is preserved for the cause-recording branch. Add a third pin for cycle 22's clear of ``_invalidation_cause`` in ``_close_impl`` so the cached exception does not pin frame globals/locals across close → reconnect cycles (a sibling concern from ISSUE-1038). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 9fec3d4 commit 2ef34df

2 files changed

Lines changed: 159 additions & 2 deletions

File tree

src/dqliteclient/connection.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1456,18 +1456,26 @@ async def _bounded_drain() -> None:
14561456
# close() awaits it.
14571457
try:
14581458
self._pending_drain = loop.create_task(_bounded_drain())
1459-
except RuntimeError as cause:
1459+
except RuntimeError as schedule_err:
14601460
# ``loop.create_task`` raises
14611461
# RuntimeError("Event loop is closed") if the
14621462
# loop has been stopped — a real shape during
14631463
# interpreter shutdown / engine.dispose() races.
14641464
# Don't replace the original cancel/cause with a
14651465
# bare RuntimeError; log and move on with no
14661466
# pending drain.
1467+
#
1468+
# Bind to ``schedule_err`` (NOT ``cause``) so the
1469+
# except's ``as`` does not shadow the function
1470+
# parameter ``cause`` and leave it unbound for
1471+
# the ``cause is not None`` check below — Python
1472+
# deletes ``except as`` bindings at end-of-block,
1473+
# so reusing the name silently breaks a sibling
1474+
# reference further down.
14671475
logger.debug(
14681476
"Connection._invalidate: loop.create_task raised %s "
14691477
"while scheduling _bounded_drain; original cause preserved",
1470-
type(cause).__name__,
1478+
type(schedule_err).__name__,
14711479
exc_info=True,
14721480
)
14731481
self._pending_drain = None
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
"""Pin: cycle 22's cancel-and-detach guard on
2+
``DqliteConnection._invalidate``.
3+
4+
A second ``_invalidate`` call while a prior bounded-drain task is
5+
in-flight must cancel the prior task before overwriting
6+
``self._pending_drain`` with the new one. Without the cancel,
7+
the first task is orphaned (still running on the loop, no longer
8+
reachable from ``self``), and ``close()`` awaits only the second
9+
— recreating the exact "Task was destroyed but it is pending"
10+
warning the drain mechanism was added to suppress.
11+
12+
Also pins the cycle 22 ``RuntimeError("Event loop is closed")``
13+
guard around ``loop.create_task``: when the loop is dead the
14+
fallback sets ``_pending_drain = None`` and preserves the
15+
original cancel/cause instead of replacing it with a bare
16+
``RuntimeError``.
17+
18+
And pins the ``_invalidation_cause`` clear on ``_close_impl``
19+
(cycle 22) so the cached exception does not pin frame
20+
globals/locals across close → reconnect cycles.
21+
"""
22+
23+
from __future__ import annotations
24+
25+
import asyncio
26+
import logging
27+
28+
import pytest
29+
30+
from dqliteclient.connection import DqliteConnection
31+
32+
33+
def _make_conn_with_protocol() -> DqliteConnection:
    """Return a bare ``DqliteConnection`` with hand-seeded private state.

    The instance is created through ``__new__`` so ``__init__`` never
    runs; the private attributes below are then assigned directly,
    which is sufficient to drive ``_invalidate`` end-to-end without a
    real wire connection.

    Note that ``_protocol`` starts out as ``None`` here: a test that
    needs a protocol installs its own fake after calling this helper.
    """
    skeleton = DqliteConnection.__new__(DqliteConnection)

    # Built inside the function so every call gets its own fresh
    # ``_savepoint_stack`` list. Dict order is the assignment order.
    initial_state = {
        "_protocol": None,
        "_db_id": None,
        "_pending_drain": None,
        "_invalidation_cause": None,
        "_in_use": False,
        "_in_transaction": False,
        "_tx_owner": None,
        "_savepoint_stack": [],
        "_closed": False,
        "_close_timeout": 0.05,
        "_address": "test:9001",
        "_bound_loop": None,
        "_pool_released": False,
    }
    for attr_name, attr_value in initial_state.items():
        setattr(skeleton, attr_name, attr_value)

    return skeleton
51+
52+
53+
class _FakeProtocol:
    """Minimal stand-in for DqliteProtocol covering the slots
    ``_invalidate`` reads (``close()``, ``wait_closed()``).

    Args:
        wait_delay: Seconds ``wait_closed()`` stays suspended before
            returning. The default of 0.5 keeps a drain task awaiting
            it in the pending state long enough for a sibling
            ``_invalidate`` to observe it; pass a smaller value when a
            test needs ``wait_closed()`` to finish quickly.

    Attributes:
        close_calls: Number of times ``close()`` has been invoked.
        wait_delay: The delay given at construction.
    """

    def __init__(self, wait_delay: float = 0.5) -> None:
        self.close_calls = 0
        self.wait_delay = wait_delay

    def close(self) -> None:
        """Record one ``close()`` call; performs no I/O."""
        self.close_calls += 1

    async def wait_closed(self) -> None:
        """Suspend for ``wait_delay`` seconds, then return."""
        # A real sleep, not a single-tick yield, so the awaiting task
        # remains pending across several loop iterations.
        await asyncio.sleep(self.wait_delay)
68+
69+
@pytest.mark.asyncio
70+
async def test_second_invalidate_cancels_prior_pending_drain() -> None:
71+
conn = _make_conn_with_protocol()
72+
73+
# First invalidate: scheduling a bounded-drain task.
74+
conn._protocol = _FakeProtocol() # type: ignore[assignment]
75+
conn._invalidate()
76+
first_task = conn._pending_drain
77+
assert first_task is not None
78+
assert not first_task.done()
79+
80+
# Re-set the protocol so the second _invalidate's
81+
# ``if self._protocol is not None:`` branch runs.
82+
conn._protocol = _FakeProtocol() # type: ignore[assignment]
83+
conn._invalidate()
84+
second_task = conn._pending_drain
85+
86+
assert second_task is not None
87+
assert second_task is not first_task
88+
# The cycle 22 contract: prior task has cancel() scheduled
89+
# before the slot is overwritten. ``cancel()`` flips the task
90+
# to ``cancelling`` (not yet ``cancelled``) until the task
91+
# observes the CancelledError at its next await — pump the
92+
# loop so the cancellation lands.
93+
assert first_task.cancelling() > 0 or first_task.cancelled() or first_task.done()
94+
await asyncio.sleep(0)
95+
await asyncio.sleep(0)
96+
assert first_task.cancelled() or first_task.done()
97+
98+
99+
@pytest.mark.asyncio
async def test_invalidate_with_closed_loop_preserves_cause(
    caplog: pytest.LogCaptureFixture,
) -> None:
    """A ``RuntimeError("Event loop is closed")`` raised by
    ``loop.create_task`` must NOT displace the original cancel/cause.

    Pins the cycle 22 try/except guard: the failure is DEBUG-logged and
    ``_invalidate`` carries on with ``_pending_drain = None``.
    """
    conn = _make_conn_with_protocol()
    conn._protocol = _FakeProtocol()  # type: ignore[assignment]
    caplog.set_level(logging.DEBUG, logger="dqliteclient.connection")

    def _raise_loop_closed(coro: object, **kwargs: object) -> object:
        # Close the coroutine first so no "never awaited" warning fires.
        coro.close()  # type: ignore[attr-defined]
        raise RuntimeError("Event loop is closed")

    # Swap the running loop's ``create_task`` for the raiser to mimic
    # the closed-loop shape seen during dispose; always put it back.
    running_loop = asyncio.get_running_loop()
    saved_create_task = running_loop.create_task
    running_loop.create_task = _raise_loop_closed  # type: ignore[assignment]
    try:
        conn._invalidate(cause=ValueError("original cause"))
    finally:
        running_loop.create_task = saved_create_task

    # No drain task was scheduled, the caller's cause was recorded on
    # ``_invalidation_cause``, and the scheduling failure was logged.
    assert conn._pending_drain is None
    assert isinstance(conn._invalidation_cause, ValueError)
    logged_messages = [record.message for record in caplog.records]
    assert any("loop.create_task" in text for text in logged_messages)
133+
134+
135+
@pytest.mark.asyncio
async def test_close_impl_clears_invalidation_cause() -> None:
    """``_close_impl`` must reset ``_invalidation_cause`` to ``None``.

    Cycle 22 added the reset so a cached exception does NOT pin frame
    globals/locals across close → reconnect cycles; dropping that line
    would re-introduce the traceback-pin defect.
    """
    conn = _make_conn_with_protocol()

    # Pretend an earlier invalidation left a cause behind.
    stale_cause = ValueError("prior failure")
    conn._invalidation_cause = stale_cause
    assert conn._invalidation_cause is not None

    await conn._close_impl()

    assert conn._invalidation_cause is None

0 commit comments

Comments
 (0)