Skip to content

Commit 91521cd

Browse files
fix: drain idle connections when a broken connection is detected
When one connection to the dqlite server is found to be broken (e.g., after a leader change or server restart), other idle connections in the pool are likely stale too — they point to the same server. Previously, only the single broken connection was discarded. Other idle connections would be handed out and fail one-by-one, each requiring a full leader discovery + reconnect cycle. In a pool of size N, this caused N-1 unnecessary failures after a leader change. Add _drain_idle() which closes all idle connections in the queue. Call it whenever a broken connection is detected — both at checkout (dead connection replacement) and during use (connection invalidated by execute/fetch error handlers). Subsequent acquire() calls create fresh connections via leader discovery to the new leader. Closes #093 (Phase 1: bulk invalidation) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 1386afe commit 91521cd

File tree

2 files changed

+63
-1
lines changed

2 files changed

+63
-1
lines changed

src/dqliteclient/pool.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,22 @@ async def _create_connection(self) -> DqliteConnection:
7373
self._size += 1
7474
return conn
7575

76+
async def _drain_idle(self) -> None:
    """Close all idle connections in the pool.

    Called when a connection is found to be broken (e.g., after a
    leader change or server restart), since other idle connections
    are likely stale too.

    Closing is best effort: any ``Exception`` raised by ``close()`` is
    suppressed so one bad connection cannot stop the drain. ``_size`` is
    decremented exactly once for every connection removed from the queue,
    including when ``close()`` is interrupted by a ``BaseException`` such
    as ``asyncio.CancelledError`` (which then propagates to the caller).
    """
    while True:
        # Only the dequeue can raise QueueEmpty; keep the try that narrow.
        try:
            conn = self._pool.get_nowait()
        except asyncio.QueueEmpty:
            return

        try:
            with contextlib.suppress(Exception):
                await conn.close()
        finally:
            # The connection has already left the queue, so it must stop
            # counting toward the pool size no matter how close() ended.
            # Without this, a cancellation during close() would leave
            # _size permanently one too high.
            self._size -= 1
91+
7692
@asynccontextmanager
7793
async def acquire(self) -> AsyncIterator[DqliteConnection]:
7894
"""Acquire a connection from the pool."""
@@ -113,8 +129,10 @@ async def acquire(self) -> AsyncIterator[DqliteConnection]:
113129
# Don't fail yet — loop back to re-check _size
114130
continue
115131

116-
# If connection is dead, discard and create a fresh one with leader discovery
132+
# If connection is dead, discard and create a fresh one with leader discovery.
133+
# Also drain other idle connections — they likely point to the same dead server.
117134
if not conn.is_connected:
135+
await self._drain_idle()
118136
async with self._lock:
119137
self._size -= 1
120138
conn = await self._create_connection()
@@ -134,6 +152,8 @@ async def acquire(self) -> AsyncIterator[DqliteConnection]:
134152
self._size -= 1
135153
else:
136154
# Connection is broken (invalidated by execute/fetch error handlers).
155+
# Drain other idle connections — they likely point to the same dead server.
156+
await self._drain_idle()
137157
with contextlib.suppress(BaseException):
138158
await conn.close()
139159
self._size -= 1

tests/test_pool.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,48 @@ async def tracking_create():
342342

343343
await pool.close()
344344

345+
async def test_broken_connection_drains_idle_pool(self) -> None:
    """A connection that breaks while in use must flush all idle connections."""

    def healthy_conn() -> MagicMock:
        # Mock connection that reports itself connected and has awaitable
        # connect()/close() methods.
        mock = MagicMock()
        mock.is_connected = True
        mock.connect = AsyncMock()
        mock.close = AsyncMock()
        return mock

    pool = ConnectionPool(["localhost:9001"], min_size=3, max_size=5)

    # Three connections to fill the pool at initialization, plus one spare
    # in case the pool asks the cluster for a replacement.
    conns = [healthy_conn() for _ in range(3)]
    new_conn = healthy_conn()
    conn_iter = iter([*conns, new_conn])

    def hand_out_next(**kw):
        return next(conn_iter)

    with patch.object(pool._cluster, "connect", side_effect=hand_out_next):
        await pool.initialize()

        assert pool._size == 3
        assert pool._pool.qsize() == 3

        # Check out the first queued connection, then break it while it is
        # in use (simulates a leader change / server error).
        with pytest.raises(DqliteConnectionError):
            async with pool.acquire() as acquired:
                acquired.is_connected = False
                raise DqliteConnectionError("connection lost")

    # The broken connection was discarded; the two connections that were
    # idle at the time should have been closed along with it.
    idle_closed = len([c for c in conns[1:] if c.close.called])
    assert idle_closed == 2, (
        f"Expected 2 idle connections to be drained, but only {idle_closed} were closed"
    )
    assert pool._pool.qsize() == 0

    await pool.close()
386+
345387
async def test_dead_connection_triggers_leader_rediscovery(self) -> None:
346388
"""A dead connection should be replaced via leader discovery, not reconnected."""
347389
pool = ConnectionPool(["localhost:9001"], max_size=1)

0 commit comments

Comments
 (0)