Skip to content

Commit 53de6a3

Browse files
fix: track in-use connections in pool and close them on shutdown
Pool now tracks checked-out connections in an _in_use set. close() closes both idle and in-use connections. Connections returned after close() are closed immediately instead of being put back in the queue. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent efb1ae4 commit 53de6a3

2 files changed

Lines changed: 51 additions & 5 deletions

File tree

src/dqliteclient/pool.py

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ def __init__(
3939

4040
self._cluster = ClusterClient.from_addresses(addresses, timeout=timeout)
4141
self._pool: asyncio.Queue[DqliteConnection] = asyncio.Queue(maxsize=max_size)
42+
self._in_use: set[DqliteConnection] = set()
4243
self._size = 0
4344
self._lock = asyncio.Lock()
4445
self._closed = False
@@ -84,6 +85,7 @@ async def acquire(self) -> AsyncIterator[DqliteConnection]:
8485
f"(max_size={self._max_size}, timeout={self._timeout}s)"
8586
) from None
8687

88+
self._in_use.add(conn)
8789
try:
8890
# Verify connection is still good
8991
if not conn.is_connected:
@@ -96,16 +98,22 @@ async def acquire(self) -> AsyncIterator[DqliteConnection]:
9698

9799
with contextlib.suppress(BaseException):
98100
await conn.close()
101+
self._in_use.discard(conn)
99102
self._size -= 1
100103
raise
101104
else:
105+
self._in_use.discard(conn)
102106
# Return to pool
103-
try:
104-
self._pool.put_nowait(conn)
105-
except asyncio.QueueFull:
106-
# Pool full, close connection
107+
if self._closed:
107108
await conn.close()
108109
self._size -= 1
110+
else:
111+
try:
112+
self._pool.put_nowait(conn)
113+
except asyncio.QueueFull:
114+
# Pool full, close connection
115+
await conn.close()
116+
self._size -= 1
109117

110118
async def execute(self, sql: str, params: Sequence[Any] | None = None) -> tuple[int, int]:
111119
"""Execute a SQL statement using a pooled connection."""
@@ -118,16 +126,25 @@ async def fetch(self, sql: str, params: Sequence[Any] | None = None) -> list[dic
118126
return await conn.fetch(sql, params)
119127

120128
async def close(self) -> None:
121-
"""Close all connections in the pool."""
129+
"""Close all connections (both idle and in-use)."""
122130
self._closed = True
123131

132+
# Close idle connections
124133
while not self._pool.empty():
125134
try:
126135
conn = self._pool.get_nowait()
127136
await conn.close()
128137
except asyncio.QueueEmpty:
129138
break
130139

140+
# Close in-use connections
141+
for conn in list(self._in_use):
142+
import contextlib
143+
144+
with contextlib.suppress(Exception):
145+
await conn.close()
146+
self._in_use.clear()
147+
131148
self._size = 0
132149

133150
async def __aenter__(self) -> "ConnectionPool":

tests/test_pool.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,35 @@ async def test_acquire_timeout_when_pool_exhausted(self) -> None:
8585
pass
8686

8787

88+
async def test_close_handles_checked_out_connections(self) -> None:
89+
"""close() should close in-flight connections, not just idle ones."""
90+
pool = ConnectionPool(["localhost:9001"], max_size=2)
91+
92+
mock_conn1 = MagicMock()
93+
mock_conn1.is_connected = True
94+
mock_conn1.connect = AsyncMock()
95+
mock_conn1.close = AsyncMock()
96+
97+
mock_conn2 = MagicMock()
98+
mock_conn2.is_connected = True
99+
mock_conn2.connect = AsyncMock()
100+
mock_conn2.close = AsyncMock()
101+
102+
conns = iter([mock_conn1, mock_conn2])
103+
with patch.object(pool._cluster, "connect", side_effect=lambda **kw: next(conns)):
104+
await pool.initialize() # Creates mock_conn1
105+
106+
# Acquire a connection (checks it out)
107+
ctx = pool.acquire()
108+
conn = await ctx.__aenter__()
109+
110+
# Close the pool while connection is checked out
111+
await pool.close()
112+
113+
# The checked-out connection should have been closed
114+
mock_conn1.close.assert_called()
115+
116+
88117
class TestConnectionPoolIntegration:
89118
"""Integration tests requiring mocked connections."""
90119

0 commit comments

Comments
 (0)