Skip to content

Commit e9eeaf0

Browse files
fix: initialize pool connections concurrently
initialize() used a serial for-loop, so create_pool(min_size=N) blocked for N × per-connect RTT. Even a cold leader lookup stretched this into seconds. Use asyncio.gather to create all min_size connections concurrently instead. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent cf6733e commit e9eeaf0

2 files changed

Lines changed: 41 additions & 3 deletions

File tree

src/dqliteclient/pool.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,9 +92,16 @@ async def initialize(self) -> None:
9292
async with self._lock:
9393
if self._initialized:
9494
return
95-
for _ in range(self._min_size):
96-
conn = await self._create_connection()
97-
await self._pool.put(conn)
95+
# Create min_size connections concurrently so pool startup
96+
# latency doesn't scale with min_size × per-connect RTT. If any
97+
# one fails, cancel the rest and propagate; leave _initialized
98+
# False so the caller can retry.
99+
if self._min_size > 0:
100+
conns = await asyncio.gather(
101+
*(self._create_connection() for _ in range(self._min_size))
102+
)
103+
for conn in conns:
104+
await self._pool.put(conn)
98105
self._initialized = True
99106

100107
async def _create_connection(self) -> DqliteConnection:

tests/test_pool.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -843,6 +843,37 @@ async def try_acquire() -> BaseException | None:
843843
f"acquire() should wake within ~100ms of pool.close(); took {elapsed:.3f}s"
844844
)
845845

846+
async def test_initialize_creates_min_size_in_parallel(self) -> None:
847+
"""initialize() should connect min_size in parallel, not sequentially.
848+
With min_size=5 and per-connect latency 0.1s, serial init takes ~0.5s;
849+
parallel should finish in ~0.1s.
850+
"""
851+
import asyncio
852+
import time
853+
854+
pool = ConnectionPool(["localhost:9001"], min_size=5, max_size=10)
855+
856+
async def slow_connect(**kwargs):
857+
await asyncio.sleep(0.1)
858+
mock_conn = MagicMock()
859+
mock_conn.is_connected = True
860+
mock_conn.close = AsyncMock()
861+
return mock_conn
862+
863+
with patch.object(pool._cluster, "connect", side_effect=slow_connect):
864+
t0 = time.monotonic()
865+
await pool.initialize()
866+
elapsed = time.monotonic() - t0
867+
868+
assert pool._size == 5
869+
assert elapsed < 0.3, (
870+
f"expected parallel init (~0.1s), got {elapsed:.3f}s "
871+
f"— looks like the connections were created sequentially"
872+
)
873+
874+
await pool.close()
875+
876+
846877
async def test_reset_connection_returns_false_on_cancelled_error(self) -> None:
847878
"""_reset_connection must return False (not raise) when ROLLBACK is cancelled."""
848879
import asyncio

0 commit comments

Comments
 (0)