Skip to content

Commit 3eb5523

Browse files
fix: poison buffer after capped oversized-message skip
When skip_message() caps the skip to max_message_size for an oversized message, the peer actually sent more bytes than were discarded. The stream is permanently desynchronized — the remaining undiscarded bytes would be interpreted as a new message header, producing silent garbage. Now the buffer is poisoned (via deferred _poison_after_skip flag) once the capped skip completes, forcing callers to reset() and reconnect. Previously skip_message() returned True as if recovery succeeded. Closes #121 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent cce9da4 commit 3eb5523

3 files changed

Lines changed: 120 additions & 65 deletions

File tree

src/dqlitewire/buffer.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ def __init__(self, max_message_size: int = DEFAULT_MAX_MESSAGE_SIZE) -> None:
9494
self._max_message_size = max_message_size
9595
self._skip_remaining = 0
9696
self._poisoned: Exception | None = None
97+
self._poison_after_skip: Exception | None = None
9798

9899
@property
99100
def is_poisoned(self) -> bool:
@@ -117,6 +118,7 @@ def reset(self) -> None:
117118
self._pos = 0
118119
self._skip_remaining = 0
119120
self._poisoned = None
121+
self._poison_after_skip = None
120122

121123
def _check_poisoned(self) -> None:
122124
if self._poisoned is not None:
@@ -182,6 +184,12 @@ def feed(self, data: bytes) -> None:
182184
discard = min(len(data), self._skip_remaining)
183185
data = data[discard:]
184186
self._skip_remaining -= discard
187+
# Trigger deferred poison once the capped skip completes.
188+
# The stream is desynchronized — remaining peer bytes were
189+
# not discarded — so all further reads would be garbage.
190+
if self._skip_remaining == 0 and self._poison_after_skip is not None:
191+
self._poisoned = self._poison_after_skip
192+
self._poison_after_skip = None
185193
if not data:
186194
return
187195
self._maybe_compact()
@@ -358,6 +366,27 @@ def skip_message(self) -> bool:
358366
skip_now = min(effective_total, available)
359367
self._pos += skip_now
360368
self._skip_remaining = effective_total - skip_now
369+
# If the cap is smaller than the actual message, mark for
370+
# deferred poisoning. We cannot poison immediately because
371+
# feed() needs to keep discarding bytes during the skip.
372+
# Once _skip_remaining reaches 0, feed() will poison.
373+
if effective_total < total_size:
374+
if self._skip_remaining == 0:
375+
# Capped skip completed immediately — poison now.
376+
self.poison(
377+
DecodeError(
378+
f"Oversized message skip capped to "
379+
f"{effective_total} of {total_size} bytes; "
380+
f"stream is desynchronized. Call reset()."
381+
)
382+
)
383+
else:
384+
# Deferred: feed() will poison once skip completes.
385+
self._poison_after_skip = DecodeError(
386+
f"Oversized message skip capped to "
387+
f"{effective_total} of {total_size} bytes; "
388+
f"stream is desynchronized. Call reset()."
389+
)
361390
self._maybe_compact()
362391
except BaseException as e:
363392
if self._poisoned is None:

tests/test_buffer.py

Lines changed: 73 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -413,24 +413,24 @@ def test_rejects_oversized_message(self) -> None:
413413
with pytest.raises(DecodeError, match="exceeds maximum"):
414414
buf.read_message()
415415

