Skip to content

Commit 3a1a6db

Browse files
fix: prevent pool _size from going negative on concurrent close
pool.close() previously set _size = 0 unconditionally and force-closed in-use connections. When in-flight acquire() coroutines later hit their cleanup paths, they decremented _size below zero, permanently corrupting the pool's size accounting. Change close() to only close idle connections (decrementing _size per connection) and leave in-use connections to be cleaned up by acquire()'s context manager, which already checks _closed and closes instead of returning to the pool. This eliminates the race between close() and acquire()'s cleanup paths. Also resolves #088 (connections added during close being leaked by _in_use.clear()) since close() no longer touches _in_use at all. Closes #080 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 4c3d08d commit 3a1a6db

File tree

2 files changed

+59
-30
lines changed

2 files changed

+59
-30
lines changed

src/dqliteclient/pool.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -150,17 +150,16 @@ async def close(self) -> None:
150150
while not self._pool.empty():
151151
try:
152152
conn = self._pool.get_nowait()
153-
await conn.close()
153+
with contextlib.suppress(Exception):
154+
await conn.close()
155+
self._size -= 1
154156
except asyncio.QueueEmpty:
155157
break
156158

157-
# Close in-use connections
158-
for conn in list(self._in_use):
159-
with contextlib.suppress(Exception):
160-
await conn.close()
161-
self._in_use.clear()
162-
163-
self._size = 0
159+
# In-use connections are closed by acquire()'s cleanup when they
160+
# return — the else branch checks _closed and closes instead of
161+
# returning to the pool. Force-closing them here would race with
162+
# the acquire context manager and corrupt _size (see #080).
164163

165164
async def __aenter__(self) -> "ConnectionPool":
166165
await self.initialize()

tests/test_pool.py

Lines changed: 52 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,45 @@ async def test_acquire_timeout_when_pool_exhausted(self) -> None:
185185
pass
186186

187187

188+
async def test_close_during_acquire_does_not_corrupt_size(self) -> None:
189+
"""Closing the pool while connections are in-use must not make _size negative."""
190+
import asyncio
191+
192+
pool = ConnectionPool(["localhost:9001"], max_size=2)
193+
194+
conns = []
195+
for _ in range(2):
196+
mock_conn = MagicMock()
197+
mock_conn.is_connected = True
198+
mock_conn.connect = AsyncMock()
199+
mock_conn.close = AsyncMock()
200+
conns.append(mock_conn)
201+
202+
conn_iter = iter(conns)
203+
with patch.object(pool._cluster, "connect", side_effect=lambda **kw: next(conn_iter)):
204+
await pool.initialize()
205+
206+
acquired = asyncio.Event()
207+
208+
async def hold_connection():
209+
async with pool.acquire() as _conn:
210+
acquired.set()
211+
await asyncio.sleep(10)
212+
213+
task = asyncio.create_task(hold_connection())
214+
await acquired.wait()
215+
216+
# Close pool while task holds a connection
217+
await pool.close()
218+
219+
# Cancel the task — its cleanup path will try to decrement _size
220+
task.cancel()
221+
with contextlib.suppress(asyncio.CancelledError):
222+
await task
223+
224+
# _size must never go negative
225+
assert pool._size >= 0, f"_size went negative: {pool._size}"
226+
188227
async def test_dead_conn_replacement_respects_max_size(self) -> None:
189228
"""Dead-connection replacement must not allow _size to exceed max_size."""
190229
pool = ConnectionPool(["localhost:9001"], min_size=1, max_size=2)
@@ -255,33 +294,24 @@ async def test_dead_connection_triggers_leader_rediscovery(self) -> None:
255294
# Dead connection should NOT have had connect() called (no stale reconnect)
256295
dead_conn.connect.assert_not_called()
257296

258-
async def test_close_handles_checked_out_connections(self) -> None:
259-
"""close() should close in-flight connections, not just idle ones."""
297+
async def test_close_then_return_closes_connection(self) -> None:
298+
"""Returning a connection to a closed pool should close it, not put it back."""
260299
pool = ConnectionPool(["localhost:9001"], max_size=2)
261300

262-
mock_conn1 = MagicMock()
263-
mock_conn1.is_connected = True
264-
mock_conn1.connect = AsyncMock()
265-
mock_conn1.close = AsyncMock()
266-
267-
mock_conn2 = MagicMock()
268-
mock_conn2.is_connected = True
269-
mock_conn2.connect = AsyncMock()
270-
mock_conn2.close = AsyncMock()
271-
272-
conns = iter([mock_conn1, mock_conn2])
273-
with patch.object(pool._cluster, "connect", side_effect=lambda **kw: next(conns)):
274-
await pool.initialize() # Creates mock_conn1
301+
mock_conn = MagicMock()
302+
mock_conn.is_connected = True
303+
mock_conn.connect = AsyncMock()
304+
mock_conn.close = AsyncMock()
275305

276-
# Acquire a connection (checks it out)
277-
ctx = pool.acquire()
278-
await ctx.__aenter__()
306+
with patch.object(pool._cluster, "connect", return_value=mock_conn):
307+
await pool.initialize()
279308

280-
# Close the pool while connection is checked out
281-
await pool.close()
309+
# Acquire, then close pool, then release — connection should be closed
310+
async with pool.acquire():
311+
await pool.close()
282312

283-
# The checked-out connection should have been closed
284-
mock_conn1.close.assert_called()
313+
# The connection should have been closed on return (not put back in queue)
314+
mock_conn.close.assert_called()
285315

286316

287317
class TestConnectionPoolIntegration:

0 commit comments

Comments
 (0)