"""Integration test for multi-frame rows continuation (ISSUE-66).

The dqlite server batches query results into frames that fit in its
per-response buffer. For result sets larger than one frame's worth of
rows, the server sends an initial ``ROWS`` response with
``has_more=True`` followed by one or more continuation frames ending
with a final frame that sets ``has_more=False``.

Previously only a Python-round-trip test exercised this path
(``test_decode_continuation_roundtrip``). This integration test
queries a real dqlite server for a result set large enough to cross
the continuation boundary, exercising the full server→wire→
``_drain_continuations``→client path end-to-end.
"""

from __future__ import annotations

from unittest.mock import patch

import pytest

from dqliteclient import connect


@pytest.mark.integration
class TestContinuationFrames:
    """Integration tests for multi-frame ROWS reassembly (ISSUE-66)."""

    # Two bound parameters (id, val) per inserted row. SQLite's
    # compile-time default limit is 999 host parameters per statement
    # (SQLITE_MAX_VARIABLE_NUMBER), so rows-per-statement must stay
    # <= 499. The previous value of 500 produced 1000 parameters per
    # INSERT and violated the very limit the code documented.
    _ROWS_PER_INSERT = 400  # 400 rows * 2 params = 800 <= 999

    @staticmethod
    async def _fill_table(conn, table: str, n_rows: int, val_suffix: str) -> None:
        """(Re)create *table* and insert *n_rows* rows.

        Each row is ``(i, f"row-{i:06d}-{val_suffix}")``. Inserts are
        chunked multi-row statements inside a single transaction for
        speed, sized to respect the 999-parameter statement limit.

        Args:
            conn: An open dqlite client connection.
            table: Table name (interpolated into DDL/DML — callers pass
                trusted literals only).
            n_rows: Number of rows to insert, ids ``0..n_rows-1``.
            val_suffix: Suffix appended to each row's ``val`` text.
        """
        await conn.execute(f"DROP TABLE IF EXISTS {table}")
        await conn.execute(
            f"CREATE TABLE {table} (id INTEGER PRIMARY KEY, val TEXT)"
        )
        step = TestContinuationFrames._ROWS_PER_INSERT
        async with conn.transaction():
            for start in range(0, n_rows, step):
                placeholders: list[str] = []
                params: list[object] = []
                for i in range(start, min(start + step, n_rows)):
                    placeholders.append("(?, ?)")
                    params.extend([i, f"row-{i:06d}-{val_suffix}"])
                await conn.execute(
                    f"INSERT INTO {table} (id, val) VALUES "
                    + ",".join(placeholders),
                    params,
                )

    async def test_large_result_set_crosses_continuation_boundary(
        self, cluster_address: str
    ) -> None:
        """Select a result set large enough that the server emits at
        least one continuation frame. Verify row reassembly is exact.

        Row size: 8-byte INTEGER + ~50-byte TEXT per row ≈ 60 bytes
        plus tuple framing. The server's per-frame buffer is on the
        order of tens of KiB, so a few thousand rows guarantees at
        least one continuation.
        """
        N_ROWS = 5000
        async with await connect(cluster_address) as conn:
            await self._fill_table(
                conn, "test_continuation", N_ROWS, "padding-padding"
            )

            rows = await conn.fetchall(
                "SELECT id, val FROM test_continuation ORDER BY id"
            )
            assert len(rows) == N_ROWS
            assert rows[0][0] == 0
            assert rows[0][1] == "row-000000-padding-padding"
            assert rows[-1][0] == N_ROWS - 1
            assert rows[-1][1] == f"row-{N_ROWS - 1:06d}-padding-padding"

            # Spot-check a middle row.
            mid = N_ROWS // 2
            assert rows[mid][0] == mid

    async def test_continuation_boundary_actually_crossed(self, cluster_address: str) -> None:
        """Confirm the server actually emitted at least one
        continuation frame — a smaller result set that still fits in
        one frame would give a false sense of coverage. Instrument
        ``_drain_continuations`` and assert ``frames > 1``.
        """
        from dqliteclient.protocol import DqliteProtocol

        N_ROWS = 5000
        frames_seen: list[int] = []

        async def _spy(self, initial, deadline):  # type: ignore[no-untyped-def]
            # Copied structurally from the real method but with a
            # frames counter escape hatch. Keeps the original on the
            # class intact for other tests.
            response = initial
            all_rows = list(initial.rows)
            frames = 1
            while response.has_more:
                next_response = await self._read_continuation(deadline=deadline)
                frames += 1
                if not next_response.rows and next_response.has_more:
                    # Empty-but-continuing frame would loop forever here;
                    # bail out and let the row-count assertion below fail
                    # loudly with the partial result.
                    break
                all_rows.extend(next_response.rows)
                response = next_response
            frames_seen.append(frames)
            return all_rows

        async with await connect(cluster_address) as conn:
            await self._fill_table(conn, "test_cont_spy", N_ROWS, "pad")

            with patch.object(DqliteProtocol, "_drain_continuations", _spy):
                rows = await conn.fetchall("SELECT id, val FROM test_cont_spy")

            assert len(rows) == N_ROWS
            assert frames_seen, "spy never recorded a drain"
            assert any(f > 1 for f in frames_seen), (
                f"Server emitted only single-frame responses for {N_ROWS} rows; "
                f"frames_seen={frames_seen}. Test setup must produce a result "
                f"set large enough to cross the continuation boundary — raise "
                f"N_ROWS until frames > 1."
            )