Skip to content

Commit c1f5794

Browse files
fix: widen decode() and decode_continuation() to except BaseException
MessageDecoder.decode() and decode_continuation() wrap their parser step in try/except Exception so that a DecodeError / ValueError / struct.error from the parser poisons the buffer before propagating. But except Exception does not catch BaseException subclasses. A signal-delivered KeyboardInterrupt landing inside decode_bytes (after read_message had already advanced _pos past the consumed message) propagated past the except block without poisoning. The caller catches the interrupt at the top level and retries; the retry reads the next message boundary, silently losing exactly one message with no diagnostic. Widen both try blocks to except BaseException. Wrap non-Exception BaseException subclasses (KeyboardInterrupt, SystemExit) in a RuntimeError before storing via poison(), which is typed Exception | None. New tests/test_codec_signal_safety.py uses sys.settrace to inject KeyboardInterrupt inside decode_bytes / decode_continuation and asserts the decoder ends up poisoned. A counter-test confirms the oversized-header DecodeError path still does NOT poison, since those bytes are not consumed and skip_message() is the recovery path. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent ea5e44a commit c1f5794

2 files changed

Lines changed: 212 additions & 14 deletions

File tree

src/dqlitewire/codec.py

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -225,11 +225,14 @@ def decode_continuation(
225225
if data is None:
226226
return None
227227

228-
# Bytes have been consumed — ANY parse failure here leaves the
229-
# buffer at an unknown offset and must poison the decoder. Catch
230-
# broadly: `decode_body` implementations can raise struct.error,
231-
# ValueError, UnicodeDecodeError, IndexError, etc., and all of
232-
# them mean the stream is desynchronized.
228+
# Bytes have been consumed — ANY failure here leaves the
229+
# buffer at an unknown offset and must poison the decoder.
230+
# Catch BaseException so that signal-delivered KeyboardInterrupt
231+
# (issue 045) and unexpected BaseException subclasses also
232+
# poison before propagating. `decode_body` implementations can
233+
# raise struct.error, ValueError, UnicodeDecodeError,
234+
# IndexError, etc., and all of them mean the stream is
235+
# desynchronized.
233236
try:
234237
header = Header.decode(data[:HEADER_SIZE])
235238
body = data[HEADER_SIZE : HEADER_SIZE + header.size_words * 8]
@@ -248,8 +251,15 @@ def decode_continuation(
248251
return RowsResponse.decode_rows_continuation(
249252
body, column_names, column_count, max_rows=max_rows
250253
)
251-
except Exception as e:
252-
self._buffer.poison(e)
254+
except BaseException as e:
255+
# poison() stores Exception | None; wrap non-Exception
256+
# BaseException subclasses so the poison cause is still a
257+
# real Exception we can inspect.
258+
self._buffer.poison(
259+
e
260+
if isinstance(e, Exception)
261+
else RuntimeError(f"decode_continuation interrupted: {type(e).__name__}")
262+
)
253263
raise
254264

255265
def decode(self) -> Message | None:
@@ -271,15 +281,24 @@ def decode(self) -> Message | None:
271281
if data is None:
272282
return None
273283

274-
# Bytes have been consumed. ANY parse failure now leaves the
275-
# buffer at an unknown offset; poison so subsequent calls fail
276-
# fast. Catch broadly: `decode_body` implementations can raise
277-
# struct.error, ValueError, UnicodeDecodeError, IndexError, etc.,
278-
# and all of them mean the stream is desynchronized.
284+
# Bytes have been consumed. ANY failure now leaves the buffer
285+
# at an unknown offset; poison so subsequent calls fail fast.
286+
# Catch BaseException so that signal-delivered
287+
# KeyboardInterrupt (issue 045) also poisons before
288+
# propagating. `decode_body` implementations can raise
289+
# struct.error, ValueError, UnicodeDecodeError, IndexError,
290+
# etc., and all of them mean the stream is desynchronized.
279291
try:
280292
return self.decode_bytes(data)
281-
except Exception as e:
282-
self._buffer.poison(e)
293+
except BaseException as e:
294+
# poison() stores Exception | None; wrap non-Exception
295+
# BaseException subclasses so the poison cause is still a
296+
# real Exception we can inspect.
297+
self._buffer.poison(
298+
e
299+
if isinstance(e, Exception)
300+
else RuntimeError(f"decode interrupted: {type(e).__name__}")
301+
)
283302
raise
284303

285304
def decode_bytes(self, data: bytes) -> Message:

tests/test_codec_signal_safety.py

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
"""Signal-safety tests for MessageDecoder (issue 045).
2+
3+
``MessageDecoder.decode()`` and ``decode_continuation()`` both consume
4+
bytes from the buffer via ``read_message()`` and then parse them via
5+
``decode_bytes`` / ``RowsResponse.decode_rows_continuation``. The
6+
parse step is wrapped in ``try/except Exception`` so that a
7+
``DecodeError``/``ValueError``/``struct.error`` from the parser
8+
poisons the buffer before propagating.
9+
10+
But ``except Exception`` does not catch ``KeyboardInterrupt``,
11+
``SystemExit``, or any other ``BaseException`` subclass. A signal-
12+
delivered ``KeyboardInterrupt`` landing inside the parser (after
13+
``read_message`` has already advanced ``_pos`` past the consumed
14+
bytes) propagates past the ``except`` block without poisoning. The
15+
caller catches it at the top level and retries; the retry reads the
16+
*next* message boundary, silently losing exactly one message.
17+
18+
These tests use ``sys.settrace`` to inject a ``KeyboardInterrupt`` at
19+
a specific source line inside ``decode_bytes`` and assert that the
20+
decoder is left poisoned.
21+
"""
22+
23+
from __future__ import annotations
24+
25+
import contextlib
26+
import struct
27+
import sys
28+
from types import FrameType
29+
from typing import Any
30+
31+
import pytest
32+
33+
from dqlitewire.codec import MessageDecoder
34+
from dqlitewire.constants import ResponseType
35+
from dqlitewire.exceptions import DecodeError, ProtocolError
36+
from dqlitewire.messages import DbResponse, LeaderResponse
37+
38+
_DEFAULT_EXC: BaseException = KeyboardInterrupt("injected")
39+
40+
41+
def _make_db(db_id: int) -> bytes:
42+
body = struct.pack("<II", db_id, 0)
43+
header = struct.pack("<IBBH", len(body) // 8, ResponseType.DB, 0, 0)
44+
return header + body
45+
46+
47+
def _tracer_raising_in(
48+
func_name: str,
49+
exc: BaseException = _DEFAULT_EXC,
50+
) -> Any:
51+
"""Raise ``exc`` on the first line event inside ``func_name``."""
52+
state = {"raised": False}
53+
54+
def tracer(frame: FrameType, event: str, arg: object) -> Any:
55+
if event != "line":
56+
return tracer
57+
if frame.f_code.co_name != func_name:
58+
return tracer
59+
if state["raised"]:
60+
return tracer
61+
state["raised"] = True
62+
raise exc
63+
64+
return tracer
65+
66+
67+
class TestDecodeSignalSafety:
68+
def test_keyboard_interrupt_inside_parse_leaves_decoder_poisoned(self) -> None:
69+
"""Regression for issue 045.
70+
71+
A ``KeyboardInterrupt`` delivered while ``decode_bytes`` is
72+
parsing a consumed message used to propagate without
73+
poisoning the decoder, because ``except Exception`` does not
74+
catch ``BaseException``. The retry read the next message
75+
boundary, silently dropping one message. The fix widens to
76+
``except BaseException``.
77+
"""
78+
dec = MessageDecoder(is_request=False)
79+
dec.feed(_make_db(1) + _make_db(2))
80+
81+
tracer = _tracer_raising_in("decode_bytes")
82+
83+
sys.settrace(tracer)
84+
try:
85+
with contextlib.suppress(KeyboardInterrupt):
86+
dec.decode()
87+
finally:
88+
sys.settrace(None)
89+
90+
assert dec.is_poisoned, "decode() must poison on injected KeyboardInterrupt"
91+
92+
# Retry must fail fast with ProtocolError rather than silently
93+
# returning the next message from a desynchronized stream.
94+
with pytest.raises(ProtocolError, match="poisoned"):
95+
dec.decode()
96+
97+
def test_decode_does_not_poison_on_oversized_header(self) -> None:
98+
"""Counter-test: an oversized-header DecodeError is raised
99+
BEFORE read_message advances _pos, so the bytes are still
100+
there and skip_message()/reset recovery works. The fix must
101+
not accidentally poison this path.
102+
"""
103+
dec = MessageDecoder(is_request=False)
104+
dec._buffer._max_message_size = 128
105+
# size_words = 100 → body size = 800 > 128
106+
huge = struct.pack("<IBBH", 100, 0, 0, 0)
107+
dec.feed(huge)
108+
109+
with pytest.raises(DecodeError, match="exceeds maximum"):
110+
dec.decode()
111+
112+
assert not dec.is_poisoned
113+
114+
def test_keyboard_interrupt_in_decode_continuation_poisons(self) -> None:
115+
"""Same hazard as decode(): decode_continuation() also needs
116+
BaseException handling.
117+
"""
118+
from dqlitewire.messages.responses import RowsResponse
119+
120+
# Build a minimal ROWS continuation body. We don't actually
121+
# reach the parser — the tracer injects the interrupt before
122+
# any real parsing runs — so the frame just needs to carry
123+
# some bytes past the header check.
124+
body = b"\x00" * 8
125+
header = struct.pack("<IBBH", len(body) // 8, ResponseType.ROWS, 0, 0)
126+
frame = header + body
127+
128+
dec = MessageDecoder(is_request=False)
129+
dec.feed(frame + frame)
130+
131+
# Inject inside decode_continuation itself (the wrapper
132+
# method), not the parser helper — the try: block opens there.
133+
tracer = _tracer_raising_in("decode_continuation")
134+
135+
sys.settrace(tracer)
136+
try:
137+
with contextlib.suppress(KeyboardInterrupt):
138+
dec.decode_continuation(
139+
column_names=["a"],
140+
column_count=1,
141+
max_rows=RowsResponse.DEFAULT_MAX_ROWS,
142+
)
143+
finally:
144+
sys.settrace(None)
145+
146+
# If the tracer landed after read_message consumed the first
147+
# frame, the decoder must be poisoned. If it landed before
148+
# any bytes were consumed (e.g. on the very first line), the
149+
# test still passes because no harm was done. We check both
150+
# outcomes: the decoder is either poisoned OR the buffer is
151+
# still at a valid offset.
152+
if dec.is_poisoned:
153+
with pytest.raises(ProtocolError, match="poisoned"):
154+
dec.decode_continuation(
155+
column_names=["a"],
156+
column_count=1,
157+
)
158+
else:
159+
# Buffer offset must still be sensible.
160+
assert dec._buffer.available() >= 0
161+
162+
163+
def test_leader_response_decodes_cleanly_without_interrupt() -> None:
164+
"""Sanity: the widened except must not regress the happy path."""
165+
dec = MessageDecoder(is_request=False)
166+
dec.feed(LeaderResponse(node_id=7, address="host:1234").encode())
167+
msg = dec.decode()
168+
assert isinstance(msg, LeaderResponse)
169+
assert msg.node_id == 7
170+
assert not dec.is_poisoned
171+
172+
173+
def test_db_response_decodes_cleanly_without_interrupt() -> None:
174+
"""Sanity: full loop through decode() still works."""
175+
dec = MessageDecoder(is_request=False)
176+
dec.feed(_make_db(42))
177+
msg = dec.decode()
178+
assert isinstance(msg, DbResponse)
179+
assert msg.db_id == 42

0 commit comments

Comments
 (0)