Skip to content

Commit 68b3c09

Browse files
fix: wrap read_message/skip_message/read_bytes in BaseException poison guard
All three consume methods advance `_pos` then CALL `_maybe_compact()`. The CALL is a CPython eval-breaker checkpoint — a pending signal (KeyboardInterrupt, PyThreadState_SetAsyncExc, CancelledError) is delivered there before _maybe_compact's body runs. If the signal fires at that boundary, `_pos` has advanced (bytes consumed from the buffer) but the return value is lost with the unwinding frame, and no poison fires because _maybe_compact's own try/except never executes. In a request-response protocol, silently dropping one response causes the caller to receive the next response as the answer to the current request — application-layer data corruption. Wrap each method's mutation block in `try/except BaseException: poison; raise`, matching the template from issues 037 (_maybe_compact) and 048 (feed). The DecodeError for oversized headers stays outside the try block to preserve the non-poisoning recoverable-via-skip_message contract. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 1ac48ec commit 68b3c09

2 files changed

Lines changed: 134 additions & 24 deletions

File tree

src/dqlitewire/buffer.py

Lines changed: 49 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -316,9 +316,17 @@ def read_message(self) -> bytes | None:
316316
if available < total_size:
317317
return None
318318

319-
message = bytes(self._data[self._pos : self._pos + total_size])
320-
self._pos += total_size
321-
self._maybe_compact()
319+
try:
320+
message = bytes(self._data[self._pos : self._pos + total_size])
321+
self._pos += total_size
322+
self._maybe_compact()
323+
except BaseException as e:
324+
if self._poisoned is None:
325+
if isinstance(e, Exception):
326+
self._poisoned = e
327+
else:
328+
self._poisoned = RuntimeError(f"read_message interrupted: {type(e).__name__}")
329+
raise
322330

323331
return message
324332

@@ -362,24 +370,33 @@ def skip_message(self) -> bool:
362370
raise err
363371
total_size = HEADER_SIZE + (size_words * WORD_SIZE)
364372

365-
if total_size <= self._max_message_size:
366-
# Normal-sized message: only skip when complete to avoid
367-
# stream corruption from partially consumed messages.
368-
if available < total_size:
369-
return False
370-
self._pos += total_size
371-
else:
372-
# Oversized message: discard what we have and track remaining
373-
# bytes to be discarded in subsequent feed() calls.
374-
# Cap to max_message_size to prevent amplification attacks where
375-
# an 8-byte header claiming a ~32 GiB body would cause that much
376-
# legitimate data to be silently discarded.
377-
effective_total = min(total_size, self._max_message_size)
378-
skip_now = min(effective_total, available)
379-
self._pos += skip_now
380-
self._skip_remaining = effective_total - skip_now
381-
382-
self._maybe_compact()
373+
# Normal-sized message: only skip when complete to avoid
374+
# stream corruption from partially consumed messages.
375+
if total_size <= self._max_message_size and available < total_size:
376+
return False
377+
378+
try:
379+
if total_size <= self._max_message_size:
380+
self._pos += total_size
381+
else:
382+
# Oversized message: discard what we have and track remaining
383+
# bytes to be discarded in subsequent feed() calls.
384+
# Cap to max_message_size to prevent amplification attacks
385+
# where an 8-byte header claiming a ~32 GiB body would cause
386+
# that much legitimate data to be silently discarded.
387+
effective_total = min(total_size, self._max_message_size)
388+
skip_now = min(effective_total, available)
389+
self._pos += skip_now
390+
self._skip_remaining = effective_total - skip_now
391+
self._maybe_compact()
392+
except BaseException as e:
393+
if self._poisoned is None:
394+
if isinstance(e, Exception):
395+
self._poisoned = e
396+
else:
397+
self._poisoned = RuntimeError(f"skip_message interrupted: {type(e).__name__}")
398+
raise
399+
383400
return self._skip_remaining == 0
384401

385402
def read_bytes(self, n: int) -> bytes | None:
@@ -393,9 +410,17 @@ def read_bytes(self, n: int) -> bytes | None:
393410
if available < n:
394411
return None
395412

