|
| 1 | +"""Pin: a transport reset (peer FIN / RST) mid-INSERT surfaces as a |
| 2 | +clear error to the caller and invalidates the protocol — no silent |
| 3 | +retry, no half-applied row, no cursor-reuse hazard. |
| 4 | +
|
| 5 | +dqlite is a clustered DB; a leader flip mid-INSERT can drop the |
| 6 | +server-side connection at any byte. The contract we need: |
| 7 | +
|
| 8 | +* The streaming send raises ``DqliteConnectionError`` (or a wrapped |
| 9 | + PEP-249 ``OperationalError`` higher up). |
| 10 | +* The protocol's ``is_wire_coherent`` flips to False so the pool |
| 11 | + drops the connection on release. |
| 12 | +* A subsequent ``exec_sql`` on the same protocol instance does not |
| 13 | + silently re-attempt against a dead transport. |
| 14 | +
|
| 15 | +This is a unit-mock pin; the integration cluster's leader-flip |
| 16 | +fixture is gated separately (see test_pool_concurrent_tx_leader_flip). |
| 17 | +""" |
| 18 | + |
| 19 | +from __future__ import annotations |
| 20 | + |
| 21 | +from unittest.mock import AsyncMock, MagicMock |
| 22 | + |
| 23 | +import pytest |
| 24 | + |
| 25 | +from dqliteclient.exceptions import DqliteConnectionError |
| 26 | +from dqliteclient.protocol import DqliteProtocol |
| 27 | + |
| 28 | + |
| 29 | +def _protocol_with_failing_drain(error: Exception) -> DqliteProtocol: |
| 30 | + """Build a DqliteProtocol whose drain() raises ``error`` on first |
| 31 | + call (simulating a peer reset / RST mid-write).""" |
| 32 | + reader = MagicMock() |
| 33 | + writer = MagicMock() |
| 34 | + writer.write = MagicMock() |
| 35 | + writer.drain = AsyncMock(side_effect=error) |
| 36 | + proto = DqliteProtocol(reader, writer, timeout=5.0) |
| 37 | + return proto |
| 38 | + |
| 39 | + |
| 40 | +@pytest.mark.asyncio |
| 41 | +async def test_exec_sql_mid_write_reset_raises_dqlite_connection_error() -> None: |
| 42 | + """``exec_sql`` calling ``_send`` against a drain that raises |
| 43 | + ``ConnectionResetError`` (peer RST) wraps as |
| 44 | + ``DqliteConnectionError``.""" |
| 45 | + proto = _protocol_with_failing_drain(ConnectionResetError("connection reset")) |
| 46 | + |
| 47 | + with pytest.raises(DqliteConnectionError, match="Write failed"): |
| 48 | + await proto.exec_sql(db_id=0, sql="INSERT INTO t(b) VALUES (?)", params=[b"x" * 100]) |
| 49 | + |
| 50 | + |
| 51 | +@pytest.mark.asyncio |
| 52 | +async def test_exec_sql_mid_write_broken_pipe_raises_dqlite_connection_error() -> None: |
| 53 | + """BrokenPipeError (FIN-then-write) also surfaces uniformly.""" |
| 54 | + proto = _protocol_with_failing_drain(BrokenPipeError("broken pipe")) |
| 55 | + |
| 56 | + with pytest.raises(DqliteConnectionError, match="Write failed"): |
| 57 | + await proto.exec_sql(db_id=0, sql="INSERT INTO t(b) VALUES (?)", params=[b"x" * 100]) |
| 58 | + |
| 59 | + |
| 60 | +@pytest.mark.asyncio |
| 61 | +async def test_exec_sql_mid_write_oserror_raises_dqlite_connection_error() -> None: |
| 62 | + """Generic OSError (host unreachable mid-stream) surfaces uniformly.""" |
| 63 | + proto = _protocol_with_failing_drain(OSError("transport torn down")) |
| 64 | + |
| 65 | + with pytest.raises(DqliteConnectionError, match="Write failed"): |
| 66 | + await proto.exec_sql(db_id=0, sql="INSERT INTO t(b) VALUES (?)", params=[b"x" * 100]) |
| 67 | + |
| 68 | + |
| 69 | +@pytest.mark.asyncio |
| 70 | +async def test_exec_sql_mid_write_does_not_silently_retry() -> None: |
| 71 | + """A second exec_sql on the same protocol after the failure must |
| 72 | + raise too — no silent retry against a dead transport. |
| 73 | +
|
| 74 | + The first call raises DqliteConnectionError. The drain mock keeps |
| 75 | + raising on every subsequent call, so the second exec_sql also |
| 76 | + raises. This pins "the protocol does not internally retry".""" |
| 77 | + proto = _protocol_with_failing_drain(ConnectionResetError("reset")) |
| 78 | + |
| 79 | + with pytest.raises(DqliteConnectionError): |
| 80 | + await proto.exec_sql(db_id=0, sql="INSERT INTO t VALUES (1)", params=None) |
| 81 | + |
| 82 | + # Same protocol, same drain mock — the second call must propagate |
| 83 | + # the same write-failed error, NOT magically succeed. |
| 84 | + with pytest.raises(DqliteConnectionError): |
| 85 | + await proto.exec_sql(db_id=0, sql="INSERT INTO t VALUES (2)", params=None) |
| 86 | + |
| 87 | + |
| 88 | +@pytest.mark.asyncio |
| 89 | +async def test_large_blob_insert_mid_drain_failure_raises() -> None: |
| 90 | + """The same contract holds for a multi-MB BLOB insert — the |
| 91 | + failure point is in the drain, not in encode, so blob size does |
| 92 | + not matter for the contract. Pin it explicitly so a future |
| 93 | + refactor that streams the encode separately doesn't lose this.""" |
| 94 | + proto = _protocol_with_failing_drain(ConnectionResetError("mid-stream reset")) |
| 95 | + big_blob = b"x" * (4 * 1024 * 1024) # 4 MiB |
| 96 | + |
| 97 | + with pytest.raises(DqliteConnectionError, match="Write failed"): |
| 98 | + await proto.exec_sql(db_id=0, sql="INSERT INTO t(b) VALUES (?)", params=[big_blob]) |
0 commit comments