416-
def test_skip_message_recovers_from_oversized(self) -> None:
417-
"""After skipping an oversized message, stream should not be corrupted.
416+
def test_skip_message_poisons_after_capped_oversized(self) -> None:
417+
"""After a capped oversized skip, stream is desynchronized and
418+
the buffer must be poisoned (issue 121).
418419
419420
skip_message caps _skip_remaining to max_message_size to prevent
420-
amplification attacks. The test feeds at most max_message_size bytes
421-
of oversized body, then verifies the next message decodes correctly.
421+
amplification attacks. Since the peer sent more bytes than the cap,
422+
the stream is permanently desynchronized. The buffer is poisoned
423+
once the capped skip completes.
422424
"""
423425
import struct
424426

425427
import pytest
426428

427-
from dqlitewire.exceptions import DecodeError
429+
from dqlitewire.exceptions import DecodeError, ProtocolError
428430

429431
buf = ReadBuffer(max_message_size=1024)
430432
# Header claiming a huge body: size_words=1000 (8000 bytes > 1024 limit)
431433
oversized_header = struct.pack("<IBBH", 1000, 0, 0, 0)
432-
valid_msg = LeaderResponse(node_id=1, address="node1:9001")
433-
valid_encoded = valid_msg.encode()
434434

435435
# Feed header + first chunk
436436
buf.feed(oversized_header + b"\xab" * 500)
@@ -443,25 +443,60 @@ def test_skip_message_recovers_from_oversized(self) -> None:
443443
# skip_message should return False — partial skip
444444
assert buf.skip_message() is False
445445

446-
# Feed enough bytes to complete the capped skip + valid message.
447-
# _skip_remaining is capped to max_message_size (1024), and skip_message
448-
# already consumed the 508 bytes in the buffer (8 header + 500 body),
449-
# so _skip_remaining = 1024 - 508 = 516 bytes.
450-
buf.feed(b"\xab" * 516 + valid_encoded)
446+
# Feed enough bytes to complete the capped skip.
447+
remaining = buf._skip_remaining
448+
buf.feed(b"\xab" * remaining)
451449

452-
# The capped oversized bytes should be discarded; valid message readable
453-
assert buf.has_message()
454-
data = buf.read_message()
455-
assert data is not None
456-
assert data == valid_encoded
450+
# Skip is complete but buffer is poisoned — stream is desynchronized
451+
assert not buf.is_skipping
452+
assert buf.is_poisoned
453+
454+
with pytest.raises(ProtocolError, match="poisoned"):
455+
buf.read_message()
457456

458-
def test_single_feed_completes_skip_and_appends_message(self) -> None:
459-
"""118: single feed() that completes skip AND contains next valid message."""
457+
def test_capped_oversized_skip_poisons_buffer(self) -> None:
458+
"""121: capped skip of oversized message must poison the buffer.
459+
460+
When the header claims a body larger than max_message_size, the skip
461+
is capped to max_message_size bytes. Since the peer sent more bytes
462+
than were discarded, the stream is permanently desynchronized. The
463+
buffer must be poisoned to prevent silent garbage reads.
464+
"""
460465
import struct
461466

462467
import pytest
463468

464-
from dqlitewire.exceptions import DecodeError
469+
from dqlitewire.exceptions import DecodeError, ProtocolError
470+
471+
buf = ReadBuffer(max_message_size=64)
472+
# Header claiming 200 words = 1600 bytes body (>> 64 byte limit)
473+
oversized_header = struct.pack("<IBBH", 200, 0, 0, 0)
474+
buf.feed(oversized_header)
475+
476+
assert buf.has_message() is True
477+
with pytest.raises(DecodeError, match="exceeds maximum"):
478+
buf.read_message()
479+
480+
# skip_message triggers capped skip
481+
buf.skip_message()
482+
483+
# After the capped skip completes, buffer must be poisoned
484+
remaining = buf._skip_remaining
485+
buf.feed(b"\x00" * remaining)
486+
assert not buf.is_skipping
487+
assert buf.is_poisoned
488+
489+
# Further operations should raise ProtocolError
490+
with pytest.raises(ProtocolError, match="poisoned"):
491+
buf.read_message()
492+
493+
def test_single_feed_completes_capped_skip_and_poisons(self) -> None:
494+
"""118/121: single feed() that completes capped skip poisons buffer."""
495+
import struct
496+
497+
import pytest
498+
499+
from dqlitewire.exceptions import DecodeError, ProtocolError
465500

466501
buf = ReadBuffer(max_message_size=64)
467502
# 200 words = 1600 bytes body, well over 64-byte limit
@@ -474,37 +509,34 @@ def test_single_feed_completes_skip_and_appends_message(self) -> None:
474509
assert buf.skip_message() is False
475510
assert buf.is_skipping
476511

477-
# Build a single payload: remaining skip bytes + valid message
512+
# Build a single payload: remaining skip bytes + extra data
478513
remaining = buf._skip_remaining
479-
valid_msg = LeaderRequest()
480-
valid_encoded = valid_msg.encode()
481-
combined = b"\xcc" * remaining + valid_encoded
514+
combined = b"\xcc" * remaining + b"\x00" * 16
482515

483516
buf.feed(combined)
484517

485518
assert not buf.is_skipping
486-
assert buf.has_message()
487-
data = buf.read_message()
488-
assert data is not None
489-
assert data == valid_encoded
519+
assert buf.is_poisoned
520+
521+
with pytest.raises(ProtocolError, match="poisoned"):
522+
buf.read_message()
490523

491-
def test_skip_oversized_across_multiple_feeds(self) -> None:
492-
"""Oversized message bytes should be discarded across multiple feed() calls.
524+
def test_skip_oversized_across_multiple_feeds_poisons(self) -> None:
525+
"""Oversized skip across multiple feeds poisons when complete (issue 121).
493526
494527
skip_message caps _skip_remaining to max_message_size, so only that
495-
many bytes are discarded (not the full claimed body size).
528+
many bytes are discarded. Once the capped skip completes, the buffer
529+
is poisoned because the stream is desynchronized.
496530
"""
497531
import struct
498532

