Skip to content

Commit 2603102

Browse files
fix: skip ROLLBACK when the socket is already dead
_reset_connection issued ROLLBACK blindly, so a half-closed TCP socket (LB idle reset, peer crash, etc.) took self._timeout (default 10 s) to discover — paid on every pool release through a dead connection. Add a cheap pre-write liveness check: if writer.transport.is_closing() or reader.at_eof() return True, bail early with False. Use isinstance bool checks so mocked attributes don't false-trigger. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 9b7a7f6 commit 2603102

2 files changed

Lines changed: 73 additions & 0 deletions

File tree

src/dqliteclient/pool.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,30 @@
1111
from dqliteclient.exceptions import DqliteConnectionError
1212

1313

14+
def _socket_looks_dead(conn: DqliteConnection) -> bool:
15+
"""Best-effort local detection of a half-closed TCP socket.
16+
17+
Returns True only on an affirmative bool signal from the transport or
18+
reader. Mocked / missing attributes default to False (assume alive) so
19+
the check never produces a false positive against well-behaved peers.
20+
"""
21+
protocol = conn._protocol
22+
if protocol is None:
23+
return True
24+
writer = getattr(protocol, "_writer", None)
25+
reader = getattr(protocol, "_reader", None)
26+
transport = getattr(writer, "transport", None) if writer is not None else None
27+
try:
28+
closing = transport.is_closing() if transport is not None else False
29+
except Exception:
30+
closing = False
31+
try:
32+
eof = reader.at_eof() if reader is not None else False
33+
except Exception:
34+
eof = False
35+
return (isinstance(closing, bool) and closing) or (isinstance(eof, bool) and eof)
36+
37+
1438
class ConnectionPool:
1539
"""Connection pool with automatic leader detection.
1640
@@ -181,6 +205,11 @@ async def _reset_connection(self, conn: DqliteConnection) -> bool:
181205
False if it should be destroyed.
182206
"""
183207
if conn._in_transaction:
208+
# Cheap pre-write liveness check: if the transport is already
209+
# closing or the reader has seen EOF, ROLLBACK would stall on
210+
# _read_data until self._timeout. Bail fast instead.
211+
if _socket_looks_dead(conn):
212+
return False
184213
try:
185214
await conn.execute("ROLLBACK")
186215
except BaseException:

tests/test_pool.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -692,6 +692,50 @@ async def exec_or_cancel(db_id, sql, params=None):
692692

693693
await pool.close()
694694

695+
async def test_reset_connection_skips_rollback_on_dead_socket(self) -> None:
696+
"""If the transport is already closing / EOF, _reset_connection must
697+
return False without waiting for a read timeout. Otherwise every
698+
release through a half-closed socket costs self._timeout seconds.
699+
"""
700+
import asyncio
701+
702+
from dqliteclient.connection import DqliteConnection
703+
704+
pool = ConnectionPool(["localhost:9001"], max_size=1, timeout=10.0)
705+
706+
conn = DqliteConnection("127.0.0.1:9001")
707+
conn._protocol = MagicMock()
708+
conn._db_id = 1
709+
conn._bound_loop = asyncio.get_running_loop()
710+
conn._in_transaction = True
711+
712+
# Reader has seen EOF / transport is closing.
713+
dead_reader = MagicMock()
714+
dead_reader.at_eof = MagicMock(return_value=True)
715+
dead_transport = MagicMock()
716+
dead_transport.is_closing = MagicMock(return_value=True)
717+
dead_writer = MagicMock()
718+
dead_writer.transport = dead_transport
719+
conn._protocol._reader = dead_reader
720+
conn._protocol._writer = dead_writer
721+
722+
exec_called = False
723+
724+
async def track_exec(*args, **kwargs):
725+
nonlocal exec_called
726+
exec_called = True
727+
# Never resolves — we'd pay self._timeout if reached.
728+
await asyncio.sleep(1000)
729+
return (0, 0)
730+
731+
conn._protocol.exec_sql = track_exec
732+
conn._protocol.close = MagicMock()
733+
conn._protocol.wait_closed = AsyncMock()
734+
735+
result = await asyncio.wait_for(pool._reset_connection(conn), timeout=1.0)
736+
assert result is False
737+
assert not exec_called, "ROLLBACK must not be sent on a dead socket"
738+
695739
async def test_reset_connection_returns_false_on_cancelled_error(self) -> None:
696740
"""_reset_connection must return False (not raise) when ROLLBACK is cancelled."""
697741
import asyncio

0 commit comments

Comments
 (0)