@@ -158,7 +158,12 @@ def test_rejects_oversized_message(self) -> None:
158158 buf .has_message ()
159159
160160 def test_skip_message_recovers_from_oversized (self ) -> None :
161- """After skipping an oversized message, stream should not be corrupted."""
161+ """After skipping an oversized message, stream should not be corrupted.
162+
163+ skip_message caps _skip_remaining to max_message_size to prevent
164+ amplification attacks. The test feeds at most max_message_size bytes
165+ of oversized body, then verifies the next message decodes correctly.
166+ """
162167 import struct
163168
164169 import pytest
@@ -168,32 +173,37 @@ def test_skip_message_recovers_from_oversized(self) -> None:
168173 buf = ReadBuffer (max_message_size = 1024 )
169174 # Header claiming a huge body: size_words=1000 (8000 bytes > 1024 limit)
170175 oversized_header = struct .pack ("<IBBH" , 1000 , 0 , 0 , 0 )
171- # Build oversized body + valid message after it
172- oversized_body = b"\xab " * (1000 * 8 )
173176 valid_msg = LeaderResponse (node_id = 1 , address = "node1:9001" )
174177 valid_encoded = valid_msg .encode ()
175178
176- # Feed header + first chunk of oversized body
177- buf .feed (oversized_header + oversized_body [: 500 ] )
179+ # Feed header + first chunk
180+ buf .feed (oversized_header + b" \xab " * 500 )
178181
179182 # has_message should raise for oversized
180183 with pytest .raises (DecodeError , match = "exceeds maximum" ):
181184 buf .has_message ()
182185
183- # skip_message should return False — partial skip, not done yet
186+ # skip_message should return False — partial skip
184187 assert buf .skip_message () is False
185188
186- # Feed the rest of the oversized body + the valid message
187- buf .feed (oversized_body [500 :] + valid_encoded )
189+ # Feed enough bytes to complete the capped skip + valid message.
190+ # _skip_remaining is capped to max_message_size (1024), and skip_message
191+ # already consumed the 508 bytes in the buffer (8 header + 500 body),
192+ # so _skip_remaining = 1024 - 508 = 516 bytes.
193+ buf .feed (b"\xab " * 516 + valid_encoded )
188194
189- # The oversized bytes should be discarded; valid message should be readable
195+ # The capped oversized bytes should be discarded; valid message readable
190196 assert buf .has_message ()
191197 data = buf .read_message ()
192198 assert data is not None
193199 assert data == valid_encoded
194200
195201 def test_skip_oversized_across_multiple_feeds (self ) -> None :
196- """Oversized message bytes should be discarded across multiple feed() calls."""
202+ """Oversized message bytes should be discarded across multiple feed() calls.
203+
204+ skip_message caps _skip_remaining to max_message_size, so only that
205+ many bytes are discarded (not the full claimed body size).
206+ """
197207 import struct
198208
199209 import pytest
@@ -203,7 +213,6 @@ def test_skip_oversized_across_multiple_feeds(self) -> None:
203213 buf = ReadBuffer (max_message_size = 64 )
204214 # 200 words = 1600 bytes body, well over 64-byte limit
205215 oversized_header = struct .pack ("<IBBH" , 200 , 0 , 0 , 0 )
206- oversized_body = b"\xcc " * (200 * 8 )
207216 valid_msg = LeaderRequest ()
208217 valid_encoded = valid_msg .encode ()
209218
@@ -213,11 +222,13 @@ def test_skip_oversized_across_multiple_feeds(self) -> None:
213222 buf .has_message ()
214223 assert buf .skip_message () is False
215224
216- # Feed body in chunks
217- body = oversized_body
225+ # Feed capped body in chunks (max_message_size=64, header was 8 bytes,
226+ # so _skip_remaining = 64 - 8 = 56 bytes)
227+ remaining = buf ._skip_remaining
228+ body = b"\xcc " * remaining
218229 while body :
219- chunk = body [:50 ]
220- body = body [50 :]
230+ chunk = body [:20 ]
231+ body = body [20 :]
221232 buf .feed (chunk )
222233
223234 # Now feed the valid message
@@ -247,8 +258,9 @@ def test_is_skipping_property(self) -> None:
247258 assert buf .skip_message () is False
248259 assert buf .is_skipping is True
249260
250- # Feed remaining bytes to complete the skip
251- buf .feed (b"\x00 " * (200 * 8 ))
261+ # Feed enough bytes to complete the capped skip
262+ # (max_message_size=64, header was 8, so _skip_remaining = 64 - 8 = 56)
263+ buf .feed (b"\x00 " * buf ._skip_remaining )
252264 assert buf .is_skipping is False
253265
254266 def test_clear_resets_skip_state (self ) -> None :
@@ -352,6 +364,25 @@ def test_skip_message_waits_for_complete_normal_sized_message(self) -> None:
352364 # Buffer position should not have changed
353365 assert buf .available () == 24 # 8 header + 16 partial body
354366
367+ def test_skip_message_caps_remaining_to_max_message_size (self ) -> None :
368+ """skip_message should not set _skip_remaining beyond max_message_size.
369+
370+ A malicious header with size_words=0xFFFFFFFF claims a ~32 GiB body.
371+ Without capping, _skip_remaining would be ~32 GiB, causing all
372+ subsequent feed() calls to silently discard data for an extremely
373+ long time (8-byte header → 32 GiB data loss amplification).
374+ """
375+ import struct
376+
377+ buf = ReadBuffer (max_message_size = 1024 )
378+ # Craft header claiming ~32 GiB body
379+ header = struct .pack ("<IBBH" , 0xFFFFFFFF , 0 , 0 , 0 )
380+ buf .feed (header )
381+
382+ buf .skip_message ()
383+ # _skip_remaining should be capped to max_message_size, not ~32 GiB
384+ assert buf ._skip_remaining <= buf ._max_message_size
385+
355386 def test_feed_compacts_consumed_data (self ) -> None :
356387 """feed() should compact consumed data before extending the buffer.
357388
0 commit comments