Skip to content

Commit 20610a6

Browse files
fix: narrow find_leader exception catch and chain cause
find_leader previously caught bare Exception, stringified any error (even TypeError/KeyError programming bugs) into the errors list, and raised ClusterError with no __cause__. ClusterError is in retry_with_backoff's retryable set, so a programming bug got retried 5× before propagating as a generic error with no traceback. Narrow the catch to the real transport-level errors (DqliteConnectionError, ProtocolError, OperationalError, OSError) so programming bugs propagate untouched, and chain the final ClusterError's __cause__ to the last caught exception so logs surface the underlying reason. Also wrap wire-library protocol errors in DqliteProtocol reads so they surface as the client's ProtocolError rather than raw DecodeError/etc., which keeps the narrow catch honest. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent d2d2f66 commit 20610a6

3 files changed

Lines changed: 75 additions & 15 deletions

File tree

src/dqliteclient/cluster.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,12 @@
33
import asyncio
44

55
from dqliteclient.connection import DqliteConnection, _parse_address
6-
from dqliteclient.exceptions import ClusterError, DqliteConnectionError, OperationalError
6+
from dqliteclient.exceptions import (
7+
ClusterError,
8+
DqliteConnectionError,
9+
OperationalError,
10+
ProtocolError,
11+
)
712
from dqliteclient.node_store import MemoryNodeStore, NodeInfo, NodeStore
813
from dqliteclient.protocol import DqliteProtocol
914
from dqliteclient.retry import retry_with_backoff
@@ -46,6 +51,7 @@ async def find_leader(self) -> str:
4651
raise ClusterError("No nodes configured")
4752

4853
errors: list[str] = []
54+
last_exc: BaseException | None = None
4955

5056
for node in nodes:
5157
try:
@@ -54,14 +60,19 @@ async def find_leader(self) -> str:
5460
)
5561
if leader_address:
5662
return leader_address
57-
except TimeoutError:
63+
except TimeoutError as e:
5864
errors.append(f"{node.address}: timed out")
65+
last_exc = e
5966
continue
60-
except Exception as e:
67+
except (DqliteConnectionError, ProtocolError, OperationalError, OSError) as e:
68+
# Narrow the catch so programming bugs (TypeError, KeyError,
69+
# etc.) propagate directly instead of being stringified into
70+
# a retryable ClusterError.
6171
errors.append(f"{node.address}: {e}")
72+
last_exc = e
6273
continue
6374

64-
raise ClusterError(f"Could not find leader. Errors: {'; '.join(errors)}")
75+
raise ClusterError(f"Could not find leader. Errors: {'; '.join(errors)}") from last_exc
6576

6677
async def _query_leader(self, address: str) -> str | None:
6778
"""Query a node for the current leader."""

src/dqliteclient/protocol.py

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
from dqliteclient.exceptions import DqliteConnectionError, OperationalError, ProtocolError
99
from dqlitewire import MessageDecoder, MessageEncoder
10+
from dqlitewire.exceptions import ProtocolError as _WireProtocolError
1011
from dqlitewire.messages import (
1112
ClientRequest,
1213
DbResponse,
@@ -298,12 +299,15 @@ async def _read_continuation(self, deadline: float | None = None) -> RowsRespons
298299
"""
299300
if deadline is None:
300301
deadline = self._operation_deadline()
301-
while True:
302-
result = self._decoder.decode_continuation()
303-
if result is not None:
304-
return result
305-
data = await self._read_data(deadline=deadline)
306-
self._decoder.feed(data)
302+
try:
303+
while True:
304+
result = self._decoder.decode_continuation()
305+
if result is not None:
306+
return result
307+
data = await self._read_data(deadline=deadline)
308+
self._decoder.feed(data)
309+
except _WireProtocolError as e:
310+
raise ProtocolError(f"Wire decode failed: {e}") from e
307311

308312
async def _read_response(self, deadline: float | None = None) -> Message:
309313
"""Read and decode the next response message.
@@ -315,11 +319,15 @@ async def _read_response(self, deadline: float | None = None) -> Message:
315319
"""
316320
if deadline is None:
317321
deadline = self._operation_deadline()
318-
while not self._decoder.has_message():
319-
data = await self._read_data(deadline=deadline)
320-
self._decoder.feed(data)
322+
try:
323+
while not self._decoder.has_message():
324+
data = await self._read_data(deadline=deadline)
325+
self._decoder.feed(data)
326+
327+
message = self._decoder.decode()
328+
except _WireProtocolError as e:
329+
raise ProtocolError(f"Wire decode failed: {e}") from e
321330

322-
message = self._decoder.decode()
323331
if message is None:
324332
raise ProtocolError("Failed to decode message")
325333

tests/test_cluster.py

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,8 @@ async def test_query_leader_closes_writer_on_handshake_error(self) -> None:
162162

163163
async def test_query_leader_closes_writer_on_protocol_init_error(self) -> None:
164164
"""Writer must be closed even if DqliteProtocol construction fails."""
165+
from dqliteclient.exceptions import DqliteConnectionError
166+
165167
store = MemoryNodeStore(["localhost:9001"])
166168
client = ClusterClient(store, timeout=1.0)
167169

@@ -175,7 +177,7 @@ async def test_query_leader_closes_writer_on_protocol_init_error(self) -> None:
175177
patch("asyncio.open_connection", return_value=(mock_reader, mock_writer)),
176178
patch(
177179
"dqliteclient.cluster.DqliteProtocol",
178-
side_effect=RuntimeError("init failed"),
180+
side_effect=DqliteConnectionError("init failed"),
179181
),
180182
pytest.raises(ClusterError),
181183
):
@@ -223,6 +225,45 @@ async def hang_forever():
223225

224226
mock_writer.close.assert_called()
225227

228+
async def test_find_leader_propagates_programming_bugs(self) -> None:
229+
"""Programming bugs (TypeError etc.) must propagate, not be swallowed
230+
into a generic ClusterError. ClusterError is retryable upstream, so
231+
stringifying a bug here amplifies it N*retries times.
232+
"""
233+
store = MemoryNodeStore(["localhost:9001", "localhost:9002"])
234+
client = ClusterClient(store, timeout=0.5)
235+
236+
async def buggy_query(_address: str) -> str | None:
237+
raise TypeError("programmer mistake")
238+
239+
with (
240+
patch.object(client, "_query_leader", side_effect=buggy_query),
241+
pytest.raises(TypeError, match="programmer mistake"),
242+
):
243+
await client.find_leader()
244+
245+
async def test_find_leader_transport_error_chains_cause(self) -> None:
246+
"""When every node yields a transport error, the final ClusterError
247+
must have __cause__ set so logs show the underlying reason, not just
248+
the generic message.
249+
"""
250+
from dqliteclient.exceptions import DqliteConnectionError
251+
252+
store = MemoryNodeStore(["localhost:9001"])
253+
client = ClusterClient(store, timeout=0.5)
254+
255+
boom = DqliteConnectionError("handshake failed")
256+
257+
async def failing_query(_address: str) -> str | None:
258+
raise boom
259+
260+
with (
261+
patch.object(client, "_query_leader", side_effect=failing_query),
262+
pytest.raises(ClusterError) as exc_info,
263+
):
264+
await client.find_leader()
265+
assert exc_info.value.__cause__ is boom
266+
226267
async def test_update_nodes(self) -> None:
227268
store = MemoryNodeStore()
228269
client = ClusterClient(store)

0 commit comments

Comments
 (0)