Skip to content

Commit cf6733e

Browse files
fix: signal state change on capacity decrements; remove 500ms poll
acquire() used asyncio.wait_for(pool.get(), timeout=0.5) as a polling loop. With K waiters, that's 2K timer events/s and a 250 ms median wake latency even when state has changed. Extended the closed-event to a general state-change signal (same primitive, cleared-and-set per wait cycle) and fire it on every size decrement (release, full queue, drain, user-exception cleanup). Wake latency is now bounded by the actual state change. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 16df9d6 commit cf6733e

2 files changed

Lines changed: 74 additions & 3 deletions

File tree

src/dqliteclient/pool.py

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,17 @@ def _get_closed_event(self) -> asyncio.Event:
111111
self._closed_event.set()
112112
return self._closed_event
113113

114+
def _signal_state_change(self) -> None:
115+
"""Wake any acquire() waiters to re-check pool state.
116+
117+
Reuses the closed-event path so we don't add a second primitive.
118+
acquire() clears the event right before it parks each iteration,
119+
so set()s that come through this helper reliably wake the current
120+
wait; waiters always re-check _closed at the loop top.
121+
"""
122+
if self._closed_event is not None:
123+
self._closed_event.set()
124+
114125
async def _drain_idle(self) -> None:
115126
"""Close all idle connections in the pool.
116127
@@ -129,6 +140,7 @@ async def _drain_idle(self) -> None:
129140
pass
130141
finally:
131142
self._size -= 1
143+
self._signal_state_change()
132144

133145
@asynccontextmanager
134146
async def acquire(self) -> AsyncIterator[DqliteConnection]:
@@ -167,15 +179,19 @@ async def acquire(self) -> AsyncIterator[DqliteConnection]:
167179
f"Timed out waiting for a connection from the pool "
168180
f"(max_size={self._max_size}, timeout={self._timeout}s)"
169181
)
170-
# Race the queue against the closed event so close() wakes
171-
# waiters promptly instead of leaving them on the polling loop.
182+
# Race the queue against the state-change event so any pool
183+
# state change (close, size decrement, drain) wakes waiters
184+
# promptly. Clear right before parking so we only wake on
185+
# subsequent signals; the loop top re-checks _closed.
172186
closed_event = self._get_closed_event()
187+
if not self._closed:
188+
closed_event.clear()
173189
get_task: asyncio.Task[DqliteConnection] = asyncio.create_task(self._pool.get())
174190
closed_task = asyncio.create_task(closed_event.wait())
175191
try:
176192
done, _pending = await asyncio.wait(
177193
{get_task, closed_task},
178-
timeout=min(remaining, 0.5),
194+
timeout=remaining,
179195
return_when=asyncio.FIRST_COMPLETED,
180196
)
181197
finally:
@@ -211,13 +227,15 @@ async def acquire(self) -> AsyncIterator[DqliteConnection]:
211227
with contextlib.suppress(Exception):
212228
await conn.close()
213229
self._size -= 1
230+
self._signal_state_change()
214231
raise
215232
conn._pool_released = True
216233
try:
217234
self._pool.put_nowait(conn)
218235
except asyncio.QueueFull:
219236
await conn.close()
220237
self._size -= 1
238+
self._signal_state_change()
221239
else:
222240
# Connection is broken (invalidated by execute/fetch error handlers).
223241
# Drain other idle connections — they likely point to the same dead server.
@@ -226,6 +244,7 @@ async def acquire(self) -> AsyncIterator[DqliteConnection]:
226244
with contextlib.suppress(BaseException):
227245
await conn.close()
228246
self._size -= 1
247+
self._signal_state_change()
229248
raise
230249
else:
231250
await self._release(conn)
@@ -256,13 +275,15 @@ async def _release(self, conn: DqliteConnection) -> None:
256275
conn._pool_released = True
257276
await conn.close()
258277
self._size -= 1
278+
self._signal_state_change()
259279
return
260280

261281
if not await self._reset_connection(conn):
262282
conn._pool_released = True
263283
with contextlib.suppress(Exception):
264284
await conn.close()
265285
self._size -= 1
286+
self._signal_state_change()
266287
return
267288

268289
conn._pool_released = True
@@ -271,6 +292,7 @@ async def _release(self, conn: DqliteConnection) -> None:
271292
except asyncio.QueueFull:
272293
await conn.close()
273294
self._size -= 1
295+
self._signal_state_change()
274296

275297
async def execute(self, sql: str, params: Sequence[Any] | None = None) -> tuple[int, int]:
276298
"""Execute a SQL statement using a pooled connection."""

tests/test_pool.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -736,6 +736,55 @@ async def track_exec(*args, **kwargs):
736736
assert result is False
737737
assert not exec_called, "ROLLBACK must not be sent on a dead socket"
738738

739+
async def test_acquire_wakes_on_release_without_polling_delay(self) -> None:
740+
"""When a held connection is released, a waiter must wake up
741+
promptly — not sit on the 500 ms polling cadence.
742+
"""
743+
import asyncio
744+
import time
745+
746+
from dqliteclient.connection import DqliteConnection
747+
748+
pool = ConnectionPool(["localhost:9001"], max_size=1, timeout=5.0)
749+
750+
mock_conn = MagicMock(spec=DqliteConnection)
751+
mock_conn.is_connected = True
752+
mock_conn.close = AsyncMock()
753+
mock_conn._in_transaction = False
754+
mock_conn._in_use = False
755+
mock_conn._bound_loop = None
756+
mock_conn._pool_released = False
757+
mock_conn._check_in_use = MagicMock()
758+
759+
with patch.object(pool._cluster, "connect", return_value=mock_conn):
760+
await pool.initialize()
761+
762+
holder_acquired = asyncio.Event()
763+
release_holder = asyncio.Event()
764+
765+
async def hold_connection() -> None:
766+
async with pool.acquire():
767+
holder_acquired.set()
768+
await release_holder.wait()
769+
770+
holder = asyncio.create_task(hold_connection())
771+
await holder_acquired.wait()
772+
773+
async def grab() -> float:
774+
t0 = time.monotonic()
775+
async with pool.acquire():
776+
return time.monotonic() - t0
777+
778+
waiter = asyncio.create_task(grab())
779+
await asyncio.sleep(0.05)
780+
release_holder.set()
781+
elapsed = await asyncio.wait_for(waiter, timeout=1.0)
782+
await holder
783+
784+
await pool.close()
785+
786+
assert elapsed < 0.2, f"waiter should wake immediately on release; took {elapsed:.3f}s"
787+
739788
async def test_close_wakes_waiter_promptly(self) -> None:
740789
"""A task blocked in acquire() waiting for a connection must be
741790
woken quickly when pool.close() is called — not sit on the queue

0 commit comments

Comments
 (0)