Skip to content

Commit 9db8390

Browse files
feat: split ProtocolError into specific subclasses
ProtocolError was raised at six qualitatively different sites — server-returned FAILURE, stream-desync, handshake-not-done, bad protocol version, ROWS continuation state-machine misuse, and buffer poisoning — forcing callers to string-match exception messages to recover.

Add specific subclasses under ProtocolError so callers can discriminate via the exception class:

- ServerFailure (with structured code/message attributes): peer sent a FAILURE response — a server-side error rather than wire-level corruption. It is raised only from decode_continuation(); per the class docstring, callers should then reset() the decoder and reconnect.
- StreamError: stream is at unknown offset; reconnect required.
- PoisonedError (StreamError): buffer was poisoned by a prior failure; __cause__ carries the original exception.
- HandshakeError: handshake not done, already done, or bad version.
- ContinuationError: ROWS continuation state-machine misuse.

Every new subclass inherits from ProtocolError, so existing `except ProtocolError` catches continue to work unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 7068a34 commit 9db8390

6 files changed

Lines changed: 335 additions & 29 deletions

File tree

src/dqlitewire/__init__.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,16 +54,28 @@
5454
ResponseType,
5555
ValueType,
5656
)
57-
from dqlitewire.exceptions import DecodeError, EncodeError, ProtocolError
57+
from dqlitewire.exceptions import (
58+
ContinuationError,
59+
DecodeError,
60+
EncodeError,
61+
HandshakeError,
62+
PoisonedError,
63+
ProtocolError,
64+
ServerFailure,
65+
StreamError,
66+
)
5867

