Skip to content

Commit 9c85405

Browse files
Fix skip_message to not corrupt stream on partial normal-sized messages
skip_message previously advanced past whatever bytes were available, even for incomplete normal-sized messages. This corrupted the stream when the remaining bytes arrived later. It now skips normal-sized messages only once they are complete, reserving partial skips for oversized-message recovery. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent ee7e44b commit 9c85405

2 files changed

Lines changed: 36 additions & 8 deletions

File tree

src/dqlitewire/buffer.py

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -107,12 +107,13 @@ def read_message(self) -> bytes | None:
107107
def skip_message(self) -> bool:
108108
"""Skip the current message in the buffer.
109109
110-
Reads the message size from the header and advances past however many
111-
bytes are available (up to the full message size). Useful for recovering
112-
after an oversized or malformed message that has_message() rejects.
110+
For normal-sized messages (within max_message_size), waits until the
111+
full message is available before skipping. For oversized messages that
112+
exceed max_message_size, skips whatever bytes are available — this is
113+
the recovery path for messages that has_message() rejects.
113114
114-
Returns True if a message header was found and skipped, False if
115-
not enough data for a header.
115+
Returns True if data was skipped (a complete normal-sized message, or the available bytes of an oversized one), False if not enough data
116+
for a header or if a normal-sized message is still incomplete.
116117
"""
117118
available = len(self._data) - self._pos
118119
if available < HEADER_SIZE:
@@ -121,9 +122,17 @@ def skip_message(self) -> bool:
121122
size_words = int.from_bytes(self._data[self._pos : self._pos + 4], "little")
122123
total_size = HEADER_SIZE + (size_words * WORD_SIZE)
123124

124-
# Skip whatever is available (might be less than total_size for partial messages)
125-
skip = min(total_size, available)
126-
self._pos += skip
125+
if total_size <= self._max_message_size:
126+
# Normal-sized message: only skip when complete to avoid
127+
# stream corruption from partially consumed messages.
128+
if available < total_size:
129+
return False
130+
self._pos += total_size
131+
else:
132+
# Oversized message (recovery path): skip whatever is available
133+
# since we'll never buffer the full message anyway.
134+
self._pos += min(total_size, available)
135+
127136
self._maybe_compact()
128137
return True
129138

tests/test_buffer.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,25 @@ def test_skip_message_allows_oversized_for_recovery(self) -> None:
253253
# skip_message should succeed (it's the recovery tool for oversized messages)
254254
assert buf.skip_message() is True
255255

256+
def test_skip_message_waits_for_complete_normal_sized_message(self) -> None:
257+
"""skip_message should return False for incomplete normal-sized messages.
258+
259+
If a message fits within max_message_size but hasn't fully arrived,
260+
skip_message must not advance past partial data — doing so would
261+
corrupt the stream when the remaining bytes arrive later.
262+
"""
263+
import struct
264+
265+
buf = ReadBuffer(max_message_size=4096)
266+
# Header claiming 5 words (40 bytes body), total 48 bytes — fits in limit
267+
header = struct.pack("<IBBH", 5, 0, 0, 0)
268+
# Feed only header + 16 bytes of body (incomplete: need 40)
269+
buf.feed(header + b"\x00" * 16)
270+
# Should return False since message is incomplete but not oversized
271+
assert buf.skip_message() is False
272+
# Buffer position should not have changed
273+
assert buf.available() == 24 # 8 header + 16 partial body
274+
256275
def test_buffer_compaction(self) -> None:
257276
buf = ReadBuffer()
258277
# Feed a lot of small messages to trigger compaction

0 commit comments

Comments
 (0)