396-
data = bytes(self._data[self._pos : self._pos + n])
397-
self._pos += n
398-
self._maybe_compact()
413+
try:
414+
data = bytes(self._data[self._pos : self._pos + n])
415+
self._pos += n
416+
self._maybe_compact()
417+
except BaseException as e:
418+
if self._poisoned is None:
419+
if isinstance(e, Exception):
420+
self._poisoned = e
421+
else:
422+
self._poisoned = RuntimeError(f"read_bytes interrupted: {type(e).__name__}")
423+
raise
399424
return data
400425

401426
def peek_bytes(self, n: int) -> bytes | None:

tests/test_buffer_signal_safety.py

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,3 +429,88 @@ def test_wire_legal_max_size_still_routes_through_oversized_path(self) -> None:
429429
"oversized-but-wire-legal DecodeError must remain non-poisoning "
430430
"to preserve the skip_message() recovery contract"
431431
)
432+
433+
434+
class TestConsumeMethodSignalSafety:
435+
"""Regression tests for issue 061.
436+
437+
``read_message()``, ``skip_message()``, and ``read_bytes()`` each
438+
advance ``_pos`` and then call ``_maybe_compact()``. The CALL to
439+
``_maybe_compact`` is a CPython eval-breaker check point — a pending
440+
signal is delivered there BEFORE the function body runs. If the
441+
signal fires at that boundary, ``_pos`` has advanced (bytes consumed)
442+
but ``_maybe_compact``'s own try/except never executes, so no poison
443+
fires. The return value is lost with the unwinding frame.
444+
445+
The fix wraps the mutation block in ``try/except BaseException``
446+
matching the template from issues 037 (``_maybe_compact``) and
447+
048 (``feed``).
448+
"""
449+
450+
def test_torn_read_message_leaves_buffer_poisoned(self) -> None:
451+
buf = ReadBuffer()
452+
buf.feed(b"\x01\x00\x00\x00\x00\x00\x00\x00" + b"\x00" * 8)
453+
454+
tracer = _raise_on_source_match("read_message", "self._maybe_compact()")
455+
456+
sys.settrace(tracer)
457+
try:
458+
with contextlib.suppress(KeyboardInterrupt):
459+
buf.read_message()
460+
finally:
461+
sys.settrace(None)
462+
463+
assert buf.is_poisoned, (
464+
"read_message must poison when a BaseException escapes its mutation block"
465+
)
466+
with pytest.raises(ProtocolError, match="poisoned"):
467+
buf.read_message()
468+
469+
def test_torn_read_bytes_leaves_buffer_poisoned(self) -> None:
470+
buf = ReadBuffer()
471+
buf.feed(b"\x00" * 32)
472+
473+
tracer = _raise_on_source_match("read_bytes", "self._maybe_compact()")
474+
475+
sys.settrace(tracer)
476+
try:
477+
with contextlib.suppress(KeyboardInterrupt):
478+
buf.read_bytes(8)
479+
finally:
480+
sys.settrace(None)
481+
482+
assert buf.is_poisoned, (
483+
"read_bytes must poison when a BaseException escapes its mutation block"
484+
)
485+
486+
def test_torn_skip_message_leaves_buffer_poisoned(self) -> None:
487+
buf = ReadBuffer()
488+
buf.feed(b"\x01\x00\x00\x00\x00\x00\x00\x00" + b"\x00" * 8)
489+
490+
tracer = _raise_on_source_match("skip_message", "self._maybe_compact()")
491+
492+
sys.settrace(tracer)
493+
try:
494+
with contextlib.suppress(KeyboardInterrupt):
495+
buf.skip_message()
496+
finally:
497+
sys.settrace(None)
498+
499+
assert buf.is_poisoned, (
500+
"skip_message must poison when a BaseException escapes its mutation block"
501+
)
502+
503+
def test_happy_paths_still_work(self) -> None:
504+
"""Sanity: without any interrupt, all three methods work normally."""
505+
buf = ReadBuffer()
506+
buf.feed(b"\x01\x00\x00\x00\x00\x00\x00\x00" + b"\x00" * 8)
507+
msg = buf.read_message()
508+
assert msg is not None and not buf.is_poisoned
509+
510+
buf.feed(b"\x00" * 32)
511+
data = buf.read_bytes(8)
512+
assert data is not None and len(data) == 8
513+
514+
buf.feed(b"\x01\x00\x00\x00\x00\x00\x00\x00" + b"\x00" * 8)
515+
skipped = buf.skip_message()
516+
assert skipped is True and not buf.is_poisoned

0 commit comments

Comments
 (0)