Skip to content

Commit d21414f

Browse files
fix: add poisoned-state flag to ReadBuffer and MessageDecoder
After a parse failure consumes bytes from the buffer but leaves the stream at an unknown offset, continuing to decode silently desynchronizes the stream. Go's protocol.Protocol guards against this with a sticky netErr field; this adds the equivalent for the Python codec. - ReadBuffer gains poison(), is_poisoned, reset(), and an internal _check_poisoned() helper. The flag is sticky (first error wins, never overwritten) and cleared only by reset(); plain clear() does not un-poison so callers cannot accidentally reuse a dead decoder. - MessageDecoder forwards is_poisoned and reset(). decode() and decode_continuation() check poison at entry and wrap body parsing in a try/except that poisons on ANY exception (not just DecodeError / ProtocolError) before re-raising. The broader catch is intentional: decode_body implementations can raise struct.error, ValueError, UnicodeDecodeError, IndexError, etc., and any of those after the bytes have been consumed means the stream is desynchronized. - Oversized-header errors from read_message() are deliberately NOT poisoned: the bytes have not been consumed and skip_message() is the documented recovery path. Only errors from already-consumed bytes mark the decoder dead. - Server-side decoders' reset() also clears handshake state so a reconnect starts from a clean "handshake not yet received" state. decode_handshake() is intentionally NOT poison-checked here — its own bytes-leak-on-failure path is the subject of issue 027. Tests cover: unknown message type poisoning, reset un-poisons, oversized stays recoverable, non-protocol exceptions also poison, decode_continuation wrong-type poisons, first-error-wins idempotency, request-decoder reset clears handshake state. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 3100ac1 commit d21414f

3 files changed

Lines changed: 276 additions & 17 deletions

File tree

src/dqlitewire/buffer.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
"""Buffer utilities for streaming protocol data."""
22

33
from dqlitewire.constants import HEADER_SIZE, WORD_SIZE
4-
from dqlitewire.exceptions import DecodeError
4+
from dqlitewire.exceptions import DecodeError, ProtocolError
55

66

77
class WriteBuffer:
@@ -46,6 +46,36 @@ def __init__(self, max_message_size: int = DEFAULT_MAX_MESSAGE_SIZE) -> None:
4646
self._pos = 0
4747
self._max_message_size = max_message_size
4848
self._skip_remaining = 0
49+
self._poisoned: Exception | None = None
50+
51+
@property
52+
def is_poisoned(self) -> bool:
53+
"""True if a mid-stream error has marked this buffer unrecoverable."""
54+
return self._poisoned is not None
55+
56+
def poison(self, error: Exception) -> None:
57+
"""Mark the buffer as unrecoverable.
58+
59+
Call this when a decode error means the stream offset is no longer
60+
trustworthy (parsing consumed bytes but failed afterwards). Once
61+
poisoned, every public method raises ``ProtocolError`` until
62+
``reset()`` is called.
63+
"""
64+
if self._poisoned is None:
65+
self._poisoned = error
66+
67+
def reset(self) -> None:
68+
"""Clear buffer state and un-poison. Use after a reconnect."""
69+
self._data.clear()
70+
self._pos = 0
71+
self._skip_remaining = 0
72+
self._poisoned = None
73+
74+
def _check_poisoned(self) -> None:
75+
if self._poisoned is not None:
76+
raise ProtocolError(
77+
"buffer is poisoned; call reset() and reconnect"
78+
) from self._poisoned
4979

