Skip to content

Commit 38d06e3

Browse files
Reject use of ClusterClient.find_leader from a forked child process
The single-flight slot map (_find_leader_tasks) holds asyncio.Task instances bound to the parent's loop. A child that forks mid-sweep and calls find_leader observes an inherited task and falls through to await asyncio.shield(<parent-task>) — undefined behavior. Record os.getpid() in __init__ and raise InterfaceError("ClusterClient used after fork; reconstruct from configuration in the target process.") at the top of find_leader. Symmetric with the cycle 20 DqliteConnection / ConnectionPool guards.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 33af6f4 commit 38d06e3

2 files changed

Lines changed: 92 additions & 0 deletions

File tree

src/dqliteclient/cluster.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import asyncio
44
import contextlib
55
import logging
6+
import os
67
import random
78
from collections.abc import Callable, Iterable
89
from typing import Final, NoReturn
@@ -12,6 +13,7 @@
1213
ClusterError,
1314
ClusterPolicyError,
1415
DqliteConnectionError,
16+
InterfaceError,
1517
OperationalError,
1618
ProtocolError,
1719
)
@@ -148,6 +150,13 @@ def __init__(
148150
# one pool out of widened heartbeats. Bounded to two slots
149151
# since the flag is a bool.
150152
self._find_leader_tasks: dict[tuple[bool], asyncio.Task[str]] = {}
153+
# Fork-after-init: the slot map holds asyncio.Task instances
154+
# bound to the parent's loop. A child that forks mid-sweep
155+
# would observe an inherited task and ``await
156+
# asyncio.shield(<parent-loop task>)`` — undefined behaviour.
157+
# Symmetric with the DqliteConnection / ConnectionPool
158+
# pid guards added in cycle 20.
159+
self._creator_pid = os.getpid()
151160

152161
@classmethod
153162
def from_addresses(
@@ -214,6 +223,15 @@ async def find_leader(self, *, trust_server_heartbeat: bool = False) -> str:
214223
(re-poll the node store between probes) costs an extra await
215224
per probe to close a window most callers do not exercise.
216225
"""
226+
if os.getpid() != self._creator_pid:
227+
# Fork-after-init: the slot map holds parent-loop tasks
228+
# that the child cannot drive. Surface a clear
229+
# InterfaceError instead of letting a sibling task land
230+
# at ``await asyncio.shield(<parent-task>)``.
231+
raise InterfaceError(
232+
"ClusterClient used after fork; reconstruct from configuration "
233+
"in the target process."
234+
)
217235
key: tuple[bool] = (trust_server_heartbeat,)
218236
task = self._find_leader_tasks.get(key)
219237
if task is None or task.done():
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
"""Pin: ``ClusterClient.find_leader`` raises a clear ``InterfaceError``
2+
when called after ``os.fork``.
3+
4+
The single-flight slot map holds ``asyncio.Task`` instances bound to
5+
the parent's loop. A child that forks mid-sweep and calls
6+
``find_leader`` would observe an inherited task and fall through to
7+
``await asyncio.shield(<parent-loop task>)`` — undefined behaviour.
8+
9+
Cycle 20 added pid guards to ``DqliteConnection`` and
10+
``ConnectionPool``; this extends the same pattern to ``ClusterClient``.
11+
"""
12+
13+
from __future__ import annotations
14+
15+
import os
16+
from unittest.mock import patch
17+
18+
import pytest
19+
20+
from dqliteclient.cluster import ClusterClient
21+
from dqliteclient.exceptions import InterfaceError
22+
from dqliteclient.node_store import MemoryNodeStore
23+
24+
25+
@pytest.mark.asyncio
async def test_find_leader_after_fork_raises_interface_error() -> None:
    """Simulated fork: a pid mismatch makes ``find_leader`` raise.

    No real fork happens here. The client's recorded ``_creator_pid`` is
    overwritten and ``os.getpid`` (looked up through
    ``dqliteclient.cluster``) is patched to return a different value, so
    the pid comparison in ``find_leader`` fails. The call is expected to
    raise ``InterfaceError`` with a message matching ``"fork"``.
    """
    client = ClusterClient(node_store=MemoryNodeStore(["localhost:9001"]))

    # Pretend the client was built in some other ("parent") process.
    recorded_pid = client._creator_pid + 1
    client._creator_pid = recorded_pid

    # The "current" pid reported to the guard is guaranteed to differ
    # from the recorded one.
    pid_patch = patch(
        "dqliteclient.cluster.os.getpid",
        return_value=recorded_pid + 1,
    )
    with pid_patch, pytest.raises(InterfaceError, match="fork"):
        await client.find_leader()
36+
37+
38+
@pytest.mark.skipif(not hasattr(os, "fork"), reason="requires os.fork")
def test_find_leader_actual_fork_raises() -> None:
    """End-to-end fork: parent stages a ClusterClient; child calls
    find_leader and reports back.

    The child writes a status token to a pipe:

    * ``b"OK"`` -- ``InterfaceError`` was raised and its message
      contains ``"fork"``.
    * ``b"WRONG_MSG:..."`` -- ``InterfaceError`` was raised with a
      message that does not mention ``"fork"``.
    * ``b"WRONG_TYPE:..."`` -- some other ``Exception`` was raised.
    * ``b"NO_RAISE"`` -- ``find_leader`` returned normally.

    The parent reads the pipe until EOF and asserts the token is
    ``b"OK"``. The read end is closed and the child is reaped even if
    reading fails, so a failure here does not leak a file descriptor or
    leave an unreaped child behind.
    """
    cluster = ClusterClient(node_store=MemoryNodeStore(["127.0.0.1:9999"]))

    r, w = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child process: report the outcome through the pipe and leave
        # via os._exit so control never returns into the test runner.
        try:
            os.close(r)
            try:
                import asyncio

                async def run() -> None:
                    await cluster.find_leader()

                asyncio.run(run())
                os.write(w, b"NO_RAISE")
            except InterfaceError as e:
                os.write(w, b"OK" if "fork" in str(e) else f"WRONG_MSG:{e}".encode())
            except Exception as e:  # noqa: BLE001
                os.write(w, f"WRONG_TYPE:{type(e).__name__}:{e}".encode())
            finally:
                os.close(w)
        finally:
            os._exit(0)

    # Parent process. Close our copy of the write end first so that
    # os.read returns b"" (EOF) once the child has closed its copy.
    chunks: list[bytes] = []
    try:
        os.close(w)
        while chunk := os.read(r, 4096):
            chunks.append(chunk)
    finally:
        # Always release the read end and reap the child, including
        # when the read loop above raised.
        try:
            os.close(r)
        finally:
            os.waitpid(pid, 0)

    result = b"".join(chunks)
    assert result == b"OK", f"child reported: {result!r}"

0 commit comments

Comments
 (0)