Skip to content

Commit 36fff71

Browse files
Add redirect_policy to ClusterClient (leader-trust allowlist)
ISSUE-04 — a compromised peer could return any address as the current leader; the client blindly opened a TCP connection to that address (SSRF-style attack surface). ClusterClient now accepts an optional ``redirect_policy`` callable that authorizes each redirect target. Convenience ``allowlist_policy`` helper builds one from a static address list — the common pattern "only redirect to hosts I seed-listed." Backwards-compatible: policy defaults to None which preserves the legacy "accept any redirect" behavior. Users should set it in production where network trust isn't absolute. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 9bf6304 commit 36fff71

File tree

2 files changed

+121
-2
lines changed

2 files changed

+121
-2
lines changed

src/dqliteclient/cluster.py

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import asyncio
44
import random
5+
from collections.abc import Callable
56

67
from dqliteclient.connection import DqliteConnection, _parse_address
78
from dqliteclient.exceptions import (
@@ -15,6 +16,11 @@
1516
from dqliteclient.retry import retry_with_backoff
1617

1718

19+
# Type alias for a redirect-target policy. Returns True if the address
20+
# should be accepted, False to reject with a ClusterError.
21+
RedirectPolicy = Callable[[str], bool]
22+
23+
1824
class ClusterClient:
1925
"""Client with automatic leader detection and failover."""
2026

@@ -23,23 +29,47 @@ def __init__(
2329
node_store: NodeStore,
2430
*,
2531
timeout: float = 10.0,
32+
redirect_policy: RedirectPolicy | None = None,
2633
) -> None:
2734
"""Initialize cluster client.
2835
2936
Args:
3037
node_store: Store for cluster node information
3138
timeout: Connection timeout in seconds
39+
redirect_policy: Optional callable ``(address) -> bool`` that
40+
authorizes each leader redirect target. If None (default),
41+
redirects are accepted — this preserves backward
42+
compatibility but permits a compromised peer to redirect
43+
clients to arbitrary hosts (SSRF-style). Supply a
44+
callable or the ``only_nodes_in_store`` helper to
45+
constrain where redirects can go.
3246
"""
3347
if timeout <= 0:
3448
raise ValueError(f"timeout must be positive, got {timeout}")
3549
self._node_store = node_store
3650
self._timeout = timeout
51+
self._redirect_policy = redirect_policy
3752

3853
@classmethod
39-
def from_addresses(cls, addresses: list[str], timeout: float = 10.0) -> "ClusterClient":
54+
def from_addresses(
55+
cls,
56+
addresses: list[str],
57+
timeout: float = 10.0,
58+
*,
59+
redirect_policy: RedirectPolicy | None = None,
60+
) -> "ClusterClient":
4061
"""Create cluster client from list of addresses."""
4162
store = MemoryNodeStore(addresses)
42-
return cls(store, timeout=timeout)
63+
return cls(store, timeout=timeout, redirect_policy=redirect_policy)
64+
65+
def _check_redirect(self, address: str) -> None:
66+
"""Reject leader-redirect targets that fail the configured policy."""
67+
if self._redirect_policy is None:
68+
return
69+
if not self._redirect_policy(address):
70+
raise ClusterError(
71+
f"Leader redirect to {address!r} rejected by redirect_policy"
72+
)
4373

4474
async def find_leader(self) -> str:
4575
"""Find the current cluster leader.
@@ -68,6 +98,12 @@ async def find_leader(self) -> str:
6898
self._query_leader(node.address), timeout=self._timeout
6999
)
70100
if leader_address:
101+
# Only leader_address values that did NOT come from
102+
# node.address itself need authorizing — those are
103+
# real redirects. If the server returned its own
104+
# address, it's the leader and already in the store.
105+
if leader_address != node.address:
106+
self._check_redirect(leader_address)
71107
return leader_address
72108
except TimeoutError as e:
73109
errors.append(f"{node.address}: timed out")
@@ -143,3 +179,19 @@ async def try_connect() -> DqliteConnection:
143179
async def update_nodes(self, nodes: list[NodeInfo]) -> None:
144180
"""Update the node store with new node information."""
145181
await self._node_store.set_nodes(nodes)
182+
183+
184+
def allowlist_policy(addresses: list[str] | set[str]) -> RedirectPolicy:
185+
"""Build a redirect policy that accepts only the given addresses.
186+
187+
Useful for the common case: "only allow redirects to hosts I've
188+
explicitly seed-listed." Addresses are matched by exact string
189+
equality — callers that need CIDR / DNS / wildcard matching should
190+
supply their own callable.
191+
"""
192+
allowed = set(addresses)
193+
194+
def policy(addr: str) -> bool:
195+
return addr in allowed
196+
197+
return policy

tests/test_redirect_policy.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
"""Leader-redirect allowlist policy (ISSUE-04).
2+
3+
A compromised peer can return any address as the leader; the client
4+
used to open a TCP connection to whatever it was told. ClusterClient
5+
now takes an optional ``redirect_policy`` callable that authorizes
6+
each redirect target; a convenience ``allowlist_policy`` helper builds
7+
one from a static list of addresses.
8+
"""
9+
10+
import asyncio
11+
from unittest.mock import AsyncMock, patch
12+
13+
import pytest
14+
15+
from dqliteclient.cluster import ClusterClient, allowlist_policy
16+
from dqliteclient.exceptions import ClusterError
17+
from dqliteclient.node_store import MemoryNodeStore
18+
19+
20+
class TestRedirectPolicy:
21+
def test_accepts_seeded_redirect(self) -> None:
22+
store = MemoryNodeStore(["10.0.0.1:9001", "10.0.0.2:9001"])
23+
cc = ClusterClient(
24+
store,
25+
timeout=5.0,
26+
redirect_policy=allowlist_policy(["10.0.0.1:9001", "10.0.0.2:9001"]),
27+
)
28+
# Simulate: probing 10.0.0.1 reports 10.0.0.2 as leader.
29+
with patch.object(cc, "_query_leader", new=AsyncMock(return_value="10.0.0.2:9001")):
30+
result = asyncio.run(cc.find_leader())
31+
assert result == "10.0.0.2:9001"
32+
33+
def test_rejects_unknown_redirect(self) -> None:
34+
store = MemoryNodeStore(["10.0.0.1:9001"])
35+
cc = ClusterClient(
36+
store,
37+
timeout=5.0,
38+
redirect_policy=allowlist_policy(["10.0.0.1:9001"]),
39+
)
40+
# A compromised peer redirects to an attacker-controlled host.
41+
with patch.object(cc, "_query_leader", new=AsyncMock(return_value="attacker.com:9001")):
42+
with pytest.raises(ClusterError, match="rejected"):
43+
asyncio.run(cc.find_leader())
44+
45+
def test_no_policy_means_any_redirect_accepted(self) -> None:
46+
"""Default (None) policy preserves legacy behavior."""
47+
store = MemoryNodeStore(["10.0.0.1:9001"])
48+
cc = ClusterClient(store, timeout=5.0)
49+
with patch.object(cc, "_query_leader", new=AsyncMock(return_value="anywhere.invalid:9001")):
50+
result = asyncio.run(cc.find_leader())
51+
assert result == "anywhere.invalid:9001"
52+
53+
def test_self_leader_bypasses_policy(self) -> None:
54+
"""If the queried node is the leader (returns its own address),
55+
the redirect policy doesn't apply — the address is already in the
56+
seed list by definition."""
57+
store = MemoryNodeStore(["10.0.0.1:9001"])
58+
# Policy that rejects everything — but the node returning its own
59+
# address isn't a real redirect, so it's accepted.
60+
cc = ClusterClient(
61+
store,
62+
timeout=5.0,
63+
redirect_policy=lambda _a: False,
64+
)
65+
with patch.object(cc, "_query_leader", new=AsyncMock(return_value="10.0.0.1:9001")):
66+
result = asyncio.run(cc.find_leader())
67+
assert result == "10.0.0.1:9001"

0 commit comments

Comments
 (0)