499533
import pytest
500534

501-
from dqlitewire.exceptions import DecodeError
535+
from dqlitewire.exceptions import DecodeError, ProtocolError
502536

503537
buf = ReadBuffer(max_message_size=64)
504538
# 200 words = 1600 bytes body, well over 64-byte limit
505539
oversized_header = struct.pack("<IBBH", 200, 0, 0, 0)
506-
valid_msg = LeaderRequest()
507-
valid_encoded = valid_msg.encode()
508540

509541
# Feed just the header
510542
buf.feed(oversized_header)
@@ -513,20 +545,20 @@ def test_skip_oversized_across_multiple_feeds(self) -> None:
513545
buf.read_message()
514546
assert buf.skip_message() is False
515547

516-
# Feed capped body in chunks (max_message_size=64, header was 8 bytes,
517-
# so _skip_remaining = 64 - 8 = 56 bytes)
548+
# Feed capped body in chunks
518549
remaining = buf._skip_remaining
519550
body = b"\xcc" * remaining
520551
while body:
521552
chunk = body[:20]
522553
body = body[20:]
523554
buf.feed(chunk)
524555

525-
# Now feed the valid message
526-
buf.feed(valid_encoded)
527-
assert buf.has_message()
528-
data = buf.read_message()
529-
assert data == valid_encoded
556+
# Capped skip is complete — buffer should be poisoned
557+
assert not buf.is_skipping
558+
assert buf.is_poisoned
559+
560+
with pytest.raises(ProtocolError, match="poisoned"):
561+
buf.feed(b"\x00" * 16)
530562

531563
def test_is_skipping_property(self) -> None:
532564
"""is_skipping reflects whether an oversized skip is in progress."""

tests/test_codec.py

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1243,44 +1243,40 @@ def test_skip_message_exists_on_decoder(self) -> None:
12431243
assert hasattr(decoder, "skip_message")
12441244
assert hasattr(decoder, "is_skipping")
12451245

1246-
def test_skip_oversized_and_recover(self) -> None:
1247-
"""After skipping an oversized message, normal messages should decode."""
1246+
def test_skip_oversized_poisons_after_capped_skip(self) -> None:
1247+
"""After a capped oversized skip, buffer is poisoned (issue 121)."""
12481248
import struct
12491249