5080
def feed(self, data: bytes) -> None:
5181
"""Add received data to the buffer.

src/dqlitewire/codec.py

Lines changed: 63 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,28 @@ def version(self) -> int | None:
154154
"""Protocol version from handshake, or None if not yet received."""
155155
return self._version
156156

157+
@property
158+
def is_poisoned(self) -> bool:
159+
"""True if the decoder has hit an unrecoverable mid-stream error.
160+
161+
Once poisoned, every ``decode*`` method raises ``ProtocolError`` until
162+
``reset()`` is called. This protects callers from silently continuing
163+
to decode from an unknown offset after a parse failure desynchronized
164+
the stream.
165+
"""
166+
return self._buffer.is_poisoned
167+
168+
def reset(self) -> None:
169+
"""Reset decoder state to a fresh, un-poisoned condition.
170+
171+
Clears the read buffer and, for server-side decoders, returns the
172+
handshake state to "not yet received". Use this after a reconnect.
173+
"""
174+
self._buffer.reset()
175+
if self._is_request:
176+
self._handshake_done = False
177+
self._version = None
178+
157179
def feed(self, data: bytes) -> None:
158180
"""Feed data to the decoder."""
159181
self._buffer.feed(data)
@@ -198,42 +220,67 @@ def decode_continuation(
198220
199221
Returns None if no complete message is available.
200222
"""
223+
self._buffer._check_poisoned()
201224
data = self._buffer.read_message()
202225
if data is None:
203226
return None
204227

