Skip to content

Commit 4c3d08d

Browse files
fix: prevent double pool initialization from concurrent callers
Wrap initialize() body in async with self._lock so that two concurrent callers (e.g., via asyncio.gather on __aenter__) cannot both pass the _initialized check and create 2*min_size connections. Without the lock, the await points in _create_connection() and _pool.put() allowed interleaving, causing the pool to exceed max_size or even deadlock when min_size == max_size (the second initializer blocks trying to put into a full queue). Closes #082 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 860fa24 commit 4c3d08d

2 files changed

Lines changed: 36 additions & 6 deletions

File tree

src/dqliteclient/pool.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,13 @@ def __init__(
5959

6060
async def initialize(self) -> None:
6161
"""Initialize the pool with minimum connections."""
62-
if self._initialized:
63-
return
64-
for _ in range(self._min_size):
65-
conn = await self._create_connection()
66-
await self._pool.put(conn)
67-
self._initialized = True
62+
async with self._lock:
63+
if self._initialized:
64+
return
65+
for _ in range(self._min_size):
66+
conn = await self._create_connection()
67+
await self._pool.put(conn)
68+
self._initialized = True
6869

6970
async def _create_connection(self) -> DqliteConnection:
7071
"""Create a new connection to the leader."""

tests/test_pool.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,35 @@ async def mock_connect(**kwargs):
7373

7474
await pool.close()
7575

76+
async def test_concurrent_initialize_creates_only_min_size_connections(self) -> None:
77+
"""Concurrent initialize() calls must not create 2*min_size connections."""
78+
import asyncio
79+
80+
pool = ConnectionPool(["localhost:9001"], min_size=2, max_size=5)
81+
82+
create_count = 0
83+
84+
async def mock_connect(**kwargs):
85+
nonlocal create_count
86+
create_count += 1
87+
await asyncio.sleep(0) # Yield to allow interleaving
88+
mock_conn = MagicMock()
89+
mock_conn.is_connected = True
90+
mock_conn.connect = AsyncMock()
91+
mock_conn.close = AsyncMock()
92+
return mock_conn
93+
94+
with patch.object(pool._cluster, "connect", side_effect=mock_connect):
95+
await asyncio.gather(pool.initialize(), pool.initialize())
96+
97+
assert create_count == 2, (
98+
f"Expected 2 connections (min_size), got {create_count}. "
99+
f"Concurrent initialize() created duplicate connections."
100+
)
101+
assert pool._size == 2
102+
103+
await pool.close()
104+
76105
async def test_initialize_retryable_after_failure(self) -> None:
77106
"""If initialize() fails, it should be retryable (not permanently stuck)."""
78107
pool = ConnectionPool(["localhost:9001"], min_size=1, max_size=5)

0 commit comments

Comments
 (0)