Skip to content

Commit 817a932

Browse files
fix: remove await writer.wait_closed() from _query_leader finally block
The finally block in _query_leader() called await writer.wait_closed() after writer.close(). If the remote peer is unresponsive, wait_closed() blocks indefinitely, delaying or preventing find_leader() from trying the next node. Since the leader query only reads data (nothing to flush), writer.close() alone is sufficient. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 9244eb7 commit 817a932

File tree

2 files changed

+39
-1
lines changed

2 files changed

+39
-1
lines changed

src/dqliteclient/cluster.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@ async def _query_leader(self, address: str) -> str | None:
9191
return None
9292
finally:
9393
writer.close()
94-
await writer.wait_closed()
9594

9695
async def connect(self, database: str = "default") -> DqliteConnection:
9796
"""Connect to the cluster leader.

tests/test_cluster.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,45 @@ async def test_query_leader_closes_writer_on_protocol_init_error(self) -> None:
184184
# Even though DqliteProtocol() failed, the writer must be closed
185185
mock_writer.close.assert_called()
186186

187+
async def test_query_leader_does_not_hang_on_slow_wait_closed(self) -> None:
188+
"""_query_leader must not hang if wait_closed() blocks (e.g., unresponsive peer)."""
189+
import asyncio
190+
191+
store = MemoryNodeStore(["localhost:9001"])
192+
client = ClusterClient(store, timeout=0.5)
193+
194+
mock_reader = AsyncMock()
195+
mock_writer = MagicMock()
196+
mock_writer.drain = AsyncMock()
197+
mock_writer.close = MagicMock()
198+
199+
# wait_closed blocks forever (simulates unresponsive peer)
200+
async def hang_forever():
201+
await asyncio.sleep(999)
202+
203+
mock_writer.wait_closed = hang_forever
204+
205+
from dqlitewire.messages import LeaderResponse, WelcomeResponse
206+
207+
responses = [
208+
WelcomeResponse(heartbeat_timeout=15000).encode(),
209+
LeaderResponse(node_id=1, address="").encode(),
210+
]
211+
mock_reader.read.side_effect = responses
212+
213+
with patch("asyncio.open_connection", return_value=(mock_reader, mock_writer)):
214+
# This should complete within the timeout, not hang on wait_closed
215+
try:
216+
leader = await asyncio.wait_for(client.find_leader(), timeout=2.0)
217+
assert leader == "localhost:9001"
218+
except TimeoutError:
219+
pytest.fail(
220+
"find_leader() hung because _query_leader's finally block "
221+
"awaited a blocking wait_closed()"
222+
)
223+
224+
mock_writer.close.assert_called()
225+
187226
async def test_update_nodes(self) -> None:
188227
store = MemoryNodeStore()
189228
client = ClusterClient(store)

0 commit comments

Comments
 (0)