Skip to content

Commit e0e67ad

Browse files
Reap leader-probe inner-shield drain Task to silence unobserved exception warning
The leader-probe writer-drain finally wrapped writer.wait_closed() in asyncio.shield(asyncio.wait_for(...)) so an outer cancel could not interrupt the bounded drain. shield's documented effect is "the caller's await raises CancelledError; the inner task keeps running." The composition leaks an unobserved exception: when the awaiter is cancelled and the inner wait_for later resolves with TimeoutError (peer unresponsive), no coroutine is awaiting the inner Task, and asyncio's task-finalisation logger emits "Task exception was never retrieved" at GC. Wrap the inner wait_for in an explicit Task and attach a done-callback that observes ``.exception()`` so the warning is reaped at the loop boundary, regardless of whether the awaiter saw the result. Mirrors the ``_clear_slot`` discipline already used by the single-flight find-leader slot map elsewhere in this file. Pin: helper exists, helper observes the exception, leader-probe finally consumes an explicit inner_drain task variable, behavioural warning-channel check that no "Task exception was never retrieved" warning fires under outer cancel mid-drain. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent ad4e760 commit e0e67ad

2 files changed

Lines changed: 148 additions & 6 deletions

File tree

src/dqliteclient/cluster.py

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,25 @@ def _truncate_error(message: str) -> str:
102102
return message[:_MAX_ERROR_MESSAGE_SNIPPET] + f"... [truncated, {len(message)} chars]"
103103

104104