1250-
from dqlitewire.exceptions import DecodeError
1250+
from dqlitewire.exceptions import DecodeError, ProtocolError
12511251

12521252
# Create a decoder with a small max_message_size
12531253
decoder = MessageDecoder(is_request=False)
12541254
decoder._buffer._max_message_size = 128
12551255

12561256
# Build an oversized message header (size_words=100 → 808 bytes total)
12571257
oversized_header = struct.pack("<IBBH", 100, 0, 0, 0)
1258-
# Build a normal message after it
1259-
normal_msg = FailureResponse(code=42, message="test").encode()
12601258

12611259
# Feed just the oversized header
12621260
decoder.feed(oversized_header)
12631261

1264-
# has_message() is total; the raise surfaces from decode() / read_message().
12651262
assert decoder.has_message() is True
12661263
with pytest.raises(DecodeError, match="exceeds maximum"):
12671264
decoder.decode()
12681265

12691266
# skip_message() should handle the oversized message
12701267
result = decoder.skip_message()
1271-
assert result is False # haven't received full oversized body yet
1268+
assert result is False
12721269
assert decoder.is_skipping is True
12731270

1274-
# Feed enough bytes to complete the capped skip + normal message.
1275-
# _skip_remaining is capped to max_message_size (128), header was 8,
1276-
# so _skip_remaining = 128 - 8 = 120 bytes.
1277-
decoder.feed(b"\x00" * decoder._buffer._skip_remaining + normal_msg)
1271+
# Feed enough bytes to complete the capped skip
1272+
decoder.feed(b"\x00" * decoder._buffer._skip_remaining)
12781273

1279-
# Now should be able to decode the normal message
1274+
# Buffer is now poisoned — stream is desynchronized
12801275
assert decoder.is_skipping is False
1281-
decoded = decoder.decode()
1282-
assert isinstance(decoded, FailureResponse)
1283-
assert decoded.code == 42
1276+
assert decoder.is_poisoned is True
1277+
1278+
with pytest.raises(ProtocolError, match="poisoned"):
1279+
decoder.decode()
12841280

12851281

12861282
class TestDecoderPoisonedState:
@@ -1348,10 +1344,13 @@ def test_reset_unpoisons_decoder(self) -> None:
13481344
with pytest.raises(ProtocolError):
13491345
decoder.decode()
13501346

1351-
def test_oversized_header_does_not_poison(self) -> None:
1347+
def test_oversized_header_does_not_poison_before_skip(self) -> None:
13521348
"""An oversized-header error from the buffer framing layer is
13531349
recoverable via skip_message(); it must NOT poison because the
13541350
bytes have not yet been consumed from the buffer.
1351+
1352+
After a capped skip completes, the buffer IS poisoned because the
1353+
stream is desynchronized (issue 121).
13551354
"""
13561355
import struct
13571356

@@ -1362,18 +1361,13 @@ def test_oversized_header_does_not_poison(self) -> None:
13621361

13631362
with pytest.raises(DecodeError, match="exceeds maximum"):
13641363
decoder.decode()
1365-
# NOT poisoned — skip_message() is the documented recovery path.
1364+
# NOT poisoned before skip — skip_message() is the recovery path.
13661365
assert decoder.is_poisoned is False
13671366

1368-
# Recovery via skip_message should still work.
1367+
# After capped skip completes, buffer IS poisoned (desync).
13691368
decoder.skip_message()
13701369
decoder.feed(b"\x00" * decoder._buffer._skip_remaining)
1371-
assert decoder.is_poisoned is False
1372-
1373-
# And a fresh message decodes afterwards.
1374-
normal = FailureResponse(code=1, message="ok").encode()
1375-
decoder.feed(normal)
1376-
assert isinstance(decoder.decode(), FailureResponse)
1370+
assert decoder.is_poisoned is True
13771371

13781372
def test_poison_catches_non_protocol_exceptions(self) -> None:
13791373
"""Any exception from decode_bytes — not just DecodeError/ProtocolError —

0 commit comments

Comments
 (0)