Skip to content

Commit 778e10b

Browse files
Fail loud on _close_impl re-snapshot cap exhaustion
The cycle-27 CC2 fix added a bounded re-snapshot loop to _close_impl with a cap of 3, with the comment "Cap at 3 to fail loudly on a pathological feedback loop rather than spin." The for-loop had no ``else:`` clause — when the cap was exhausted (a racing _invalidate kept creating fresh pending_drain tasks each iteration), the loop silently fell through (completed every iteration without ``break``) with self._pending_drain still pointing at a live task, defeating the comment's "fail loudly" promise: the residual task was orphaned and surfaced as "Task was destroyed but it is pending" at GC. Add the ``else:`` clause: cancel the residual task (best-effort) so no orphan diagnostic fires, null out self._pending_drain to keep the post-condition consistent with the break path, and log a WARNING so operators see the pathological feedback loop in production logs. Hoist the cap into a named ``_RESNAPSHOT_CAP`` constant so future tuning changes are obvious. Pin the WARNING firing, the residual cancellation, and the absence of "Task was destroyed" diagnostics under an adversarial __await__ that re-plants a fresh non-done pending each iteration. Without the fail-loud branch the warning never fires. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 820716e commit 778e10b

2 files changed

Lines changed: 115 additions & 1 deletion

File tree

src/dqliteclient/connection.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1227,7 +1227,8 @@ async def _close_impl(self) -> None:
12271227
# few rounds the racing _invalidate sees ``_protocol is None``
12281228
# and the cycle terminates). Cap at 3 to fail loudly on a
12291229
# pathological feedback loop rather than spin.
1230-
for _attempt in range(3):
1230+
_RESNAPSHOT_CAP = 3
1231+
for _attempt in range(_RESNAPSHOT_CAP):
12311232
pending = self._pending_drain
12321233
self._pending_drain = None
12331234
if pending is None or pending.done():
@@ -1244,6 +1245,27 @@ async def _close_impl(self) -> None:
12441245
# would be lost forever if absorbed here.
12451246
with contextlib.suppress(Exception, asyncio.CancelledError):
12461247
await pending
1248+
else:
1249+
# Cap exhausted: a racing ``_invalidate`` keeps creating
1250+
# fresh ``_pending_drain`` tasks each iteration. The
1251+
# comment block above promised "fail loudly"; without an
1252+
# explicit log/cancel here, ``self._pending_drain`` would
1253+
# remain set and the residual task would still be
1254+
# orphaned at GC. Cancel it (best-effort) and log a
1255+
# WARNING so operators see the pathological feedback
1256+
# loop in production logs.
1257+
stuck = self._pending_drain
1258+
if stuck is not None and not stuck.done():
1259+
stuck.cancel()
1260+
self._pending_drain = None
1261+
logger.warning(
1262+
"DqliteConnection._close_impl: _pending_drain still set after "
1263+
"%d re-snapshot iterations; cancelling residual task to avoid "
1264+
"'Task was destroyed but it is pending' at GC. This indicates "
1265+
"a pathological _invalidate feedback loop on connection id=%s.",
1266+
_RESNAPSHOT_CAP,
1267+
id(self),
1268+
)
12471269
# Mirror ``_invalidate``'s atomic clear of the transaction
12481270
# bookkeeping. Without this, a raw ``BEGIN`` followed by an
12491271
# explicit ``close()`` and a reconnect on the same instance

tests/test_close_impl_reaps_pending_drain_created_during_await.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,98 @@ def _race_invalidate_callback() -> None:
9393
raise AssertionError(f"Expected no orphaned-task diagnostics; got {msgs}")
9494

9595

