Skip to content

Commit 4bffd78

Browse files
Pre-ping idle connections on acquire to detect peer-FIN zombies
The protocol-level is_connected flag is just self._protocol is not None — it does not detect a half-closed TCP socket. An idle pool connection that saw a clean peer FIN (graceful leader flip, intentional close from the server side) passed the gate and got handed to the caller, whose first query failed on the wire. The pool already had _socket_looks_dead — a cheap non-blocking transport peek (transport.is_closing() / reader.at_eof() / poisoned-decoder short-circuit) — but only used it on the release path, not on acquire. Extend the acquire-side liveness gate to OR in _socket_looks_dead and explicitly close the dead-but-protocol-still-set connection before draining the rest of the idle queue. Make _socket_looks_dead tolerate partial mocks via getattr on _protocol so tests that don't construct a full protocol slice still work. Update the existing _BrokenConn test fake to ship a healthy default protocol mock so the pre-ping does not preempt the broken-cleanup branch the test is actually exercising. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent e0ff5fa commit 4bffd78

3 files changed

Lines changed: 172 additions & 2 deletions

File tree

src/dqliteclient/pool.py

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,11 @@ def _socket_looks_dead(conn: DqliteConnection) -> bool:
7171
reader. Mocked / missing attributes default to False (assume alive) so
7272
the check never produces a false positive against well-behaved peers.
7373
"""
74-
protocol = conn._protocol
74+
# ``getattr`` (rather than direct attribute access) so the function
75+
# tolerates partial mocks that omit ``_protocol`` entirely — the
76+
# acquire-path pre-ping calls this on every dequeued conn, including
77+
# ones built by tests that don't set up a protocol (treated as dead).
78+
protocol = getattr(conn, "_protocol", None)
7579
if protocol is None:
7680
return True
7781
# Poisoned-decoder short-circuit: a wire desync means the next
@@ -745,12 +749,31 @@ async def acquire(self) -> AsyncIterator[DqliteConnection]:
745749

746750
# If connection is dead, discard and create a fresh one with leader discovery.
747751
# Also drain other idle connections — they likely point to the same dead server.
748-
if not conn.is_connected:
752+
#
753+
# ``is_connected`` only checks ``self._protocol is not None``;
754+
# ``_socket_looks_dead`` is a non-blocking transport-level peek
755+
# (transport.is_closing() / reader.at_eof() / poisoned-decoder
756+
# short-circuit). An idle connection that has seen a clean peer
757+
# FIN (leader flip with graceful close) passes ``is_connected``
758+
# but trips ``_socket_looks_dead`` — without the second check,
759+
# the pool hands out a zombie connection and the user's first
760+
# query fails. The peek is non-blocking; cheap to run on every
761+
# acquire.
762+
if not conn.is_connected or _socket_looks_dead(conn):
749763
logger.debug(
750764
"pool.acquire: drain-idle triggered by stale conn=%r closing_idle=%d",
751765
conn,
752766
self._pool.qsize(),
753767
)
768+
# Close the dead-but-protocol-still-set connection explicitly:
769+
# ``_drain_idle`` only walks the idle queue, so a connection
770+
# we just dequeued whose transport is half-closed (FIN seen)
771+
# would otherwise leak its writer. ``close()`` is safe on a
772+
# never-connected conn (``_protocol is None``) — it short-
773+
# circuits to a noop. Shielded so an outer cancel does not
774+
# leave the writer dangling.
775+
with contextlib.suppress(*_POOL_CLEANUP_EXCEPTIONS):
776+
await asyncio.shield(conn.close())
754777
# Wrap _drain_idle so a cancel mid-drain releases the dead
755778
# conn's reservation. Without this guard, a CancelledError
756779
# delivered to a checkpoint inside _drain_idle propagates

tests/test_pool_acquire_cleanup.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,21 @@ def __init__(self, close_side_effect: Any = None) -> None:
5353
self._address = "stub:9001"
5454
self._close_side_effect = close_side_effect
5555
self.close_calls = 0
56+
# Pretend healthy at acquire entry so the pre-ping does not
57+
# short-circuit before the test's user-body has a chance to
58+
# flip ``is_connected = False`` and exercise the broken-
59+
# cleanup branch. Mirrors the slice of ``DqliteConnection``
60+
# that ``_socket_looks_dead`` walks (protocol → writer/reader →
61+
# transport.is_closing / reader.at_eof).
62+
protocol = MagicMock()
63+
protocol.is_wire_coherent = True
64+
transport = MagicMock()
65+
transport.is_closing.return_value = False
66+
protocol._writer = MagicMock()
67+
protocol._writer.transport = transport
68+
protocol._reader = MagicMock()
69+
protocol._reader.at_eof.return_value = False
70+
self._protocol = protocol
5671

5772
async def close(self) -> None:
5873
self.close_calls += 1
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
"""Pin: ``ConnectionPool.acquire`` peeks the transport via
2+
``_socket_looks_dead`` *before* yielding an idle connection to the
3+
caller, so a connection that saw a clean peer FIN (leader flip with
4+
graceful close) does not get handed out as a zombie.
5+
6+
The protocol-level ``is_connected`` is just ``self._protocol is not
7+
None`` — it does NOT detect a half-closed socket. Without the
8+
transport-level peek, the user's first query after a leader flip
9+
fails on a dequeue path that the pool could have self-healed.
10+
"""
11+
12+
from __future__ import annotations
13+
14+
import asyncio
15+
from unittest.mock import AsyncMock, MagicMock
16+
17+
import pytest
18+
19+
from dqliteclient.pool import ConnectionPool
20+
21+
22+
def _alive_conn(*, dead: bool = False) -> MagicMock:
23+
"""Build a MagicMock that mimics the slice of ``DqliteConnection``
24+
the pool's acquire flow touches.
25+
26+
With ``dead=True``, the transport peek tells ``_socket_looks_dead``
27+
the connection is gone (transport.is_closing() returns True).
28+
"""
29+
conn = MagicMock()
30+
conn.is_connected = True
31+
conn.close = AsyncMock()
32+
conn._pool_released = False
33+
proto = MagicMock()
34+
proto.is_wire_coherent = True
35+
transport = MagicMock()
36+
transport.is_closing.return_value = dead
37+
proto._writer = MagicMock()
38+
proto._writer.transport = transport
39+
proto._reader = MagicMock()
40+
proto._reader.at_eof.return_value = False
41+
conn._protocol = proto
42+
return conn
43+
44+
45+
@pytest.mark.asyncio
46+
async def test_acquire_drops_dead_idle_connection_and_returns_fresh_one(
47+
monkeypatch: pytest.MonkeyPatch,
48+
) -> None:
49+
"""An idle connection whose transport is closing must not be
50+
yielded to the caller. The pool drains the idle queue and creates
51+
a fresh connection."""
52+
pool = ConnectionPool(["a:9001"], min_size=0, max_size=1)
53+
# Skip real cluster bootstrap; we only exercise the acquire path.
54+
monkeypatch.setattr("dqliteclient.pool.ConnectionPool.initialize", AsyncMock())
55+
pool._initialized = True
56+
# Seed one dead conn into the idle queue and reserve its slot.
57+
dead = _alive_conn(dead=True)
58+
pool._pool.put_nowait(dead)
59+
pool._size = 1
60+
61+
fresh = _alive_conn(dead=False)
62+
create_calls = 0
63+
64+
async def fake_create(self: object) -> MagicMock:
65+
nonlocal create_calls
66+
create_calls += 1
67+
return fresh
68+
69+
monkeypatch.setattr("dqliteclient.pool.ConnectionPool._create_connection", fake_create)
70+
71+
async with pool.acquire() as conn:
72+
assert conn is fresh, "acquire must drop the dead idle connection and yield a fresh one"
73+
assert dead.close.called, "dead connection must have been closed"
74+
assert create_calls == 1
75+
76+
77+
@pytest.mark.asyncio
78+
async def test_acquire_yields_alive_idle_connection_unchanged(
79+
monkeypatch: pytest.MonkeyPatch,
80+
) -> None:
81+
"""Regression pin: a healthy idle connection passes the peek and
82+
is yielded unchanged — the pre-ping does NOT force unnecessary
83+
reconnects."""
84+
pool = ConnectionPool(["a:9001"], min_size=0, max_size=1)
85+
monkeypatch.setattr("dqliteclient.pool.ConnectionPool.initialize", AsyncMock())
86+
pool._initialized = True
87+
alive = _alive_conn(dead=False)
88+
pool._pool.put_nowait(alive)
89+
pool._size = 1
90+
91+
create_calls = 0
92+
93+
async def fake_create(self: object) -> MagicMock:
94+
nonlocal create_calls
95+
create_calls += 1
96+
return _alive_conn()
97+
98+
monkeypatch.setattr("dqliteclient.pool.ConnectionPool._create_connection", fake_create)
99+
100+
async with pool.acquire() as conn:
101+
assert conn is alive, "alive idle connection must pass through unchanged"
102+
assert not alive.close.called
103+
assert create_calls == 0, "no fresh create when the idle connection passes the peek"
104+
105+
106+
@pytest.mark.asyncio
107+
async def test_acquire_peeks_via_eof(
108+
monkeypatch: pytest.MonkeyPatch,
109+
) -> None:
110+
"""``_socket_looks_dead`` triggers on EOF too, not just
111+
transport.is_closing()."""
112+
pool = ConnectionPool(["a:9001"], min_size=0, max_size=1)
113+
monkeypatch.setattr("dqliteclient.pool.ConnectionPool.initialize", AsyncMock())
114+
pool._initialized = True
115+
eof_conn = _alive_conn(dead=False)
116+
eof_conn._protocol._reader.at_eof.return_value = True
117+
pool._pool.put_nowait(eof_conn)
118+
pool._size = 1
119+
120+
fresh = _alive_conn()
121+
122+
async def fake_create(self: object) -> MagicMock:
123+
return fresh
124+
125+
monkeypatch.setattr("dqliteclient.pool.ConnectionPool._create_connection", fake_create)
126+
127+
async with pool.acquire() as conn:
128+
assert conn is fresh
129+
assert eof_conn.close.called
130+
131+
# Avoid leaking the asyncio queue close-task on test teardown
132+
await asyncio.sleep(0)

0 commit comments

Comments
 (0)