Commit bb853ff

Cluster + protocol robustness: validation, anchored substrings, IPv6 zones
Six client-layer correctness gaps closed in one bundle:

- protocol.interrupt() drain now treats ResultResponse and EmptyResponse as terminal. The EXEC / EXEC_SQL done-callback emits ResultResponse before the EmptyResponse acknowledgement when the interrupted statement is on the EXEC side; without this branch a cancel landing on EXEC poisoned the wire with "Expected EmptyResponse" ProtocolError.
- protocol.prepare() now cross-validates StmtResponse.db_id against the requested db_id. Mismatch is a ProtocolError that invalidates the connection — silent registry drift would route writes to the wrong DB.
- NO_TRANSACTION_MESSAGE_SUBSTRINGS: drop the bare "cannot rollback" token. The remaining "no transaction is active" anchor covers both upstream wordings ("cannot rollback - no transaction is active" and "cannot commit - no transaction is active") and is not subject to the DQLITE_ERROR=1 / SQLITE_ERROR=1 collision — any unrelated code-1 message containing "cannot rollback" was previously triggering silent-swallow.
- MemoryNodeStore now strips/dedups/rejects empty seed entries. A typoed "localhost:9001\n" used to leak ValueError through find_leader's narrow exception filter; duplicates inflated the probe count and the per-node error lines.
- _parse_address rejects pathological IPv6 zone-id shapes (empty, whitespace-containing, slash-containing) at the strict-bracket parser instead of letting them slip past to the regex-fallback's "not a valid hostname or IP literal" diagnostic.
- README documents fork-after-init contract (gunicorn --preload, multiprocessing, Celery prefork pool guidance); pool.initialize() docstring documents the parallel-warm-up shape and the leader-side serialisation cost.

Tests updated for the anchored-substring contract.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent b09a5a3 commit bb853ff

7 files changed

Lines changed: 140 additions & 12 deletions

README.md

Lines changed: 35 additions & 0 deletions
@@ -43,6 +43,41 @@ async with pool.acquire() as conn:
     rows = await conn.fetch("SELECT 1")
 ```
 
+The pool issues `min_size` connection handshakes in parallel during
+`initialize()`. All initial connects target whichever node the
+cluster client identifies as the current leader, so the leader
+serialises its incoming-connection acceptance — `min_size=N` does
+NOT speed up startup linearly with N. A balanced default
+(`min_size=1` or low single digits) keeps cold-start latency
+predictable; raise it only when steady-state concurrency demands
+warm connections at engine startup.
+
+## Forking and multiprocessing
+
+Connections, pools, and the cluster client are not safe to use
+across `os.fork()`. The library detects fork-after-init and raises
+`InterfaceError` from any operation in the child process; the
+inherited TCP socket would otherwise be shared with the parent
+(writes would interleave on the wire) and asyncio primitives are
+bound to the parent's event loop.
+
+Common deployment patterns that fork after import:
+
+- **gunicorn** with `--preload`: workers inherit pools created in
+  the parent. Move pool creation into a per-worker `post_fork`
+  hook (gunicorn `post_fork` config) instead of the module top
+  level.
+- **multiprocessing**: child processes must reconstruct
+  connections / pools from configuration (addresses, database
+  name) rather than receive a parent-built object.
+- **Celery prefork pool**: each worker process must create its
+  own pool inside the worker init signal, not at module load.
+
+The fork detection is best-effort (pid mismatch); silent
+double-FIN on the parent's socket is a real risk if the guard is
+bypassed (e.g. via `__new__` to skip `__init__`). Just create the
+pool / connection in the child process you intend to use it from.
+
 ## Layering
 
 `dqlite-client` is the low-level async wire client. Most applications
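
The pid-mismatch guard that the new README section describes can be sketched roughly as follows. This is a minimal illustration, not the library's code: `ForkGuardedPool`, `_check_pid`, and the local `InterfaceError` class are hypothetical stand-ins; only the `os.getpid() != self._creator_pid` comparison appears in the commit itself.

```python
import os


class InterfaceError(Exception):
    """Hypothetical stand-in for the library's InterfaceError."""


class ForkGuardedPool:
    """Toy pool that remembers which process created it (illustrative only)."""

    def __init__(self) -> None:
        # Record the creating pid once, at construction time.
        self._creator_pid = os.getpid()

    def _check_pid(self) -> None:
        # A pid mismatch means this object was inherited across fork().
        if os.getpid() != self._creator_pid:
            raise InterfaceError(
                "pool used in a forked child; create a new pool in this process"
            )

    def acquire(self) -> str:
        # Every public operation runs the guard before touching shared state.
        self._check_pid()
        return "connection"
```

Because the pid is captured in `__init__`, an object built via `__new__` alone never records it, which is why the README calls the detection best-effort.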

src/dqliteclient/connection.py

Lines changed: 20 additions & 0 deletions
@@ -710,6 +710,26 @@ def parse_address(address: str) -> tuple[str, int]:
             # by non-hex characters intact, so the no-encoding path
             # round-trips byte-for-byte.
             host = host[:zone_sep] + unquote(host[zone_sep:])
+            # Reject pathological zone-id shapes here (rather than
+            # letting them slip past the IPv6 parse and fall through
+            # to the regex-fallback "not a valid hostname or IP
+            # literal" diagnostic, which is a different code path
+            # with a different error message). Empty zone, embedded
+            # whitespace, or path-like characters are not valid
+            # RFC 6874 zone identifiers.
+            zone = host[zone_sep + 1 :]
+            if not zone:
+                raise ValueError(
+                    f"Bracket syntax in {address!r} has an empty IPv6 zone "
+                    f"identifier (after '%'); supply a zone like '%eth0' "
+                    f"or remove the '%'"
+                )
+            if any(c.isspace() or c == "/" for c in zone):
+                raise ValueError(
+                    f"Bracket syntax in {address!r} has an invalid IPv6 "
+                    f"zone identifier {zone!r}; zone IDs must not contain "
+                    f"whitespace or '/'"
+                )
 
         # Strict bracket discipline: validate the contents are an
         # IPv6 literal. ``ipaddress.ip_address`` does not accept
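
The zone-id checks in this hunk can be exercised in isolation with a sketch like the one below. `check_zone_id` is a hypothetical helper written for illustration; it assumes the brackets have already been stripped and any `%25` encoding already decoded, and it shortens the error messages.

```python
def check_zone_id(host: str) -> str:
    """Validate the zone part of a bracket-stripped host such as 'fe80::1%eth0'.

    Returns the host unchanged; raises ValueError on a pathological zone.
    """
    zone_sep = host.find("%")
    if zone_sep == -1:
        return host  # no zone identifier present, nothing to check
    zone = host[zone_sep + 1 :]
    if not zone:
        # A trailing '%' with nothing after it.
        raise ValueError(f"empty IPv6 zone identifier in {host!r}")
    if any(c.isspace() or c == "/" for c in zone):
        # Whitespace or a path separator inside the zone.
        raise ValueError(f"invalid IPv6 zone identifier {zone!r}")
    return host
```

Rejecting these shapes at this point gives the caller a zone-specific message instead of a generic hostname diagnostic further down.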

src/dqliteclient/node_store.py

Lines changed: 24 additions & 1 deletion
@@ -97,9 +97,32 @@ def __init__(
             raise TypeError("Pass only one of 'addresses' or 'initial_addresses'")
         seed = addresses if addresses is not None else initial_addresses
         if seed:
+            # Strip leading/trailing whitespace and reject empty
+            # entries up front. Without this, a typoed seed like
+            # ``["localhost:9001\n"]`` (e.g. read from a config
+            # file) leaks ``ValueError`` through the sweep's narrow
+            # exception filter and surfaces deep inside
+            # ``find_leader``. Deduplicate while preserving order
+            # so a config with the same address twice doesn't
+            # double the probe count and double the per-node
+            # error lines in the failure-aggregate message.
+            seen: set[str] = set()
+            unique: list[str] = []
+            for raw in seed:
+                if not isinstance(raw, str):
+                    raise TypeError(
+                        f"NodeStore addresses must be 'host:port' strings, got {type(raw).__name__}"
+                    )
+                addr = raw.strip()
+                if not addr:
+                    raise ValueError("NodeStore addresses must be non-empty 'host:port' strings")
+                if addr in seen:
+                    continue
+                seen.add(addr)
+                unique.append(addr)
             self._nodes: tuple[NodeInfo, ...] = tuple(
                 NodeInfo(node_id=i + 1, address=addr, role=NodeRole.VOTER)
-                for i, addr in enumerate(seed)
+                for i, addr in enumerate(unique)
             )
         else:
             self._nodes = ()
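
The seed normalisation added here can be read as a standalone function. The sketch below lifts the loop out of `__init__` under a hypothetical name, `normalize_seed`, and returns the cleaned list rather than building `NodeInfo` tuples.

```python
from collections.abc import Iterable


def normalize_seed(seed: Iterable[str]) -> list[str]:
    """Strip, validate, and de-duplicate seed addresses, preserving order."""
    seen: set[str] = set()
    unique: list[str] = []
    for raw in seed:
        if not isinstance(raw, str):
            raise TypeError(
                f"addresses must be 'host:port' strings, got {type(raw).__name__}"
            )
        addr = raw.strip()  # drops a stray "\n" read from a config file
        if not addr:
            raise ValueError("addresses must be non-empty 'host:port' strings")
        if addr in seen:
            continue  # duplicate entry: keep the first occurrence only
        seen.add(addr)
        unique.append(addr)
    return unique
```

Since node ids are assigned by enumeration, de-duplicating before the enumeration keeps the ids dense.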

src/dqliteclient/pool.py

Lines changed: 11 additions & 0 deletions
@@ -315,6 +315,17 @@ async def initialize(self) -> None:
         succeeded — they leak as orphaned transports. Use
         ``return_exceptions=True`` so every task resolves, then close
         survivors explicitly before re-raising the first failure.
+
+        Warm-up shape: this issues ``min_size`` connection handshakes
+        IN PARALLEL via ``asyncio.gather``. Every initial connect
+        targets the current leader (after cluster discovery), so the
+        leader-side handshake acceptance is serialised and a large
+        ``min_size`` does NOT linearly speed up startup. Diverges from
+        Go-dqlite's pool which warms lazily; the parallel-warm shape
+        is a deliberate trade-off: predictable cold-start memory at
+        the cost of leader-side serialisation. Keep ``min_size`` low
+        (single digits) unless steady-state concurrency demands warm
+        connections at engine startup.
         """
         if os.getpid() != self._creator_pid:
             raise InterfaceError(
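
The warm-up pattern the docstring describes (gather with `return_exceptions=True`, close survivors, re-raise the first failure) might look roughly like this. `FakeConnection`, `warm_up`, and the `connect` callable are hypothetical names for illustration; the real pool's internals are not shown in this hunk.

```python
import asyncio
from collections.abc import Awaitable, Callable


class FakeConnection:
    """Hypothetical stand-in for a pooled connection."""

    def __init__(self, index: int) -> None:
        self.index = index
        self.closed = False

    async def close(self) -> None:
        self.closed = True


async def warm_up(
    connect: Callable[[int], Awaitable[FakeConnection]], min_size: int
) -> list[FakeConnection]:
    # Start every handshake concurrently. return_exceptions=True makes
    # gather wait for all of them instead of raising on the first failure.
    results = await asyncio.gather(
        *(connect(i) for i in range(min_size)), return_exceptions=True
    )
    failures = [r for r in results if isinstance(r, BaseException)]
    if failures:
        # Close the connections that did succeed so none are orphaned,
        # then surface the first failure to the caller.
        for r in results:
            if not isinstance(r, BaseException):
                await r.close()
        raise failures[0]
    return list(results)
```

The handshakes overlap on the client side only; per the docstring, the leader still accepts them one at a time.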

src/dqliteclient/protocol.py

Lines changed: 26 additions & 0 deletions
@@ -336,6 +336,20 @@ async def prepare(self, db_id: int, sql: str) -> tuple[int, int]:
                 f"Expected StmtResponse, got {type(response).__name__}{self._addr_suffix()}"
             )
 
+        # Defense-in-depth: confirm the server's StmtResponse echoes
+        # the db_id we asked it to prepare against. A mismatch would
+        # mean the server's prepared-statement registry has drifted
+        # from the client's view, and any future exec/finalize
+        # against the returned stmt_id would target a different
+        # database. Surface this as a ProtocolError so the
+        # connection is invalidated rather than silently routing
+        # writes against the wrong DB.
+        if response.db_id != db_id:
+            raise ProtocolError(
+                f"StmtResponse db_id {response.db_id} does not match "
+                f"requested db_id {db_id}{self._addr_suffix()}"
+            )
+
         return response.stmt_id, response.num_params
 
     async def finalize(self, db_id: int, stmt_id: int) -> None:
@@ -418,6 +432,18 @@ async def interrupt(self, db_id: int) -> None:
             response = await self._read_response(deadline=deadline)
             if isinstance(response, EmptyResponse):
                 return
+            # ``ResultResponse`` is the EXEC-side terminal: when the
+            # interrupted statement was an EXEC / EXEC_SQL whose
+            # done-callback emitted RESULT before the INTERRUPT
+            # took effect, the response queue holds the RESULT
+            # before the EmptyResponse acknowledgement. Treat it as
+            # equivalent to EmptyResponse — the wire is in a
+            # coherent state and the interrupt has been honoured.
+            # Without this branch, a cancel landing on an EXEC
+            # path would hit the "Expected EmptyResponse" arm and
+            # poison the wire.
+            if isinstance(response, ResultResponse):
+                return
             if isinstance(response, FailureResponse):
                 raise OperationalError(response.code, self._failure_text(response))
             # RowsResponse mid-drain is expected: the server's in-flight
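
The drain loop's terminal set after this change can be modelled without a socket. The sketch below uses hypothetical dataclasses in place of the real response types and `RuntimeError` in place of `OperationalError`; it mirrors only the branch order visible in the hunk.

```python
from collections.abc import Iterator
from dataclasses import dataclass


@dataclass
class EmptyResponse:
    pass


@dataclass
class ResultResponse:
    rows_affected: int = 0


@dataclass
class RowsResponse:
    pass


@dataclass
class FailureResponse:
    code: int
    message: str


def drain_after_interrupt(responses: Iterator[object]) -> int:
    """Consume responses until a terminal one; return how many were read."""
    read = 0
    for response in responses:
        read += 1
        if isinstance(response, (EmptyResponse, ResultResponse)):
            return read  # terminal: stop draining
        if isinstance(response, FailureResponse):
            # Stand-in for the OperationalError raised in the real code.
            raise RuntimeError(f"[{response.code}] {response.message}")
        if isinstance(response, RowsResponse):
            continue  # in-flight rows from the interrupted query
        raise TypeError(f"unexpected response {type(response).__name__}")
    raise EOFError("stream ended before a terminal response")
```

Before the change, a `ResultResponse` would have fallen through to the final unexpected-response arm, which is the poisoned-wire case the commit message describes.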

tests/test_no_transaction_substrings_source_of_truth.py

Lines changed: 10 additions & 7 deletions
@@ -36,11 +36,14 @@ def test_recogniser_rejects_unrelated_message() -> None:
 
 
 def test_substring_constant_has_expected_values() -> None:
-    """Pin the canonical substring list — both clauses must be
-    present so a server-version drift that drops one but not the
-    other still triggers the recogniser. If a server change requires
-    an update, the integration pin
-    ``test_no_transaction_error_wording.py`` (in dbapi) catches it
-    against a live cluster."""
+    """Pin the canonical substring — the anchored
+    ``"no transaction is active"`` is the single source of truth.
+    Both upstream wordings (``"cannot rollback - no transaction is
+    active"`` and ``"cannot commit - no transaction is active"``)
+    contain this substring; the prior bare ``"cannot rollback"``
+    token was removed because it was too permissive (any unrelated
+    SQLite error or DQLITE_ERROR=1 message containing those words
+    could trigger the silent-swallow path). The integration pin
+    ``test_no_transaction_error_wording.py`` (in dbapi) catches a
+    server-version drift against a live cluster."""
     assert "no transaction is active" in NO_TRANSACTION_MESSAGE_SUBSTRINGS
-    assert "cannot rollback" in NO_TRANSACTION_MESSAGE_SUBSTRINGS

tests/test_pool_reset_no_tx_preserves_slot.py

Lines changed: 14 additions & 4 deletions
@@ -35,12 +35,22 @@ def test_classifier_matches_no_tx_active_with_sqlite_error_code(self) -> None:
         exc = OperationalError(1, "cannot rollback - no transaction is active")
         assert _is_no_tx_rollback_error(exc) is True
 
-    def test_classifier_matches_cannot_rollback_wording(self) -> None:
-        # The substring "cannot rollback" alone is also accepted, since
-        # the C server has historically used both wordings.
-        exc = OperationalError(1, "cannot rollback")
+    def test_classifier_matches_cannot_commit_wording(self) -> None:
+        # Both upstream wordings contain "no transaction is active"
+        # so the anchored substring covers commit and rollback
+        # equally. The bare "cannot rollback" token was previously
+        # also accepted but was too permissive.
+        exc = OperationalError(1, "cannot commit - no transaction is active")
         assert _is_no_tx_rollback_error(exc) is True
 
+    def test_classifier_rejects_bare_cannot_rollback(self) -> None:
+        # A message containing "cannot rollback" without the anchored
+        # "no transaction is active" clause must NOT match — any
+        # unrelated SQLite (or DQLITE_ERROR=1) error happening to
+        # contain those words would otherwise trigger silent-swallow.
+        exc = OperationalError(1, "cannot rollback because the disk is full")
+        assert _is_no_tx_rollback_error(exc) is False
+
     def test_classifier_rejects_no_tx_message_with_wrong_code(self) -> None:
         # SQLITE_BUSY (5) primary code with a coincidental no-tx wording
         # must NOT match — the user's tx might have been partially
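
The contract these tests pin (code 1 plus the anchored substring) can be summarised in a small classifier. The function name `is_no_tx_error` and the tuple shape of the constant are assumptions made for this sketch; the real `_is_no_tx_rollback_error` takes an exception object and is not shown in the diff.

```python
# Assumed shape: the commit only shows membership tests against this constant.
NO_TRANSACTION_MESSAGE_SUBSTRINGS = ("no transaction is active",)

# Code 1 is shared by SQLITE_ERROR and DQLITE_ERROR per the commit message.
GENERIC_ERROR_CODE = 1


def is_no_tx_error(code: int, message: str) -> bool:
    """Return True only for a code-1 error carrying the anchored wording."""
    if code != GENERIC_ERROR_CODE:
        return False  # e.g. SQLITE_BUSY (5) must never be swallowed
    return any(s in message for s in NO_TRANSACTION_MESSAGE_SUBSTRINGS)
```

Because code 1 is so generic, the message substring is the only real discriminator, which is why the looser "cannot rollback" token was dropped.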
