Skip to content

Commit 8117a5e

Browse files
authored
feat: future-based result routing for async concurrent events (#567)
* feat: add future-based result routing for async concurrent events (#509) When multiple coroutines send events concurrently, only the lock holder processes events. Previously, non-lock-holding callers returned None, losing both results and exceptions. Now each external event gets an asyncio.Future attached in AsyncEngine.put(). The processing loop resolves/rejects each future with the microstep result. Callers that couldn't acquire the lock await their future instead of returning None. Uses contextvars.ContextVar to distinguish reentrant calls (from within callbacks, including asyncio.gather child tasks) from concurrent external calls — reentrant calls don't get futures to avoid deadlocks. * test: add regression test for issue #509 example * refactor: move reject_futures to EventQueue, fix test latency and doctest - Move future rejection logic from AsyncEngine into EventQueue.reject_futures() to respect encapsulation (avoids reaching into PriorityQueue internals) - Reduce asyncio.sleep in test_issue509 from 0.1s to 0.01s - Convert plain python block in docs/async.md to a testable doctest * test: cover EventQueue.reject_futures with future=None items
1 parent a214e04 commit 8117a5e

11 files changed

Lines changed: 622 additions & 24 deletions

File tree

AGENTS.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,15 @@ uv run mypy statemachine/ tests/
146146

147147
## Design principles
148148

149+
- **Follow SOLID principles.** In particular:
150+
- **Law of Demeter:** Methods should depend only on the data they need, not on the
151+
objects that contain it. Pass the specific value (e.g., a `Future`) rather than the
152+
parent object (e.g., `TriggerData`) — this reduces coupling and removes the need for
153+
null-checks on intermediate accessors.
154+
- **Single Responsibility:** Each module, class, and function should have one clear reason
155+
to change.
156+
- **Interface Segregation:** Depend on narrow interfaces. If a helper only needs one field
157+
from a dataclass, accept that field directly.
149158
- **Decouple infrastructure from domain:** Modules like `signature.py` and `dispatcher.py` are
150159
general-purpose (signature adaptation, listener/observer pattern) and intentionally not coupled
151160
to the state machine domain. Prefer this separation even for modules that are only used
@@ -163,6 +172,13 @@ uv run sphinx-build docs docs/_build/html
163172
uv run sphinx-autobuild docs docs/_build/html --re-ignore "auto_examples/.*"
164173
```
165174

175+
### Documentation code examples
176+
177+
All code examples in `docs/*.md` **must** be testable doctests (using ```` ```py ```` with
178+
`>>>` prompts), not plain ```` ```python ```` blocks. The test suite collects them via
179+
`--doctest-glob=*.md`. If an example cannot be expressed as a doctest (e.g., it requires
180+
real concurrency), write it as a unit test in `tests/` and reference it from the docs instead.
181+
166182
## Git workflow
167183

168184
- Main branch: `develop`

docs/async.md

Lines changed: 66 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -193,13 +193,74 @@ compound states, parallel states, history pseudo-states, eventless transitions,
193193
and `done.state` events — are fully supported in async code. The same
194194
`activate_initial_state()` pattern applies:
195195

196-
```python
197-
async def run():
198-
sm = MyStateChart()
199-
await sm.activate_initial_state()
200-
await sm.send("event")
196+
```py
197+
>>> async def run():
198+
... sm = AsyncStateMachine()
199+
... await sm.activate_initial_state()
200+
... result = await sm.send("advance")
201+
... return result
202+
203+
>>> asyncio.run(run())
204+
42
205+
201206
```
202207

208+
### Concurrent event sending
209+
210+
```{versionadded} 3.0.0
211+
```
212+
213+
When multiple coroutines send events concurrently (e.g., via `asyncio.gather`),
214+
each caller receives its own event's result — even though only one coroutine
215+
actually runs the processing loop at a time.
216+
217+
```py
218+
>>> class ConcurrentSC(StateChart):
219+
... s1 = State(initial=True)
220+
... s2 = State()
221+
... s3 = State(final=True)
222+
...
223+
... step1 = s1.to(s2)
224+
... step2 = s2.to(s3)
225+
...
226+
... async def on_step1(self):
227+
... return "result_1"
228+
...
229+
... async def on_step2(self):
230+
... return "result_2"
231+
232+
>>> async def run_concurrent():
233+
... import asyncio as _asyncio
234+
... sm = ConcurrentSC()
235+
... await sm.activate_initial_state()
236+
... r1, r2 = await _asyncio.gather(
237+
... sm.send("step1"),
238+
... sm.send("step2"),
239+
... )
240+
... return r1, r2
241+
242+
>>> asyncio.run(run_concurrent())
243+
('result_1', 'result_2')
244+
245+
```
246+
247+
Under the hood, the async engine attaches an `asyncio.Future` to each
248+
externally enqueued event. The coroutine that acquires the processing lock
249+
resolves each event's future as it processes the queue. Callers that couldn't
250+
acquire the lock simply `await` their future.
251+
252+
```{note}
253+
Futures are only created for **external** events sent from outside the
254+
processing loop. Events triggered from within callbacks (reentrant calls)
255+
follow the existing run-to-completion (RTC) model — they are enqueued and
256+
processed within the current macrostep, and the callback receives ``None``.
257+
```
258+
259+
If an exception occurs during processing (with `error_on_execution=False`),
260+
the exception is routed to the caller whose event caused it. Other callers
261+
whose events were still pending will also receive the exception, since the
262+
processing loop clears the queue on failure.
263+
203264
### Async-specific limitations
204265

205266
- **Initial state activation**: In async code, you must `await sm.activate_initial_state()`

docs/releases/3.0.0.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,19 @@ class GameCharacter(StateChart):
410410
See {ref}`weighted-transitions` for full documentation.
411411

412412

413+
### Async concurrent event result routing
414+
415+
When multiple coroutines send events concurrently via `asyncio.gather`, each
416+
caller now receives its own event's result (or exception). Previously, only the
417+
first caller to acquire the processing lock would get a result — subsequent
418+
callers received `None` and exceptions could leak to the wrong caller.
419+
420+
This is implemented by attaching an `asyncio.Future` to each externally
421+
enqueued event in the async engine. See {ref}`async` for details.
422+
423+
Fixes [#509](https://github.com/fgmacedo/python-statemachine/issues/509).
424+
425+
413426
## Bugfixes in 3.0.0
414427

415428
- Fixes [#XXX](https://github.com/fgmacedo/python-statemachine/issues/XXX).

statemachine/engines/async_.py

Lines changed: 97 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
import contextvars
23
import logging
34
from itertools import chain
45
from time import time
@@ -21,13 +22,57 @@
2122
logger = logging.getLogger(__name__)
2223

2324

25+
# ContextVar to distinguish reentrant calls (from within callbacks) from
26+
# concurrent external calls. asyncio propagates context to child tasks
27+
# (e.g., those created by asyncio.gather in the callback system), so a
28+
# ContextVar set in the processing loop is visible in all callbacks.
29+
# Independent external coroutines have their own context where this is False.
30+
_in_processing_loop: contextvars.ContextVar[bool] = contextvars.ContextVar(
31+
"_in_processing_loop", default=False
32+
)
33+
34+
2435
class AsyncEngine(BaseEngine):
2536
"""Async engine with full StateChart support.
2637
2738
Mirrors :class:`SyncEngine` algorithm but uses ``async``/``await`` for callback dispatch.
2839
All pure-computation helpers are inherited from :class:`BaseEngine`.
2940
"""
3041

42+
def put(self, trigger_data: TriggerData, internal: bool = False, _delayed: bool = False):
43+
"""Override to attach an asyncio.Future for external events.
44+
45+
Futures are only created when:
46+
- The event is external (not internal)
47+
- No future is already attached
48+
- There is a running asyncio loop
49+
- The call is NOT from within the processing loop (reentrant calls
50+
from callbacks must not get futures, as that would deadlock)
51+
"""
52+
if not internal and trigger_data.future is None and not _in_processing_loop.get():
53+
try:
54+
loop = asyncio.get_running_loop()
55+
trigger_data.future = loop.create_future()
56+
except RuntimeError:
57+
pass # No running loop — sync caller
58+
super().put(trigger_data, internal=internal, _delayed=_delayed)
59+
60+
@staticmethod
61+
def _resolve_future(future: "asyncio.Future[object] | None", result):
62+
"""Resolve a future with the given result, if present and not yet done."""
63+
if future is not None and not future.done():
64+
future.set_result(result)
65+
66+
@staticmethod
67+
def _reject_future(future: "asyncio.Future[object] | None", exc: Exception):
68+
"""Reject a future with the given exception, if present and not yet done."""
69+
if future is not None and not future.done():
70+
future.set_exception(exc)
71+
72+
def _reject_pending_futures(self, exc: Exception):
73+
"""Reject all unresolved futures in the external queue."""
74+
self.external_queue.reject_futures(exc)
75+
3176
# --- Callback dispatch overrides (async versions of BaseEngine methods) ---
3277

3378
async def _get_args_kwargs(
@@ -265,16 +310,27 @@ async def activate_initial_state(self):
265310
"""
266311
return await self.processing_loop()
267312

268-
async def processing_loop(self): # noqa: C901
313+
async def processing_loop( # noqa: C901
314+
self, caller_future: "asyncio.Future[object] | None" = None
315+
):
269316
"""Process event triggers with the 3-phase macrostep architecture.
270317
271318
Phase 1: Eventless transitions + internal queue until quiescence.
272319
Phase 2: Remaining internal events (safety net for invoke-generated events).
273320
Phase 3: External events.
321+
322+
When ``caller_future`` is provided, the caller can ``await`` it to
323+
receive its own event's result — even if another coroutine holds the
324+
processing lock.
274325
"""
275326
if not self._processing.acquire(blocking=False):
327+
# Another coroutine holds the lock and will process our event.
328+
# Await the caller's future so we get our own result back.
329+
if caller_future is not None:
330+
return await caller_future
276331
return None
277332

333+
_ctx_token = _in_processing_loop.set(True)
278334
logger.debug("Processing loop started: %s", self.sm.current_state_value)
279335
first_result = self._sentinel
280336
try:
@@ -336,26 +392,53 @@ async def processing_loop(self): # noqa: C901
336392
)
337393
break
338394

339-
enabled_transitions = await self.select_transitions(external_event)
340-
logger.debug("Enabled transitions: %s", enabled_transitions)
341-
if enabled_transitions:
342-
try:
395+
event_future = external_event.future
396+
try:
397+
enabled_transitions = await self.select_transitions(external_event)
398+
logger.debug("Enabled transitions: %s", enabled_transitions)
399+
if enabled_transitions:
343400
result = await self.microstep(
344401
list(enabled_transitions), external_event
345402
)
403+
self._resolve_future(event_future, result)
346404
if first_result is self._sentinel:
347405
first_result = result
348-
except Exception:
349-
self.clear()
350-
raise
351-
352-
else:
353-
if not self.sm.allow_event_without_transition:
354-
raise TransitionNotAllowed(external_event.event, self.sm.configuration)
355-
406+
else:
407+
if not self.sm.allow_event_without_transition:
408+
tna = TransitionNotAllowed(
409+
external_event.event, self.sm.configuration
410+
)
411+
self._reject_future(event_future, tna)
412+
self._reject_pending_futures(tna)
413+
raise tna
414+
# Event allowed but no transition — resolve with None
415+
self._resolve_future(event_future, None)
416+
except Exception as exc:
417+
self._reject_future(event_future, exc)
418+
self._reject_pending_futures(exc)
419+
self.clear()
420+
raise
421+
422+
except Exception as exc:
423+
if caller_future is not None:
424+
# Route the exception to the caller's future if still pending.
425+
# If already resolved (caller's own event succeeded before a
426+
# later event failed), suppress the exception — the caller will
427+
# get their successful result via ``await future`` below, and
428+
# the failing event's exception was already routed to *its*
429+
# caller's future by ``_reject_future(event_future, ...)``.
430+
self._reject_future(caller_future, exc)
431+
else:
432+
raise
356433
finally:
434+
_in_processing_loop.reset(_ctx_token)
357435
self._processing.release()
358-
return first_result if first_result is not self._sentinel else None
436+
437+
result = first_result if first_result is not self._sentinel else None
438+
# If the caller has a future, await it (already resolved by now).
439+
if caller_future is not None:
440+
return await caller_future
441+
return result
359442

360443
async def enabled_events(self, *args, **kwargs):
361444
sm = self.sm

statemachine/engines/base.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,18 @@ def clear(self):
5959
with self.queue.mutex:
6060
self.queue.queue.clear()
6161

62+
def reject_futures(self, exc: Exception):
63+
"""Reject all unresolved futures in the queue.
64+
65+
Called when the processing loop exits abnormally so that coroutines
66+
awaiting their futures don't hang forever.
67+
"""
68+
with self.queue.mutex:
69+
for trigger_data in self.queue.queue:
70+
future = trigger_data.future
71+
if future is not None and not future.done():
72+
future.set_exception(exc)
73+
6274
def remove(self, send_id: str):
6375
# We use the internal `queue` to make thins faster as the mutex
6476
# is protecting the block below

statemachine/engines/sync.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ def activate_initial_state(self):
5757
self._processing.release()
5858
return self.processing_loop()
5959

60-
def processing_loop(self): # noqa: C901
60+
def processing_loop(self, caller_future=None): # noqa: C901
6161
"""Process event triggers.
6262
6363
The event is put on a queue, and only the first event will have the result collected.

statemachine/event.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,8 +156,8 @@ def __call__(self, *args, **kwargs) -> Any:
156156
# can be called as a method. But it is not meant to be called without
157157
# an SM instance. Such SM instance is provided by `__get__` method when
158158
# used as a property descriptor.
159-
self.put(*args, **kwargs)
160-
return self._sm._processing_loop() # type: ignore[union-attr]
159+
trigger_data = self.put(*args, **kwargs)
160+
return self._sm._processing_loop(trigger_data.future) # type: ignore[union-attr]
161161

162162
def split( # type: ignore[override]
163163
self, sep: "str | None" = None, maxsplit: int = -1

statemachine/event_data.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,13 @@ class TriggerData:
3636
kwargs: dict = field(default_factory=dict, compare=False)
3737
"""All keyword arguments provided on the :ref:`Event`."""
3838

39+
future: Any = field(default=None, compare=False, repr=False, init=False)
40+
"""An optional :class:`asyncio.Future` for async result routing.
41+
42+
When set, the processing loop will resolve this future with the microstep
43+
result (or exception), allowing the caller to ``await`` it.
44+
"""
45+
3946
def __post_init__(self):
4047
self.model = self.machine.model
4148
delay = self.event.delay if self.event and self.event.delay else 0

statemachine/statemachine.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,8 +175,8 @@ def activate_initial_state(self) -> Any:
175175
return result
176176
return run_async_from_sync(result)
177177

178-
def _processing_loop(self) -> Any:
179-
result = self._engine.processing_loop()
178+
def _processing_loop(self, caller_future: "Any | None" = None) -> Any:
179+
result = self._engine.processing_loop(caller_future)
180180
if not isawaitable(result):
181181
return result
182182
return run_async_from_sync(result)

0 commit comments

Comments
 (0)