205-
header = Header.decode(data[:HEADER_SIZE])
206-
body = data[HEADER_SIZE : HEADER_SIZE + header.size_words * 8]
207-
208-
if header.msg_type == ResponseType.FAILURE:
209-
failure = FailureResponse.decode_body(body, schema=header.schema)
210-
raise ProtocolError(
211-
f"Server error during ROWS continuation: [{failure.code}] {failure.message}"
228+
# Bytes have been consumed — ANY parse failure here leaves the
229+
# buffer at an unknown offset and must poison the decoder. Catch
230+
# broadly: `decode_body` implementations can raise struct.error,
231+
# ValueError, UnicodeDecodeError, IndexError, etc., and all of
232+
# them mean the stream is desynchronized.
233+
try:
234+
header = Header.decode(data[:HEADER_SIZE])
235+
body = data[HEADER_SIZE : HEADER_SIZE + header.size_words * 8]
236+
237+
if header.msg_type == ResponseType.FAILURE:
238+
failure = FailureResponse.decode_body(body, schema=header.schema)
239+
raise ProtocolError(
240+
f"Server error during ROWS continuation: [{failure.code}] {failure.message}"
241+
)
242+
if header.msg_type != ResponseType.ROWS:
243+
raise ProtocolError(
244+
f"Expected ROWS continuation (type {ResponseType.ROWS}), "
245+
f"got type {header.msg_type}"
246+
)
247+
248+
return RowsResponse.decode_rows_continuation(
249+
body, column_names, column_count, max_rows=max_rows
212250
)
213-
if header.msg_type != ResponseType.ROWS:
214-
raise ProtocolError(
215-
f"Expected ROWS continuation (type {ResponseType.ROWS}), got type {header.msg_type}"
216-
)
217-
218-
return RowsResponse.decode_rows_continuation(
219-
body, column_names, column_count, max_rows=max_rows
220-
)
251+
except Exception as e:
252+
self._buffer.poison(e)
253+
raise
221254

222255
def decode(self) -> Message | None:
223256
"""Decode the next message from the buffer.
224257
225258
Returns None if no complete message is available.
226259
Raises ProtocolError if called on a request decoder before decode_handshake().
260+
Raises ProtocolError if the decoder is poisoned.
227261
"""
262+
self._buffer._check_poisoned()
228263
if not self._handshake_done:
229264
raise ProtocolError(
230265
"Protocol handshake not yet received. Call decode_handshake() before decode()."
231266
)
267+
# read_message may raise DecodeError for an oversized header. That
268+
# error is recoverable via skip_message() — the bytes have not been
269+
# consumed — so we deliberately do NOT poison on it.
232270
data = self._buffer.read_message()
233271
if data is None:
234272
return None
235273

236-
return self.decode_bytes(data)
274+
# Bytes have been consumed. ANY parse failure now leaves the
275+
# buffer at an unknown offset; poison so subsequent calls fail
276+
# fast. Catch broadly: `decode_body` implementations can raise
277+
# struct.error, ValueError, UnicodeDecodeError, IndexError, etc.,
278+
# and all of them mean the stream is desynchronized.
279+
try:
280+
return self.decode_bytes(data)
281+
except Exception as e:
282+
self._buffer.poison(e)
283+
raise
237284

238285
def decode_bytes(self, data: bytes) -> Message:
239286
"""Decode a message from bytes.

tests/test_codec.py

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -882,3 +882,185 @@ def test_skip_oversized_and_recover(self) -> None:
882882
decoded = decoder.decode()
883883
assert isinstance(decoded, FailureResponse)
884884
assert decoded.code == 42
885+
886+
887+
class TestDecoderPoisonedState:
888+
"""Once the decoder has produced a mid-stream error from already-consumed
889+
bytes, subsequent operations must fail fast with a poisoned-state error —
890+
the buffer's _pos is in an unknown position and silently continuing would
891+
corrupt downstream reads. A reset() method returns the decoder to a clean
892+
state after a reconnect.
893+
"""
894+
895+
def test_decode_poisons_on_unknown_message_type(self) -> None:
896+
"""An unknown message type surfaces via decode(), which must poison
897+
the decoder so retries fail loudly.
898+
"""
899+
import struct
900+
901+
from dqlitewire.exceptions import ProtocolError
902+
903+
decoder = MessageDecoder(is_request=False)
904+
# Valid framing, but an unregistered message type (0xFE).
905+
header = struct.pack("<IBBH", 0, 0xFE, 0, 0)
906+
decoder.feed(header)
907+
908+
assert decoder.is_poisoned is False
909+
with pytest.raises(DecodeError):
910+
decoder.decode()
911+
assert decoder.is_poisoned is True
912+
913+
# Subsequent operations fail fast with a ProtocolError from poisoning.
914+
# decode() must raise regardless of what is in the buffer.
915+
decoder.feed(struct.pack("<IBBH", 0, 0xFE, 0, 0))
916+
with pytest.raises(ProtocolError, match="poisoned"):
917+
decoder.decode()
918+
919+
def test_reset_unpoisons_decoder(self) -> None:
920+
"""reset() clears buffer state and the poison flag so the decoder
921+
can be reused after a reconnect.
922+
"""
923+
import struct
924+
925+
from dqlitewire.exceptions import ProtocolError
926+
927+
decoder = MessageDecoder(is_request=False)
928+
header = struct.pack("<IBBH", 0, 0xFE, 0, 0)
929+
decoder.feed(header)
930+
with pytest.raises(DecodeError):
931+
decoder.decode()
932+
assert decoder.is_poisoned is True
933+
934+
decoder.reset()
935+
assert decoder.is_poisoned is False
936+
937+
# Fresh valid message should decode cleanly.
938+
leader = LeaderResponse(node_id=1, address="host:1234").encode()
939+
decoder.feed(leader)
940+
result = decoder.decode()
941+
assert isinstance(result, LeaderResponse)
942+
assert result.node_id == 1
943+
944+
# Sanity: a fresh poison error references a ProtocolError ancestor too.
945+
decoder.feed(struct.pack("<IBBH", 0, 0xFE, 0, 0))
946+
with pytest.raises(ProtocolError):
947+
decoder.decode()
948+
949+
def test_oversized_header_does_not_poison(self) -> None:
950+
"""An oversized-header error from the buffer framing layer is
951+
recoverable via skip_message(); it must NOT poison because the
952+
bytes have not yet been consumed from the buffer.
953+
"""
954+
import struct
955+
956+
decoder = MessageDecoder(is_request=False)
957+
decoder._buffer._max_message_size = 128
958+
oversized = struct.pack("<IBBH", 100, 0, 0, 0) # 808-byte body > 128
959+
decoder.feed(oversized)
960+
961+
with pytest.raises(DecodeError, match="exceeds maximum"):
962+
decoder.decode()
963+
# NOT poisoned — skip_message() is the documented recovery path.
964+
assert decoder.is_poisoned is False
965+
966+
# Recovery via skip_message should still work.
967+
decoder.skip_message()
968+
decoder.feed(b"\x00" * decoder._buffer._skip_remaining)
969+
assert decoder.is_poisoned is False
970+
971+
# And a fresh message decodes afterwards.
972+
normal = FailureResponse(code=1, message="ok").encode()
973+
decoder.feed(normal)
974+
assert isinstance(decoder.decode(), FailureResponse)
975+
976+
def test_poison_catches_non_protocol_exceptions(self) -> None:
977+
"""Any exception from decode_bytes — not just DecodeError/ProtocolError —
978+
must poison the decoder. A `decode_body` implementation could raise
979+
struct.error, ValueError, UnicodeDecodeError, IndexError, etc., and
980+
all of them mean the bytes have been consumed and the offset is now
981+
unknown. Catching only the protocol exception types would let other
982+
failures leak through with the buffer in a desynchronized state.
983+
"""
984+
from dqlitewire.exceptions import ProtocolError
985+
986+
decoder = MessageDecoder(is_request=False)
987+
# Build a real message and feed it.
988+
msg = LeaderResponse(node_id=1, address="x:1").encode()
989+
decoder.feed(msg)
990+
991+
# Monkey-patch decode_bytes to raise a non-protocol exception, simulating
992+
# a bug or unexpected input in a real decode_body implementation.
993+
sentinel = ValueError("simulated body parse failure")
994+
995+
def boom(_data: bytes) -> object:
996+
raise sentinel
997+
998+
decoder.decode_bytes = boom # type: ignore[method-assign]
999+
1000+
with pytest.raises(ValueError, match="simulated body parse failure"):
1001+
decoder.decode()
1002+
assert decoder.is_poisoned is True
1003+
1004+
# Subsequent decode() must raise poisoned, even though decode_bytes
1005+
# is monkey-patched. Restore the real decode_bytes first to make
1006+
# sure the poison check fires before any decode_bytes call.
1007+
del decoder.decode_bytes # type: ignore[attr-defined]
1008+
decoder.feed(msg)
1009+
with pytest.raises(ProtocolError, match="poisoned"):
1010+
decoder.decode()
1011+
1012+
def test_decode_continuation_poisons_on_wrong_type(self) -> None:
1013+
"""decode_continuation() must poison the decoder when the next
1014+
message is not a ROWS continuation. The bytes have been consumed
1015+
from the buffer; subsequent operations cannot trust the offset.
1016+
"""
1017+
from dqlitewire.exceptions import ProtocolError
1018+
1019+
decoder = MessageDecoder(is_request=False)
1020+
# Feed a non-ROWS message where decode_continuation expects ROWS.
1021+
# An EmptyResponse is type 127, body is empty.
1022+
empty = EmptyResponse().encode()
1023+
decoder.feed(empty)
1024+
1025+
with pytest.raises(ProtocolError, match="Expected ROWS continuation"):
1026+
decoder.decode_continuation(column_names=["x"], column_count=1)
1027+
assert decoder.is_poisoned is True
1028+
1029+
# Subsequent decode() also fails poisoned.
1030+
with pytest.raises(ProtocolError, match="poisoned"):
1031+
decoder.decode()
1032+
1033+
def test_poison_is_first_error_wins(self) -> None:
1034+
"""ReadBuffer.poison() must NOT overwrite an existing poison error.
1035+
First error wins so the original __cause__ remains visible.
1036+
"""
1037+
from dqlitewire.buffer import ReadBuffer
1038+
1039+
buf = ReadBuffer()
1040+
first = DecodeError("first failure")
1041+
second = DecodeError("second failure")
1042+
buf.poison(first)
1043+
buf.poison(second)
1044+
assert buf._poisoned is first
1045+
1046+
def test_request_decoder_reset_clears_handshake_state(self) -> None:
1047+
"""For a server-side (request) decoder, reset() must also un-do the
1048+
handshake so a reconnect requires a fresh handshake. Otherwise the
1049+
decoder would silently accept message bytes as a continuation of
1050+
the dead session.
1051+
"""
1052+
decoder = MessageDecoder(is_request=True)
1053+
# Complete a handshake.
1054+
decoder.feed(PROTOCOL_VERSION.to_bytes(8, "little"))
1055+
decoder.decode_handshake()
1056+
assert decoder._handshake_done is True
1057+
assert decoder.version == PROTOCOL_VERSION
1058+
1059+
decoder.reset()
1060+
assert decoder._handshake_done is False
1061+
assert decoder.version is None
1062+
# And the decoder must refuse decode() until a fresh handshake.
1063+
from dqlitewire.exceptions import ProtocolError
1064+
1065+
with pytest.raises(ProtocolError, match="[Hh]andshake"):
1066+
decoder.decode()

0 commit comments

Comments
 (0)