96+
@pytest.mark.asyncio
97+
async def test_close_impl_fails_loud_when_resnapshot_cap_exhausted(
98+
caplog: pytest.LogCaptureFixture,
99+
) -> None:
100+
"""Pathological feedback loop: a racing callback re-creates a
101+
fresh ``_pending_drain`` on every iteration. The cap exhausts;
102+
the loop must (a) cancel the residual task to avoid the
103+
"Task was destroyed but it is pending" diagnostic at GC, and
104+
(b) log a WARNING so operators see the loop in production logs.
105+
106+
Pre-fix the loop's ``else:`` clause did not exist — the loop
107+
silently fell through with ``_pending_drain`` still pointing at a live
108+
task and the warning the comment promised never fired.
109+
"""
110+
import logging
111+
112+
loop = asyncio.get_running_loop()
113+
114+
conn = DqliteConnection.__new__(DqliteConnection)
115+
conn._protocol = None
116+
conn._in_transaction = False
117+
conn._tx_owner = None
118+
conn._savepoint_stack = []
119+
conn._savepoint_implicit_begin = False
120+
conn._has_untracked_savepoint = False
121+
conn._invalidation_cause = None
122+
conn._bound_loop = None
123+
124+
# Adversarial: each ``await pending`` that resolves triggers a
125+
# fresh not-done task to be assigned to ``conn._pending_drain``.
126+
# Use a custom awaitable so we can precisely simulate the
127+
# pathological _invalidate feedback loop: the awaitable's
128+
# ``__await__`` yields once (giving control back to _close_impl),
129+
# then sets a fresh adversary on conn._pending_drain before
130+
# returning. This guarantees iteration N+1 ALWAYS sees a
131+
# not-done pending, exhausting the cap.
132+
fresh_tasks: list[asyncio.Task[None]] = []
133+
134+
from collections.abc import Generator
135+
from typing import Any
136+
137+
class _Adversary:
138+
def __init__(self) -> None:
139+
self._real = loop.create_task(asyncio.sleep(0.001))
140+
fresh_tasks.append(self._real)
141+
142+
def done(self) -> bool:
143+
return False # always reports not-done so loop awaits
144+
145+
def cancel(self) -> bool:
146+
return self._real.cancel()
147+
148+
def __await__(self) -> Generator[Any]:
149+
yield from self._real.__await__()
150+
# Before returning, plant a fresh adversary so iteration
151+
# N+1 sees a non-done pending.
152+
conn._pending_drain = _Adversary() # type: ignore[assignment]
153+
154+
conn._pending_drain = _Adversary() # type: ignore[assignment]
155+
156+
captured: list[dict[str, object]] = []
157+
prior_handler = loop.get_exception_handler()
158+
loop.set_exception_handler(lambda _loop, ctx: captured.append(ctx))
159+
with caplog.at_level(logging.WARNING, logger="dqliteclient.connection"):
160+
try:
161+
await conn._close_impl()
162+
await asyncio.sleep(0.05)
163+
finally:
164+
loop.set_exception_handler(prior_handler)
165+
166+
# The fail-loud WARNING must fire when the cap is exhausted.
167+
warnings_seen = [
168+
r
169+
for r in caplog.records
170+
if r.levelno == logging.WARNING and "re-snapshot iterations" in r.getMessage()
171+
]
172+
assert warnings_seen, (
173+
"_close_impl must log a WARNING when the bounded re-snapshot loop's "
174+
"cap is exhausted; without it, the comment's 'fail loudly' promise "
175+
"is unkept and operators have no signal of the feedback loop."
176+
)
177+
# The residual pending_drain must be cleared so no orphaned-task
178+
# diagnostic surfaces.
179+
assert conn._pending_drain is None
180+
asyncio_diagnostics = [
181+
ctx for ctx in captured if "Task was destroyed" in str(ctx.get("message", ""))
182+
]
183+
if asyncio_diagnostics:
184+
msgs = [ctx.get("message") for ctx in asyncio_diagnostics]
185+
raise AssertionError(f"Expected no orphaned-task diagnostics; got {msgs}")
186+
187+
96188
@pytest.mark.asyncio
97189
async def test_close_impl_loop_terminates_when_invalidate_clears_protocol() -> None:
98190
"""The re-snapshot loop terminates after one iteration when the

0 commit comments

Comments (0)