5968
__all__ = [
69+
"ContinuationError",
6070
"DecodeError",
6171
"EncodeError",
72+
"HandshakeError",
6273
"MessageDecoder",
6374
"MessageEncoder",
6475
"NodeRole",
6576
"PROTOCOL_VERSION",
6677
"PROTOCOL_VERSION_LEGACY",
78+
"PoisonedError",
6779
"ProtocolError",
6880
"ROW_DONE_BYTE",
6981
"ROW_DONE_MARKER",
@@ -72,6 +84,8 @@
7284
"ReadBuffer",
7385
"RequestType",
7486
"ResponseType",
87+
"ServerFailure",
88+
"StreamError",
7589
"ValueType",
7690
"WriteBuffer",
7791
"decode_message",

src/dqlitewire/buffer.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
"""Buffer utilities for streaming protocol data."""
22

33
from dqlitewire.constants import HEADER_SIZE, WORD_SIZE
4-
from dqlitewire.exceptions import DecodeError, ProtocolError
4+
from dqlitewire.exceptions import DecodeError, PoisonedError
55

66
_COMPACT_THRESHOLD = 4096
77

@@ -134,7 +134,7 @@ def reset(self) -> None:
134134

135135
def _check_poisoned(self) -> None:
136136
if self._poisoned is not None:
137-
raise ProtocolError(
137+
raise PoisonedError(
138138
"buffer is poisoned; call reset() and reconnect"
139139
) from self._poisoned
140140

src/dqlitewire/codec.py

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,13 @@
88
RequestType,
99
ResponseType,
1010
)
11-
from dqlitewire.exceptions import DecodeError, ProtocolError
11+
from dqlitewire.exceptions import (
12+
ContinuationError,
13+
DecodeError,
14+
HandshakeError,
15+
ServerFailure,
16+
StreamError,
17+
)
1218
from dqlitewire.messages.base import Header, Message
1319
from dqlitewire.messages.requests import (
1420
AddRequest,
@@ -126,7 +132,7 @@ def __init__(self, version: int = PROTOCOL_VERSION) -> None:
126132
(0x86104dd760433fe5) for pre-1.0 dqlite servers.
127133
"""
128134
if version not in _SUPPORTED_VERSIONS:
129-
raise ProtocolError(
135+
raise HandshakeError(
130136
f"Unsupported protocol version: {version:#x}. "
131137
f"Supported: {', '.join(f'{v:#x}' for v in sorted(_SUPPORTED_VERSIONS))}"
132138
)
@@ -201,7 +207,7 @@ def __init__(
201207
this limit raises ``DecodeError``.
202208
"""
203209
if not is_request and version not in _SUPPORTED_VERSIONS:
204-
raise ProtocolError(
210+
raise HandshakeError(
205211
f"Unsupported protocol version: {version:#x}. "
206212
f"Supported: {', '.join(f'{v:#x}' for v in sorted(_SUPPORTED_VERSIONS))}"
207213
)
@@ -270,7 +276,7 @@ def skip_message(self) -> bool:
270276
abandon the stream.
271277
"""
272278
if self._continuation_expected:
273-
raise ProtocolError(
279+
raise ContinuationError(
274280
"Cannot skip a message while a ROWS continuation is "
275281
"in progress. Call decode_continuation() to drain, "
276282
"or reset() to abandon the stream."
@@ -300,7 +306,7 @@ def decode_continuation(self) -> RowsResponse | None:
300306
"""
301307
self._buffer._check_poisoned()
302308
if not self._continuation_expected:
303-
raise ProtocolError(
309+
raise ContinuationError(
304310
"decode_continuation() called but no ROWS continuation "
305311
"is in progress. Use decode() for the initial message."
306312
)
@@ -332,12 +338,10 @@ def decode_continuation(self) -> RowsResponse | None:
332338
if header.msg_type == ResponseType.FAILURE:
333339
self._continuation_expected = False
334340
failure = FailureResponse.decode_body(body, schema=header.schema)
335-
raise ProtocolError(
336-
f"Server error during ROWS continuation: [{failure.code}] {failure.message}"
337-
)
341+
raise ServerFailure(failure.code, failure.message)
338342
if header.msg_type != ResponseType.ROWS:
339343
self._continuation_expected = False
340-
raise ProtocolError(
344+
raise StreamError(
341345
f"Expected ROWS continuation (type {ResponseType.ROWS}), "
342346
f"got type {header.msg_type}"
343347
)
@@ -366,13 +370,13 @@ def decode(self) -> Message | None:
366370
"""
367371
self._buffer._check_poisoned()
368372
if self._continuation_expected:
369-
raise ProtocolError(
373+
raise ContinuationError(
370374
"Cannot decode a new message while a ROWS continuation "
371375
"is in progress. Call decode_continuation() until "
372376
"has_more is False, or call reset() to abandon the stream."
373377
)
374378
if not self._handshake_done:
375-
raise ProtocolError(
379+
raise HandshakeError(
376380
"Protocol handshake not yet received. Call decode_handshake() before decode()."
377381
)
378382
# read_message may raise DecodeError for an oversized header. That
@@ -421,7 +425,7 @@ def decode_bytes(self, data: bytes) -> Message:
421425
"""
422426
self._buffer._check_poisoned()
423427
if not self._handshake_done:
424-
raise ProtocolError(
428+
raise HandshakeError(
425429
"Protocol handshake not yet received. "
426430
"Call decode_handshake() before decode_bytes()."
427431
)
@@ -490,7 +494,7 @@ def decode_handshake(self) -> int | None:
490494
protocol version" error.
491495
"""
492496
if self._handshake_done:
493-
raise ProtocolError("Handshake already completed")
497+
raise HandshakeError("Handshake already completed")
494498
# Peek first so we only commit on a valid version. An invalid version
495499
# leaves the bytes in place — a retry is deterministic rather than
496500
# silently advancing into real message data.
@@ -499,7 +503,7 @@ def decode_handshake(self) -> int | None:
499503
return None
500504
version = int.from_bytes(peek, "little")
501505
if version not in _SUPPORTED_VERSIONS:
502-
raise ProtocolError(f"Unsupported protocol version: {version:#x}")
506+
raise HandshakeError(f"Unsupported protocol version: {version:#x}")
503507
# Commit state BEFORE consuming bytes. If the consume is
504508
# interrupted by an async exception, revert so the peek/commit
505509
# pair becomes atomic from the caller's perspective.

src/dqlitewire/exceptions.py

Lines changed: 77 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,90 @@
1-
"""Exceptions for dqlite wire protocol."""
1+
"""Exceptions for dqlite wire protocol.
2+
3+
All exceptions inherit from ``ProtocolError`` so callers can keep a
4+
single broad ``except ProtocolError`` catch. Callers who need to
5+
distinguish recoverable server errors from fatal stream desync can
6+
catch the more specific subclasses below.
7+
"""
28

39

410
class ProtocolError(Exception):
511
"""Base exception for protocol errors."""
612

7-
pass
8-
913

1014
class EncodeError(ProtocolError):
1115
"""Error during message encoding."""
1216

13-
pass
14-
1517

1618
class DecodeError(ProtocolError):
1719
"""Error during message decoding."""
1820

19-
pass
21+
22+
class StreamError(ProtocolError):
23+
"""The stream is at an unknown offset; the connection must be rebuilt.
24+
25+
Callers catching this should close and re-establish the underlying
26+
socket. The decoder and buffer are not recoverable — any remaining
27+
buffered bytes are at an unknown protocol offset.
28+
"""
29+
30+
31+
class PoisonedError(StreamError):
32+
"""Raised by ``ReadBuffer._check_poisoned`` when the buffer was
33+
previously poisoned by a decode failure or signal interruption.
34+
35+
The original exception that caused the poisoning is available via
36+
``__cause__``.
37+
"""
38+
39+
40+
class HandshakeError(ProtocolError):
41+
"""The handshake has not been completed, the requested or peer-announced
42+
protocol version is unsupported, or an attempt was made to re-perform an
43+
already-completed handshake. Caller should call ``decode_handshake()``
44+
before attempting to decode messages, and should only call it once.
45+
"""
46+
47+
48+
class ContinuationError(ProtocolError):
49+
"""The ROWS continuation state machine was used incorrectly — e.g.
50+
``decode()`` was called while a multipart ``RowsResponse`` was in
51+
progress, or ``decode_continuation()`` was called when no
52+
continuation was pending.
53+
54+
Recoverable: call the correct method (``decode_continuation()`` or
55+
``reset()``) and proceed.
56+
"""
57+
58+
59+
class ServerFailure(ProtocolError):
60+
"""The peer returned a FAILURE response.
61+
62+
Semantically a server-side error (e.g. SQL constraint violation,
63+
busy database, or the dqlite-specific "not leader" response) rather
64+
than a wire-level corruption. The SQLite error code is exposed
65+
programmatically so callers can distinguish recoverable conditions
66+
(like ``SQLITE_IOERR_NOT_LEADER``, which callers may handle by
67+
redirecting to the cluster leader) from terminal ones.
68+
69+
Connection state: in the normal ``decode()`` path the decoder
70+
returns a ``FailureResponse`` *object* (not an exception), so no
71+
state is disturbed. This exception is raised only from
72+
``decode_continuation()`` when FAILURE arrives mid-stream during
73+
a multi-part ``RowsResponse``. In that path the buffer is also
74+
poisoned (see issue 083): the remaining continuation frames the
75+
caller was expecting will never arrive, and the next decode must
76+
start fresh. Callers catching ``ServerFailure`` in that path
77+
should treat the decoder as requiring ``reset()`` and reconnect.
78+
79+
Attributes:
80+
code: SQLite error code (or extended dqlite code). Common codes
81+
include ``SQLITE_BUSY`` (5), ``SQLITE_ERROR`` (1), and the
82+
dqlite-specific ``SQLITE_IOERR_NOT_LEADER``. See the SQLite
83+
documentation for the full list.
84+
message: Human-readable error message from the server.
85+
"""
86+
87+
def __init__(self, code: int, message: str) -> None:
88+
super().__init__(f"[{code}] {message}")
89+
self.code = code
90+
self.message = message

tests/test_codec.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1120,8 +1120,16 @@ def test_decode_continuation_returns_none_when_no_data(self) -> None:
11201120
assert result is None
11211121

11221122
def test_decode_continuation_raises_on_failure_response(self) -> None:
1123-
"""decode_continuation should surface server error, not a confusing DecodeError."""
1124-
from dqlitewire.exceptions import ProtocolError
1123+
"""decode_continuation should surface server error as ServerFailure.
1124+
1125+
The exception must be a ServerFailure (subclass of ProtocolError)
1126+
carrying structured ``code`` and ``message`` attributes — NOT a
1127+
DecodeError (which would mean the failure body was misinterpreted
1128+
as row data) and NOT a bare ProtocolError (issue 230 — callers
1129+
need to distinguish recoverable server errors from fatal stream
1130+
desync).
1131+
"""
1132+
from dqlitewire.exceptions import ServerFailure
11251133
from dqlitewire.messages.responses import FailureResponse
11261134

11271135
failure = FailureResponse(code=266, message="disk I/O error")
@@ -1131,11 +1139,10 @@ def test_decode_continuation_raises_on_failure_response(self) -> None:
11311139
decoder._continuation_expected = True
11321140
decoder.feed(failure_bytes)
11331141

1134-
with pytest.raises(ProtocolError, match="disk I/O error") as exc_info:
1142+
with pytest.raises(ServerFailure, match="disk I/O error") as exc_info:
11351143
decoder.decode_continuation()
1136-
# Must be a ProtocolError, NOT a DecodeError (which would mean the
1137-
# failure body was misinterpreted as row data).
1138-
assert type(exc_info.value) is ProtocolError
1144+
assert exc_info.value.code == 266
1145+
assert exc_info.value.message == "disk I/O error"
11391146

11401147
def test_decode_continuation_raises_on_unexpected_type(self) -> None:
11411148
"""decode_continuation should raise ProtocolError for non-ROWS, non-FAILURE type."""

0 commit comments

Comments
 (0)