Skip to content

Commit 6f77529

Browse files
fix: add per-node timeout in find_leader()
Wraps _query_leader() in asyncio.wait_for() so a node that accepts TCP but hangs during handshake/get_leader doesn't block the entire leader discovery process indefinitely. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 7f02664 commit 6f77529

2 files changed

Lines changed: 32 additions & 1 deletion

File tree

src/dqliteclient/cluster.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,15 @@ async def find_leader(self) -> str:
4848

4949
for node in nodes:
5050
try:
51-
leader_address = await self._query_leader(node.address)
51+
leader_address = await asyncio.wait_for(
52+
self._query_leader(node.address), timeout=self._timeout
53+
)
5254
if leader_address:
5355
self._leader_address = leader_address
5456
return leader_address
57+
except TimeoutError:
58+
errors.append(f"{node.address}: timed out")
59+
continue
5560
except Exception as e:
5661
errors.append(f"{node.address}: {e}")
5762
continue

tests/test_cluster.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,32 @@ async def test_find_leader_no_leader_known(self) -> None:
106106
):
107107
await client.find_leader()
108108

109+
async def test_find_leader_node_hangs_after_connect(self) -> None:
110+
"""A node that accepts TCP but hangs on handshake should be timed out."""
111+
import asyncio
112+
113+
store = MemoryNodeStore(["localhost:9001"])
114+
client = ClusterClient(store, timeout=0.1)
115+
116+
mock_reader = AsyncMock()
117+
mock_writer = MagicMock()
118+
mock_writer.drain = AsyncMock()
119+
mock_writer.close = MagicMock()
120+
mock_writer.wait_closed = AsyncMock()
121+
122+
# Server accepts connection but never sends a response
123+
async def hang_forever(*args, **kwargs):
124+
await asyncio.sleep(100)
125+
return b""
126+
127+
mock_reader.read.side_effect = hang_forever
128+
129+
with (
130+
patch("asyncio.open_connection", return_value=(mock_reader, mock_writer)),
131+
pytest.raises(ClusterError, match="Could not find leader"),
132+
):
133+
await client.find_leader()
134+
109135
async def test_update_nodes(self) -> None:
110136
store = MemoryNodeStore()
111137
client = ClusterClient(store)

0 commit comments

Comments
 (0)