105+
def _observe_drain_exception(t: asyncio.Task[None]) -> None:
106+
"""Done-callback that reaps an inner-shield drain task's exception.
107+
108+
The leader-probe writer-drain finally wraps ``writer.wait_closed()``
109+
in ``asyncio.shield(asyncio.wait_for(...))``. When the awaiter is
110+
cancelled mid-shield and the inner ``wait_for`` later fires
111+
``TimeoutError`` (or completes with any other exception), no
112+
coroutine is awaiting the inner task — asyncio's
113+
task-finalisation logger emits ``"Task exception was never
114+
retrieved"`` at GC. Calling ``.exception()`` is the canonical
115+
"I've observed this" signal so the warning never fires. Mirrors
116+
the ``_clear_slot`` discipline used by the single-flight
117+
find-leader slot map.
118+
"""
119+
if not t.cancelled():
120+
with contextlib.suppress(BaseException):
121+
t.exception()
122+
123+
105124
class ClusterClient:
106125
"""Client that discovers the current dqlite leader.
107126
@@ -543,13 +562,26 @@ async def _query_leader(
543562
# the drain to completion within its 100 ms budget even
544563
# during shutdown; the outer cancel still propagates past
545564
# this ``finally`` as expected.
546-
with contextlib.suppress(OSError, TimeoutError):
547-
await asyncio.shield(
548-
asyncio.wait_for(
549-
writer.wait_closed(),
550-
timeout=_LEADER_PROBE_DRAIN_TIMEOUT_SECONDS,
551-
)
565+
#
566+
# Wrap the inner ``wait_for`` in an explicit Task with a
567+
# done-callback that observes ``.exception()`` so the
568+
# ``TimeoutError`` (or any other exception the drain stores) is
569+
# never an "unobserved task exception" at GC. Mirrors the
570+
# ``_clear_slot`` done-callback discipline used by the
571+
# single-flight find-leader slot map elsewhere in this
572+
# file. Without the explicit observer, an outer cancel
573+
# mid-shield orphans the inner Task with a
574+
# ``TimeoutError`` that asyncio's task-finalisation
575+
# logger emits as "Task exception was never retrieved".
576+
inner_drain: asyncio.Task[None] = asyncio.ensure_future(
577+
asyncio.wait_for(
578+
writer.wait_closed(),
579+
timeout=_LEADER_PROBE_DRAIN_TIMEOUT_SECONDS,
552580
)
581+
)
582+
inner_drain.add_done_callback(_observe_drain_exception)
583+
with contextlib.suppress(OSError, TimeoutError):
584+
await asyncio.shield(inner_drain)
553585

554586
async def connect(
555587
self,
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
"""Pin: the leader-probe ``finally`` writer drain does NOT leak an
2+
unobserved ``TimeoutError`` from the inner ``wait_for`` Task on outer
3+
cancel.
4+
5+
The drain wraps ``writer.wait_closed()`` in
6+
``asyncio.shield(asyncio.wait_for(...))``. When the awaiter is
7+
cancelled mid-shield, ``shield`` re-raises CancelledError to the
8+
awaiter while letting the inner Task survive. If the inner Task
9+
later resolves with TimeoutError (peer unresponsive) and no coroutine
10+
is awaiting the Task, asyncio's task-finalisation logger emits
11+
``"Task exception was never retrieved"`` at GC. The fix is an
12+
explicit done-callback that observes ``.exception()`` so the
13+
warning never fires.
14+
"""
15+
16+
from __future__ import annotations
17+
18+
import asyncio
19+
import contextlib
20+
import inspect
21+
import warnings
22+
23+
import pytest
24+
25+
from dqliteclient import cluster as cluster_module
26+
27+
28+
def test_observe_drain_exception_is_module_level_helper() -> None:
    """Source-level pin: ``_observe_drain_exception`` is exposed on the
    cluster module and is callable. A regression that drops the helper
    (for example by inlining the drain without an observer) fails here."""
    helper = getattr(cluster_module, "_observe_drain_exception", None)
    assert helper is not None
    assert callable(helper)
33+
34+
35+
def test_observe_drain_exception_reaps_timeout_error() -> None:
    """Functional pin: the helper marks a failed task's exception as
    retrieved, so finalising the task reports nothing to the event
    loop's exception handler.

    The task must NOT be awaited before the helper runs: awaiting a task
    retrieves its exception itself, which would make this test pass even
    if the helper did nothing.
    """
    import gc  # local import: only needed to force task finalisation

    async def _drive() -> None:
        async def _slow() -> None:
            await asyncio.sleep(60)

        # "Task exception was never retrieved" is delivered through the
        # loop exception handler, so capture that channel directly.
        reported: list[dict[str, object]] = []
        loop = asyncio.get_running_loop()
        loop.set_exception_handler(lambda _loop, context: reported.append(context))

        # Build a task that resolves with TimeoutError.
        inner = asyncio.ensure_future(asyncio.wait_for(_slow(), timeout=0.001))
        # ``asyncio.wait`` waits for completion without retrieving the
        # task's exception, leaving it unobserved for the helper.
        await asyncio.wait({inner})
        # ``_slow`` cannot finish within the timeout, so a finished,
        # non-cancelled task here is one that failed.
        assert inner.done()
        assert not inner.cancelled()

        cluster_module._observe_drain_exception(inner)

        # Drop the last reference and force finalisation: an unobserved
        # exception would be reported to the handler at this point.
        del inner
        gc.collect()

        unobserved = [
            context
            for context in reported
            if "Task exception was never retrieved" in str(context.get("message", ""))
        ]
        assert not unobserved, f"helper left the exception unobserved: {unobserved}"

    asyncio.run(_drive())
54+
55+
56+
def test_query_leader_finally_uses_observed_drain_pattern() -> None:
    """Source-level pin: the cluster module shields a named inner-drain
    Task and registers ``_observe_drain_exception`` as its done-callback,
    rather than composing ``shield(wait_for(...))`` inline (the shape
    that orphans the inner Task)."""
    module_text = inspect.getsource(cluster_module)

    # Regression shape: ``shield`` fed an inline ``asyncio.wait_for(...)``
    # instead of an explicit Task variable.
    shield_failure_message = (
        "Leader-probe finally must shield an explicit inner-Task variable "
        "with an exception-observer done-callback (NOT a composed "
        "shield(wait_for(...)))"
    )
    assert "asyncio.shield(inner_drain)" in module_text, shield_failure_message

    assert "add_done_callback(_observe_drain_exception)" in module_text
71+
72+
73+
@pytest.mark.asyncio
async def test_outer_cancel_during_drain_does_not_emit_unobserved_warning() -> None:
    """Behavioural pin: cancel the outer task mid-drain, let the inner
    task run to its TimeoutError, force finalisation, and verify NO
    "Task exception was never retrieved" report is produced.

    Drives a synthetic case mirroring the production shape: a task that
    runs the same shielded pattern, with the outer cancelled mid-flight.

    asyncio delivers the "never retrieved" report through the event
    loop's exception handler (logged on the ``asyncio`` logger), not
    through the ``warnings`` module, so the handler is the channel that
    is asserted on. The ``warnings`` capture is kept as a secondary
    check only.
    """
    import gc  # local import: only needed to force task finalisation

    async def _slow() -> None:
        await asyncio.sleep(60)

    async def _drain_under_shield() -> None:
        inner = asyncio.ensure_future(asyncio.wait_for(_slow(), timeout=0.05))
        inner.add_done_callback(cluster_module._observe_drain_exception)
        with contextlib.suppress(OSError, TimeoutError):
            await asyncio.shield(inner)

    loop = asyncio.get_running_loop()
    reported: list[dict[str, object]] = []
    previous_handler = loop.get_exception_handler()
    loop.set_exception_handler(lambda _loop, context: reported.append(context))
    try:
        with warnings.catch_warnings(record=True) as captured:
            warnings.simplefilter("always")
            t = asyncio.create_task(_drain_under_shield())
            await asyncio.sleep(0.001)
            t.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await t
            # Let the inner timeout fire and finalise.
            await asyncio.sleep(0.1)
            # The report is only produced when the orphaned task is
            # finalised, so drop our reference and collect cycles now
            # rather than leaving it to an arbitrary later GC pass.
            del t
            gc.collect()
    finally:
        # The loop may be shared with other tests; restore its handler.
        loop.set_exception_handler(previous_handler)

    # Primary check: nothing was reported on the loop exception handler.
    unobserved = [
        str(context.get("message", ""))
        for context in reported
        if "Task exception was never retrieved" in str(context.get("message", ""))
    ]
    assert not unobserved, f"Expected no unobserved-task reports; got {unobserved}"

    # Secondary check: nothing surfaced on the warnings channel either.
    asyncio_warnings = [
        w for w in captured if "Task exception was never retrieved" in str(w.message)
    ]
    assert not asyncio_warnings, (
        f"Expected no unobserved-task warnings; got {[str(w.message) for w in asyncio_warnings]}"
    )

0 commit comments

Comments
 (0)