Skip to content

Commit e0ff5fa

Browse files
Forward max_attempts from ConnectionPool / create_pool to cluster.connect
ClusterClient.connect already accepts max_attempts (default 3), but the pool surface didn't expose it — pool users were stuck on the default unless they constructed a custom ClusterClient and passed it via cluster=. Operators with known-flaky networks who want to absorb more transient leader-flips inside a single acquire() call had no clean way to do it. Add max_attempts as an additive keyword-only parameter on ConnectionPool.__init__ and create_pool, validate (>= 1 if not None), and forward to the underlying cluster.connect call. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 2154906 commit e0ff5fa

3 files changed

Lines changed: 108 additions & 0 deletions

File tree

src/dqliteclient/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ async def create_pool(
118118
max_continuation_frames: int | None = _DEFAULT_MAX_CONTINUATION_FRAMES,
119119
trust_server_heartbeat: bool = False,
120120
close_timeout: float = 0.5,
121+
max_attempts: int | None = None,
121122
) -> ConnectionPool:
122123
"""Create a connection pool with automatic leader detection.
123124
@@ -148,6 +149,10 @@ async def create_pool(
148149
deployments where FIN/ACK round-trip is slower, or
149150
decrease to tighten SIGTERM-shutdown budgets. See
150151
``DqliteConnection.__init__`` for full rationale.
152+
max_attempts: Maximum leader-discovery attempts per pool
153+
connect (forwarded to ``ClusterClient.connect``). ``None``
154+
(default) uses the cluster client's default of 3. Must be
155+
``>= 1`` if not ``None``.
151156
152157
Returns:
153158
An initialized ConnectionPool
@@ -164,6 +169,7 @@ async def create_pool(
164169
max_continuation_frames=max_continuation_frames,
165170
trust_server_heartbeat=trust_server_heartbeat,
166171
close_timeout=close_timeout,
172+
max_attempts=max_attempts,
167173
)
168174
await pool.initialize()
169175
return pool

src/dqliteclient/pool.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ def __init__(
131131
max_continuation_frames: int | None = _DEFAULT_MAX_CONTINUATION_FRAMES,
132132
trust_server_heartbeat: bool = False,
133133
close_timeout: float = 0.5,
134+
max_attempts: int | None = None,
134135
) -> None:
135136
"""Initialize connection pool.
136137
@@ -192,13 +193,23 @@ def __init__(
192193
round-trip is slower, or decrease to tighten
193194
SIGTERM-shutdown budgets. See
194195
``DqliteConnection.__init__`` for full rationale.
196+
max_attempts: Maximum leader-discovery attempts per
197+
``_create_connection`` (forwarded to
198+
``ClusterClient.connect``). ``None`` (default) uses the
199+
cluster client's default of 3 — covers one leader
200+
change plus one transport hiccup. Increase only with
201+
eyes open: hiding genuine cluster instability behind
202+
a long retry loop just delays the diagnosis. Must be
203+
``>= 1`` if not ``None``.
195204
"""
196205
if min_size < 0:
197206
raise ValueError(f"min_size must be non-negative, got {min_size}")
198207
if max_size < 1:
199208
raise ValueError(f"max_size must be at least 1, got {max_size}")
200209
if min_size > max_size:
201210
raise ValueError(f"min_size ({min_size}) must not exceed max_size ({max_size})")
211+
if max_attempts is not None and max_attempts < 1:
212+
raise ValueError(f"max_attempts must be at least 1 if provided, got {max_attempts}")
202213
_validate_timeout(timeout)
203214
_validate_timeout(close_timeout, name="close_timeout")
204215
if cluster is not None and node_store is not None:
@@ -217,6 +228,7 @@ def __init__(
217228
)
218229
self._trust_server_heartbeat = trust_server_heartbeat
219230
self._close_timeout = close_timeout
231+
self._max_attempts = max_attempts
220232

221233
if cluster is not None:
222234
self._cluster = cluster
@@ -404,6 +416,7 @@ async def _create_connection(self) -> DqliteConnection:
404416
max_continuation_frames=self._max_continuation_frames,
405417
trust_server_heartbeat=self._trust_server_heartbeat,
406418
close_timeout=self._close_timeout,
419+
max_attempts=self._max_attempts,
407420
)
408421

409422
async def _release_reservation(self) -> None:
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
"""``ConnectionPool`` and ``create_pool`` must forward ``max_attempts``
2+
to the underlying ``ClusterClient.connect()`` call so operators can
3+
tune connect-retry behavior without having to construct a custom
4+
``ClusterClient`` and pass it via ``cluster=``.
5+
6+
Today the parameter exists at the cluster layer
7+
(``ClusterClient.connect(max_attempts=...)``) but is invisible from
8+
the pool surface, leaving pool users with the hard-coded default of 3.
9+
"""
10+
11+
from __future__ import annotations
12+
13+
from unittest.mock import AsyncMock
14+
15+
import pytest
16+
17+
from dqliteclient import create_pool
18+
from dqliteclient.pool import ConnectionPool
19+
20+
21+
@pytest.mark.asyncio
22+
async def test_connection_pool_forwards_max_attempts() -> None:
23+
"""``ConnectionPool(..., max_attempts=N)`` forwards N to
24+
``cluster.connect``."""
25+
pool = ConnectionPool(["a:9001", "b:9002"], max_attempts=7)
26+
fake_connect = AsyncMock(return_value=AsyncMock())
27+
pool._cluster.connect = fake_connect
28+
29+
await pool._create_connection()
30+
31+
fake_connect.assert_awaited_once()
32+
assert fake_connect.await_args is not None
33+
call_kwargs = fake_connect.await_args.kwargs
34+
assert call_kwargs.get("max_attempts") == 7
35+
36+
37+
@pytest.mark.asyncio
38+
async def test_connection_pool_default_max_attempts_is_none() -> None:
39+
"""When ``max_attempts`` is not passed, the pool forwards ``None``
40+
so ``ClusterClient.connect`` uses its built-in default."""
41+
pool = ConnectionPool(["a:9001"])
42+
fake_connect = AsyncMock(return_value=AsyncMock())
43+
pool._cluster.connect = fake_connect
44+
45+
await pool._create_connection()
46+
47+
assert fake_connect.await_args is not None
48+
call_kwargs = fake_connect.await_args.kwargs
49+
assert call_kwargs.get("max_attempts") is None
50+
51+
52+
@pytest.mark.asyncio
53+
async def test_create_pool_forwards_max_attempts(monkeypatch: pytest.MonkeyPatch) -> None:
54+
"""``create_pool(..., max_attempts=N)`` reaches the underlying
55+
cluster.connect call too. Bypass real network connect with a
56+
mock cluster on the pool itself; we just need to verify the
57+
parameter flows through."""
58+
captured: dict[str, object] = {}
59+
60+
async def fake_initialize(self: object) -> None:
61+
# Skip the bootstrap connect; we are only verifying parameter
62+
# forwarding from create_pool → ConnectionPool, not real
63+
# cluster reachability.
64+
return
65+
66+
monkeypatch.setattr("dqliteclient.pool.ConnectionPool.initialize", fake_initialize)
67+
pool = await create_pool(["a:9001"], max_attempts=5)
68+
try:
69+
70+
async def fake_connect(**kwargs: object) -> AsyncMock:
71+
captured.update(kwargs)
72+
return AsyncMock()
73+
74+
pool._cluster.connect = fake_connect # type: ignore[assignment]
75+
await pool._create_connection()
76+
assert captured.get("max_attempts") == 5
77+
finally:
78+
# ``close()`` is safe to call on a pool that did not initialize.
79+
await pool.close()
80+
81+
82+
def test_connection_pool_rejects_zero_max_attempts() -> None:
83+
with pytest.raises(ValueError, match="max_attempts"):
84+
ConnectionPool(["a:9001"], max_attempts=0)
85+
86+
87+
def test_connection_pool_rejects_negative_max_attempts() -> None:
88+
with pytest.raises(ValueError, match="max_attempts"):
89+
ConnectionPool(["a:9001"], max_attempts=-1)

0 commit comments

Comments
 (0)