Skip to content

Commit 1386afe

Browse files
fix: preserve healthy connections in pool when user code raises exceptions
The pool's acquire() context manager previously closed the connection unconditionally on any exception — including application errors such as ValueError, KeyError, or a constraint-violation OperationalError. In these cases the connection was perfectly healthy but was destroyed anyway, forcing a full TCP connect + dqlite handshake + leader discovery to replace it.

The except handler now checks conn.is_connected (and that the pool itself has not been closed) before deciding what to do:

- If the connection is still connected and the pool is open (healthy): return the connection to the pool for reuse.
- Otherwise (broken and already invalidated by the execute/fetch error handlers, or the pool is closed): close the connection and discard it.

This matches asyncpg's behavior, where connections are discarded only when they are actually in an error state.

Closes #090

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 5ea8dc7 commit 1386afe

File tree

2 files changed

+92
-23
lines changed

2 files changed

+92
-23
lines changed

src/dqliteclient/pool.py

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -123,25 +123,36 @@ async def acquire(self) -> AsyncIterator[DqliteConnection]:
123123
try:
124124
yield conn
125125
except BaseException:
126-
# On error (including cancellation), close connection
127-
with contextlib.suppress(BaseException):
128-
await conn.close()
129-
self._in_use.discard(conn)
130-
self._size -= 1
131-
raise
132-
else:
133126
self._in_use.discard(conn)
134-
# Return to pool
135-
if self._closed:
136-
await conn.close()
137-
self._size -= 1
138-
else:
127+
if conn.is_connected and not self._closed:
128+
# Connection is healthy — user code raised a non-connection error.
129+
# Return it to the pool instead of destroying it.
139130
try:
140131
self._pool.put_nowait(conn)
141132
except asyncio.QueueFull:
142-
# Pool full, close connection
143133
await conn.close()
144134
self._size -= 1
135+
else:
136+
# Connection is broken (invalidated by execute/fetch error handlers).
137+
with contextlib.suppress(BaseException):
138+
await conn.close()
139+
self._size -= 1
140+
raise
141+
else:
142+
self._in_use.discard(conn)
143+
await self._release(conn)
144+
145+
async def _release(self, conn: DqliteConnection) -> None:
146+
"""Return a connection to the pool or close it."""
147+
if self._closed:
148+
await conn.close()
149+
self._size -= 1
150+
else:
151+
try:
152+
self._pool.put_nowait(conn)
153+
except asyncio.QueueFull:
154+
await conn.close()
155+
self._size -= 1
145156

146157
async def execute(self, sql: str, params: Sequence[Any] | None = None) -> tuple[int, int]:
147158
"""Execute a SQL statement using a pooled connection."""

tests/test_pool.py

Lines changed: 68 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,8 @@ async def fail_then_succeed(**kwargs):
132132

133133
await pool.close()
134134

135-
async def test_cancellation_does_not_leak_connection(self) -> None:
136-
"""Cancelling a task that holds a connection should clean it up."""
135+
async def test_cancellation_returns_healthy_connection_to_pool(self) -> None:
136+
"""Cancelling a task holding a healthy connection should return it to the pool."""
137137
import asyncio
138138

139139
pool = ConnectionPool(["localhost:9001"], max_size=1)
@@ -152,18 +152,21 @@ async def test_cancellation_does_not_leak_connection(self) -> None:
152152
async def hold_connection():
153153
async with pool.acquire() as _:
154154
acquired.set()
155-
await asyncio.sleep(10) # Hold forever
155+
await asyncio.sleep(10) # Hold forever — will be cancelled
156156

157157
task = asyncio.create_task(hold_connection())
158-
await acquired.wait() # Deterministic: wait until connection is acquired
158+
await acquired.wait()
159159

160160
task.cancel()
161161
with contextlib.suppress(asyncio.CancelledError):
162162
await task
163163

164-
# Connection should have been closed, size decremented
165-
mock_conn.close.assert_called()
166-
assert pool._size < initial_size
164+
# Connection is still healthy (CancelledError was in user code, not in
165+
# a protocol operation), so it should be returned to the pool
166+
assert pool._size == initial_size
167+
assert pool._pool.qsize() == 1
168+
169+
await pool.close()
167170

168171
async def test_acquire_timeout_when_pool_exhausted(self) -> None:
169172
"""acquire() should timeout, not block forever, when pool is exhausted."""
@@ -252,7 +255,9 @@ async def holder():
252255
assert conn1 is mock_conn1
253256
holder_acquired.set()
254257
await holder_release.wait()
255-
raise RuntimeError("simulated failure")
258+
# Simulate a real connection failure (broken connection)
259+
conn1.is_connected = False
260+
raise DqliteConnectionError("connection lost")
256261

257262
holder_task = asyncio.create_task(holder())
258263
await holder_acquired.wait()
@@ -273,9 +278,9 @@ async def waiter():
273278
# check (_size == max_size) and enter the queue.get() wait
274279
await asyncio.sleep(0.1)
275280

276-
# Now release the holder — it fails, closing conn and decrementing _size
281+
# Now release the holder — connection breaks, _size decrements
277282
holder_release.set()
278-
with contextlib.suppress(RuntimeError):
283+
with contextlib.suppress(DqliteConnectionError):
279284
await holder_task
280285
# At this point _size == 0. The waiter is blocked on queue.get().
281286
# With the fix, the waiter should wake up and create a new connection.
@@ -384,6 +389,59 @@ async def test_close_then_return_closes_connection(self) -> None:
384389
mock_conn.close.assert_called()
385390

386391

392+
async def test_user_exception_preserves_healthy_connection(self) -> None:
393+
"""A user-code exception should not destroy a healthy connection."""
394+
pool = ConnectionPool(["localhost:9001"], max_size=2)
395+
396+
mock_conn = MagicMock()
397+
mock_conn.is_connected = True
398+
mock_conn.connect = AsyncMock()
399+
mock_conn.close = AsyncMock()
400+
401+
with patch.object(pool._cluster, "connect", return_value=mock_conn):
402+
await pool.initialize()
403+
404+
assert pool._size == 1
405+
406+
# User code raises a non-connection error
407+
with pytest.raises(ValueError, match="application error"):
408+
async with pool.acquire():
409+
raise ValueError("application error")
410+
411+
# Connection should NOT have been closed — it's healthy
412+
mock_conn.close.assert_not_called()
413+
# Pool size should be unchanged
414+
assert pool._size == 1
415+
# Connection should be back in the pool queue
416+
assert pool._pool.qsize() == 1
417+
418+
await pool.close()
419+
420+
async def test_broken_connection_discarded_on_exception(self) -> None:
421+
"""A broken connection should be discarded even if user code raised."""
422+
pool = ConnectionPool(["localhost:9001"], max_size=2)
423+
424+
mock_conn = MagicMock()
425+
mock_conn.is_connected = True
426+
mock_conn.connect = AsyncMock()
427+
mock_conn.close = AsyncMock()
428+
429+
with patch.object(pool._cluster, "connect", return_value=mock_conn):
430+
await pool.initialize()
431+
432+
# Simulate a connection that becomes broken during use
433+
with pytest.raises(ValueError, match="user error"):
434+
async with pool.acquire() as conn:
435+
conn.is_connected = False # Mark as broken (simulates invalidation)
436+
raise ValueError("user error")
437+
438+
# Broken connection SHOULD have been closed and discarded
439+
mock_conn.close.assert_called()
440+
assert pool._size == 0
441+
442+
await pool.close()
443+
444+
387445
class TestConnectionPoolIntegration:
388446
"""Integration tests requiring mocked connections."""
389447

0 commit comments

Comments
 (0)