@@ -1803,3 +1803,95 @@ def test_oversized_continuation_frame_clears_flag_and_poisons(self) -> None:
18031803
18041804 with pytest .raises (ProtocolError , match = "poisoned" ):
18051805 decoder .decode ()
1806+
1807+
1808+ class TestEndToEndPipeline :
1809+ """End-to-end tests exercising the full MessageEncoder + MessageDecoder pipeline."""
1810+
1811+ def test_handshake_then_request_response_roundtrip (self ) -> None :
1812+ """Full stateful pipeline: handshake → request → response."""
1813+ # Client side
1814+ encoder = MessageEncoder ()
1815+ client_decoder = MessageDecoder (is_request = False )
1816+
1817+ # Server side
1818+ server_decoder = MessageDecoder (is_request = True )
1819+
1820+ # Step 1: Client sends handshake
1821+ handshake_bytes = encoder .encode_handshake ()
1822+
1823+ # Step 2: Server receives and decodes handshake
1824+ server_decoder .feed (handshake_bytes )
1825+ version = server_decoder .decode_handshake ()
1826+ assert version == PROTOCOL_VERSION
1827+
1828+ # Step 3: Client sends a LeaderRequest
1829+ request = LeaderRequest ()
1830+ request_bytes = encoder .encode (request )
1831+
1832+ # Step 4: Server receives and decodes request
1833+ server_decoder .feed (request_bytes )
1834+ assert server_decoder .has_message ()
1835+ decoded_request = server_decoder .decode ()
1836+ assert isinstance (decoded_request , LeaderRequest )
1837+
1838+ # Step 5: Server sends a LeaderResponse
1839+ response = LeaderResponse (node_id = 1 , address = "127.0.0.1:9001" )
1840+ response_bytes = response .encode ()
1841+
1842+ # Step 6: Client receives and decodes response
1843+ client_decoder .feed (response_bytes )
1844+ assert client_decoder .has_message ()
1845+ decoded_response = client_decoder .decode ()
1846+ assert isinstance (decoded_response , LeaderResponse )
1847+ assert decoded_response .node_id == 1
1848+ assert decoded_response .address == "127.0.0.1:9001"
1849+
1850+ def test_multiple_messages_single_feed (self ) -> None :
1851+ """Concatenate multiple encoded messages, feed all at once, decode sequentially."""
1852+ decoder = MessageDecoder (is_request = False )
1853+
1854+ msg1 = WelcomeResponse (heartbeat_timeout = 15000000000 )
1855+ msg2 = DbResponse (db_id = 0 )
1856+ msg3 = StmtResponse (db_id = 0 , stmt_id = 1 , num_params = 2 )
1857+
1858+ # Concatenate all bytes and feed in one call
1859+ all_bytes = msg1 .encode () + msg2 .encode () + msg3 .encode ()
1860+ decoder .feed (all_bytes )
1861+
1862+ decoded1 = decoder .decode ()
1863+ assert isinstance (decoded1 , WelcomeResponse )
1864+ assert decoded1 .heartbeat_timeout == 15000000000
1865+
1866+ decoded2 = decoder .decode ()
1867+ assert isinstance (decoded2 , DbResponse )
1868+ assert decoded2 .db_id == 0
1869+
1870+ decoded3 = decoder .decode ()
1871+ assert isinstance (decoded3 , StmtResponse )
1872+ assert decoded3 .db_id == 0
1873+ assert decoded3 .stmt_id == 1
1874+ assert decoded3 .num_params == 2
1875+
1876+ # No more messages
1877+ assert not decoder .has_message ()
1878+
1879+ def test_partial_feed_chunked (self ) -> None :
1880+ """Split encoded message at arbitrary points, feed in chunks."""
1881+ decoder = MessageDecoder (is_request = False )
1882+ msg = ResultResponse (last_insert_id = 42 , rows_affected = 7 )
1883+ encoded = msg .encode ()
1884+
1885+ # Feed one byte at a time
1886+ for i in range (len (encoded ) - 1 ):
1887+ decoder .feed (encoded [i : i + 1 ])
1888+ assert not decoder .has_message (), f"should not be complete at byte { i } "
1889+
1890+ # Feed the last byte — now the message should be complete
1891+ decoder .feed (encoded [- 1 :])
1892+ assert decoder .has_message ()
1893+
1894+ decoded = decoder .decode ()
1895+ assert isinstance (decoded , ResultResponse )
1896+ assert decoded .last_insert_id == 42
1897+ assert decoded .rows_affected == 7
0 commit comments