-
-
Notifications
You must be signed in to change notification settings - Fork 103
Expand file tree
/
Copy pathasync_.py
More file actions
484 lines (409 loc) · 20.1 KB
/
async_.py
File metadata and controls
484 lines (409 loc) · 20.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
import asyncio
import contextvars
import logging
from itertools import chain
from time import time
from typing import TYPE_CHECKING
from typing import Callable
from typing import List
from ..event_data import EventData
from ..event_data import TriggerData
from ..exceptions import InvalidDefinition
from ..exceptions import TransitionNotAllowed
from ..orderedset import OrderedSet
from ..state import State
from .base import BaseEngine
if TYPE_CHECKING:
from ..event import Event
from ..transition import Transition
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# ContextVar to distinguish reentrant calls (from within callbacks) from
# concurrent external calls. asyncio propagates context to child tasks
# (e.g., those created by asyncio.gather in the callback system), so a
# ContextVar set in the processing loop is visible in all callbacks.
# Independent external coroutines have their own context where this is False.
#
# In this module the flag is set to True by ``AsyncEngine.processing_loop``
# once it holds the processing lock (and reset in its ``finally`` clause),
# and it is read by ``AsyncEngine.put`` to decide whether a future may be
# attached to an event.
_in_processing_loop: contextvars.ContextVar[bool] = contextvars.ContextVar(
    "_in_processing_loop", default=False
)
class AsyncEngine(BaseEngine):
"""Async engine with full StateChart support.
Mirrors :class:`SyncEngine` algorithm but uses ``async``/``await`` for callback dispatch.
All pure-computation helpers are inherited from :class:`BaseEngine`.
"""
def put(self, trigger_data: TriggerData, internal: bool = False, _delayed: bool = False):
    """Enqueue ``trigger_data``, attaching an ``asyncio.Future`` when appropriate.

    A future is created and stored on ``trigger_data.future`` only when all
    of the following hold:

    - the event is external (``internal`` is false);
    - ``trigger_data`` does not already carry a future;
    - the call is NOT made from within the processing loop (reentrant
      calls from callbacks must not get futures, as that would deadlock);
    - there is a running asyncio loop.

    In every case the event is then passed on to the parent class's
    ``put`` with the same ``internal`` and ``_delayed`` flags.
    """
    skip_future = internal or trigger_data.future is not None or _in_processing_loop.get()
    if not skip_future:
        try:
            trigger_data.future = asyncio.get_running_loop().create_future()
        except RuntimeError:
            # No running loop — sync caller, so nobody could await a future.
            pass
    super().put(trigger_data, internal=internal, _delayed=_delayed)
@staticmethod
def _resolve_future(future: "asyncio.Future[object] | None", result):
"""Resolve a future with the given result, if present and not yet done."""
if future is not None and not future.done():
future.set_result(result)
@staticmethod
def _reject_future(future: "asyncio.Future[object] | None", exc: Exception):
"""Reject a future with the given exception, if present and not yet done."""
if future is not None and not future.done():
future.set_exception(exc)
def _reject_pending_futures(self, exc: Exception):
    """Ask the external queue to reject its unresolved futures with ``exc``."""
    queue = self.external_queue
    queue.reject_futures(exc)
# --- Callback dispatch overrides (async versions of BaseEngine methods) ---
async def _get_args_kwargs(
    self, transition: "Transition", trigger_data: TriggerData, target: "State | None" = None
):
    """Build, and memoize, the callback arguments for one transition.

    The result is cached in ``self._cache`` under a key made of the
    ``id()`` of ``transition``, ``trigger_data`` and ``target``; a cache hit
    is returned as-is without running any callback.

    On a miss, an ``EventData`` is created for the pair; when ``target`` is
    truthy it is set as both the event data's ``state`` and ``target``. The
    ``prepare`` callbacks are then awaited and every value they return is
    merged into the keyword arguments.

    Returns:
        An ``(args, kwargs)`` pair to be passed to callbacks.
    """
    memo_key = (id(transition), id(trigger_data), id(target))
    if memo_key in self._cache:
        return self._cache[memo_key]

    data = EventData(trigger_data=trigger_data, transition=transition)
    if target:
        data.state = target
        data.target = target

    args = data.args
    kwargs = data.extended_kwargs
    # Each ``prepare`` callback result contributes extra keyword arguments.
    for extra in await self.sm._callbacks.async_call(self.sm.prepare.key, *args, **kwargs):
        kwargs.update(extra)

    entry = (args, kwargs)
    self._cache[memo_key] = entry
    return entry
async def _conditions_match(self, transition: "Transition", trigger_data: TriggerData):
    """Run the transition's validators, then evaluate its conditions.

    Both callback groups receive the arguments built by
    ``_get_args_kwargs`` plus the engine's ``on_error`` handler.

    Returns:
        Whatever ``async_all`` reports for the transition's ``cond`` key.
    """
    args, kwargs = await self._get_args_kwargs(transition, trigger_data)
    error_handler = self._on_error_handler()

    # Validators run first; their return value is not used here.
    await self.sm._callbacks.async_call(
        transition.validators.key, *args, on_error=error_handler, **kwargs
    )

    matched = await self.sm._callbacks.async_all(
        transition.cond.key, *args, on_error=error_handler, **kwargs
    )
    return matched
async def _select_transitions(  # type: ignore[override]
    self, trigger_data: TriggerData, predicate: Callable
) -> "OrderedSet[Transition]":
    """Collect one enabled transition per atomic state in the configuration.

    For each atomic state, the state itself and then its ancestors are
    searched in order. The first transition that is not ``initial``,
    satisfies ``predicate(transition, event)`` and whose conditions match
    is selected for that state.

    Returns:
        The selected transitions after ``_filter_conflicting_transitions``.
    """
    selected: "OrderedSet[Transition]" = OrderedSet()

    async def pick(start: State, event: "Event | None") -> "Transition | None":
        # Candidates in lookup order: the state's own transitions first,
        # then those of each ancestor.
        candidates = (
            candidate
            for holder in chain([start], start.ancestors())
            for candidate in holder.transitions
        )
        for candidate in candidates:
            if candidate.initial or not predicate(candidate, event):
                continue
            if await self._conditions_match(candidate, trigger_data):
                return candidate
        return None

    for state in self.sm.configuration:
        if not state.is_atomic:
            continue
        winner = await pick(state, trigger_data.event)
        if winner is not None:
            selected.add(winner)

    return self._filter_conflicting_transitions(selected)
async def select_eventless_transitions(self, trigger_data: TriggerData):
    """Select the enabled transitions whose ``is_eventless`` attribute is truthy."""

    def is_eventless(transition, _event):
        return transition.is_eventless

    return await self._select_transitions(trigger_data, is_eventless)
async def select_transitions(self, trigger_data: TriggerData) -> "OrderedSet[Transition]":  # type: ignore[override]
    """Select the enabled transitions whose ``match`` accepts the trigger's event."""

    def matches_event(transition, event):
        return transition.match(event)

    return await self._select_transitions(trigger_data, matches_event)
async def _execute_transition_content(
    self,
    enabled_transitions: "List[Transition]",
    trigger_data: TriggerData,
    get_key: "Callable[[Transition], str]",
    set_target_as_state: bool = False,
    **kwargs_extra,
):
    """Run one callback group for each transition and gather the results.

    Args:
        enabled_transitions: Transitions to process, in order.
        trigger_data: The trigger being processed.
        get_key: Maps a transition to the callback key to invoke.
        set_target_as_state: When true, the transition's ``target`` is
            passed as ``target`` to ``_get_args_kwargs``; otherwise ``None``.
        **kwargs_extra: Extra keyword arguments merged into the callback
            keyword arguments before the call.

    Returns:
        A single list holding the results of every callback call.
    """
    collected = []
    for transition in enabled_transitions:
        if set_target_as_state:
            target = transition.target
        else:
            target = None

        args, kwargs = await self._get_args_kwargs(transition, trigger_data, target=target)
        kwargs.update(kwargs_extra)

        outputs = await self.sm._callbacks.async_call(get_key(transition), *args, **kwargs)
        collected.extend(outputs)

    return collected
async def _exit_states(  # type: ignore[override]
    self, enabled_transitions: "List[Transition]", trigger_data: TriggerData
) -> "OrderedSet[State]":
    """Exit the states selected by ``_prepare_exit_states``, in its order.

    For each entry that has a state: its invocations are cancelled, its
    ``exit`` callbacks are awaited (with the engine's ``on_error``
    handler), and the state is removed from the configuration. The
    callback arguments are built for every entry, with or without a state.

    Returns:
        The second value returned by ``_prepare_exit_states``.
    """
    exit_plan, exited = self._prepare_exit_states(enabled_transitions)
    error_handler = self._on_error_handler()

    for entry in exit_plan:
        leaving = entry.state
        has_state = leaving is not None

        if has_state:  # pragma: no branch
            # Cancel invocations for this state before executing exit handlers.
            self._invoke_manager.cancel_for_state(leaving)

        args, kwargs = await self._get_args_kwargs(entry.transition, trigger_data)

        if has_state:  # pragma: no branch
            await self.sm._callbacks.async_call(
                leaving.exit.key, *args, on_error=error_handler, **kwargs
            )
            self._remove_state_from_configuration(leaving)

    return exited
async def _enter_states(  # noqa: C901
    self,
    enabled_transitions: "List[Transition]",
    trigger_data: TriggerData,
    states_to_exit: "OrderedSet[State]",
    previous_configuration: "OrderedSet[State]",
):
    """Run the transitions' ``on`` content and enter the resulting states.

    Steps, in order:

    1. ``_prepare_entry_states`` computes the states to enter, the states
       needing default entry, the default history content and the new
       configuration.
    2. The ``on`` callbacks of ``enabled_transitions`` are executed.
    3. If ``sm.atomic_configuration_update`` is set, the configuration is
       replaced by the new configuration in one assignment.
    4. Each state to enter is added to the configuration and its ``enter``
       callbacks are awaited; default initial transitions, default history
       transitions, invoke marking and final-state handling follow for
       that state.

    Args:
        enabled_transitions: Transitions being taken in this microstep.
        trigger_data: The trigger being processed.
        states_to_exit: States exited by this microstep.
        previous_configuration: Configuration before the microstep.

    Returns:
        The list of results from the transitions' ``on`` callbacks, extended
        with the results of any default initial transition's ``on``
        callbacks. Results of default history transitions are not included.
    """
    on_error = self._on_error_handler()
    ordered_states, states_for_default_entry, default_history_content, new_configuration = (
        self._prepare_entry_states(enabled_transitions, states_to_exit, previous_configuration)
    )

    result = await self._execute_transition_content(
        enabled_transitions,
        trigger_data,
        lambda t: t.on.key,
        previous_configuration=previous_configuration,
        new_configuration=new_configuration,
    )

    if self.sm.atomic_configuration_update:
        self.sm.configuration = new_configuration

    # Ids of the states that need default entry. This does not depend on the
    # state being entered, so it is built once instead of once per iteration.
    default_entry_ids = {t.state.id for t in states_for_default_entry if t.state}

    for info in ordered_states:
        target = info.state
        transition = info.transition
        args, kwargs = await self._get_args_kwargs(
            transition,
            trigger_data,
            target=target,
        )

        logger.debug("Entering state: %s", target)
        self._add_state_to_configuration(target)

        on_entry_result = await self.sm._callbacks.async_call(
            target.enter.key, *args, on_error=on_error, **kwargs
        )

        # Handle default initial states
        if target.id in default_entry_ids:
            initial_transitions = [t for t in target.transitions if t.initial]
            if len(initial_transitions) == 1:
                result += await self.sm._callbacks.async_call(
                    initial_transitions[0].on.key, *args, **kwargs
                )

        # Handle default history states
        default_history_transitions = [
            i.transition for i in default_history_content.get(target.id, [])
        ]
        if default_history_transitions:
            await self._execute_transition_content(
                default_history_transitions,
                trigger_data,
                lambda t: t.on.key,
                previous_configuration=previous_configuration,
                new_configuration=new_configuration,
            )

        # Mark state for invocation if it has invoke callbacks registered
        if target.invoke.key in self.sm._callbacks:
            self._invoke_manager.mark_for_invoke(target, trigger_data.kwargs)

        # Handle final states
        if target.final:
            self._handle_final_state(target, on_entry_result)

    return result
async def microstep(self, transitions: "List[Transition]", trigger_data: TriggerData):
    """Execute one microstep: ``before`` content, exits, entries, ``after`` content.

    If anything fails before the ``after`` phase, ``sm.configuration`` is
    restored to its value at the start of the microstep. An
    ``InvalidDefinition`` is then re-raised; any other exception is passed
    to ``_handle_error`` and ``None`` is returned.

    Failures in the ``after`` phase do not roll back the configuration:
    ``InvalidDefinition`` propagates, other exceptions go to
    ``_handle_error`` and the collected result is still returned.

    Returns:
        ``None`` when no callback produced a result, the single result when
        there is exactly one, otherwise the list of results. Results of
        ``after`` callbacks are not included.
    """
    snapshot = self.sm.configuration
    try:
        outputs = await self._execute_transition_content(
            transitions, trigger_data, lambda t: t.before.key
        )
        exited = await self._exit_states(transitions, trigger_data)
        outputs += await self._enter_states(transitions, trigger_data, exited, snapshot)
    except Exception as error:
        # Roll back first, whatever the error type.
        self.sm.configuration = snapshot
        if isinstance(error, InvalidDefinition):
            raise
        self._handle_error(error, trigger_data)
        return None

    try:
        await self._execute_transition_content(
            transitions,
            trigger_data,
            lambda t: t.after.key,
            set_target_as_state=True,
        )
    except InvalidDefinition:
        raise
    except Exception as error:
        self._handle_error(error, trigger_data)

    count = len(outputs)
    if count == 0:
        return None
    if count == 1:
        return outputs[0]
    return outputs
# --- Engine loop ---
async def _run_microstep(self, enabled_transitions, trigger_data):  # pragma: no cover
    """Run a microstep for internal/eventless transitions with error handling.

    ``enabled_transitions`` is converted to a list before being handed to
    ``microstep``. ``InvalidDefinition`` propagates; any other exception is
    passed to ``_handle_error``.

    Note: ``microstep()`` handles its own errors internally, so the handler
    here is a safety net that is not expected to be reached in normal
    operation.
    """
    try:
        transitions = list(enabled_transitions)
        await self.microstep(transitions, trigger_data)
    except Exception as error:
        if isinstance(error, InvalidDefinition):
            raise
        self._handle_error(error, trigger_data)
async def activate_initial_state(self, **kwargs):
    """Activate the initial state.

    In async code, the user must call this method explicitly (or it will be lazily
    activated on the first event). There's no built-in way to call async code from
    ``StateMachine.__init__``.

    Any ``**kwargs`` are forwarded to initial state entry callbacks via dependency
    injection, just like event kwargs on ``send()``.

    Returns:
        Whatever ``processing_loop()`` returns when called without a
        caller future.
    """
    # NOTE(review): ``kwargs`` is accepted but never referenced in this body,
    # so the forwarding described above is not performed here. Presumably the
    # initial event is queued with its kwargs elsewhere — confirm against the
    # caller / base engine.
    return await self.processing_loop()
async def processing_loop(  # noqa: C901
    self, caller_future: "asyncio.Future[object] | None" = None
):
    """Process event triggers with the 3-phase macrostep architecture.

    Phase 1: Eventless transitions + internal queue until quiescence.
    Phase 2: Remaining internal events (safety net for invoke-generated events).
    Phase 3: External events.

    The three phases repeat for as long as the previous pass took at least
    one event.

    When ``caller_future`` is provided, the caller can ``await`` it to
    receive its own event's result — even if another coroutine holds the
    processing lock.

    Args:
        caller_future: Future attached to the caller's own event, if any.

    Returns:
        - If the processing lock is already held: the awaited
          ``caller_future`` when one was given, otherwise ``None``.
        - If this call ran the loop and ``caller_future`` was given: the
          awaited ``caller_future``.
        - Otherwise: the result of the first external event whose microstep
          ran during this call, or ``None`` if there was none.

    Raises:
        Exception: Without a ``caller_future``, any exception raised while
            processing propagates. With one, the exception is routed to that
            future (if still pending) and surfaces when it is awaited.
            ``TransitionNotAllowed`` is raised this way for an external
            event with no enabled transition when
            ``sm.allow_event_without_transition`` is false.
    """
    if not self._processing.acquire(blocking=False):
        # Another coroutine holds the lock and will process our event.
        # Await the caller's future so we get our own result back.
        if caller_future is not None:
            return await caller_future
        return None

    # Mark this context as "inside the loop"; ``put`` reads this flag to
    # avoid attaching futures to events sent from callbacks.
    _ctx_token = _in_processing_loop.set(True)
    logger.debug("Processing loop started: %s", self.sm.current_state_value)
    # Sentinel means "no external event has produced a result yet".
    first_result = self._sentinel
    try:
        took_events = True
        while took_events:
            self.clear_cache()
            took_events = False
            macrostep_done = False

            # Phase 1: eventless transitions and internal events
            while not macrostep_done:
                logger.debug("Macrostep: eventless/internal queue")

                self.clear_cache()
                internal_event = TriggerData(self.sm, event=None)  # null object for eventless
                enabled_transitions = await self.select_eventless_transitions(internal_event)
                if not enabled_transitions:
                    # No eventless transition is enabled: fall back to the
                    # next internal event, or finish the macrostep.
                    if self.internal_queue.is_empty():
                        macrostep_done = True
                    else:
                        internal_event = self.internal_queue.pop()
                        enabled_transitions = await self.select_transitions(internal_event)
                if enabled_transitions:
                    logger.debug("Enabled transitions: %s", enabled_transitions)
                    took_events = True
                    await self._run_microstep(enabled_transitions, internal_event)

            # Spawn invoke handlers for states entered during this macrostep.
            await self._invoke_manager.spawn_pending_async()

            # Phase 2: remaining internal events
            while not self.internal_queue.is_empty():  # pragma: no cover
                internal_event = self.internal_queue.pop()
                enabled_transitions = await self.select_transitions(internal_event)
                if enabled_transitions:
                    await self._run_microstep(enabled_transitions, internal_event)

            # Phase 3: external events
            logger.debug("Macrostep: external queue")
            while not self.external_queue.is_empty():
                self.clear_cache()
                took_events = True
                external_event = self.external_queue.pop()
                current_time = time()
                if external_event.execution_time > current_time:
                    # Not due yet: re-queue it as delayed and yield to the
                    # event loop before trying again.
                    self.put(external_event, _delayed=True)
                    # NOTE(review): ``asyncio.sleep`` takes seconds, while the
                    # attribute name says "ms" — confirm the unit of
                    # ``_loop_sleep_in_ms``.
                    await asyncio.sleep(self.sm._loop_sleep_in_ms)
                    # Break to Phase 1 so internal events and eventless
                    # transitions can be processed while we wait.
                    break

                logger.debug("External event: %s", external_event.event)

                # Handle lazy initial state activation.
                # Break out of phase 3 so the outer loop restarts from phase 1
                # (eventless/internal), ensuring internal events queued during
                # initial entry are processed before any external events.
                # NOTE(review): this branch runs outside the try/except below
                # and does not touch ``external_event.future`` — confirm that
                # ``__initial__`` events never carry a future someone awaits.
                if external_event.event == "__initial__":
                    transitions = self._initial_transitions(external_event)
                    await self._enter_states(
                        transitions, external_event, OrderedSet(), OrderedSet()
                    )
                    break

                event_future = external_event.future
                try:
                    enabled_transitions = await self.select_transitions(external_event)
                    logger.debug("Enabled transitions: %s", enabled_transitions)
                    if enabled_transitions:
                        result = await self.microstep(
                            list(enabled_transitions), external_event
                        )
                        self._resolve_future(event_future, result)

                        # Only the first external result is kept for the
                        # future-less return value.
                        if first_result is self._sentinel:
                            first_result = result

                    else:
                        if not self.sm.allow_event_without_transition:
                            tna = TransitionNotAllowed(
                                external_event.event, self.sm.configuration
                            )
                            self._reject_future(event_future, tna)
                            self._reject_pending_futures(tna)
                            raise tna
                        # Event allowed but no transition — resolve with None
                        self._resolve_future(event_future, None)

                except Exception as exc:
                    # Fail this event's future and every pending future, then
                    # clear the engine and let the outer handler decide how
                    # the exception reaches the caller. The reject helpers
                    # ignore futures that are already done.
                    self._reject_future(event_future, exc)
                    self._reject_pending_futures(exc)
                    self.clear()
                    raise

    except Exception as exc:
        if caller_future is not None:
            # Route the exception to the caller's future if still pending.
            # If already resolved (caller's own event succeeded before a
            # later event failed), suppress the exception — the caller will
            # get their successful result via ``await future`` below, and
            # the failing event's exception was already routed to *its*
            # caller's future by ``_reject_future(event_future, ...)``.
            self._reject_future(caller_future, exc)
        else:
            raise
    finally:
        # Always restore the context flag and release the processing lock.
        _in_processing_loop.reset(_ctx_token)
        self._processing.release()

    result = first_result if first_result is not self._sentinel else None
    # If the caller has a future, await it (already resolved by now).
    if caller_future is not None:
        return await caller_future
    return result
async def enabled_events(self, *args, **kwargs):
    """List the events that are currently enabled.

    Every event of every transition of every state in the configuration is
    considered once (first occurrence wins). An event counts as enabled when
    the transition's conditions all pass, and also when evaluating them
    raises an exception.

    Args:
        *args: Positional arguments passed to the condition callbacks.
        **kwargs: Keyword arguments passed to the condition callbacks. The
            keys ``machine``, ``model``, ``event``, ``source``, ``target``,
            ``state`` and ``transition`` are overridden per candidate.

    Returns:
        The enabled events, looked up as attributes of the state machine, in
        the order they were found enabled.
    """
    machine = self.sm
    found = {}

    # Lazily flatten state -> transition -> event into a single stream.
    candidates = (
        (state, transition, event)
        for state in machine.configuration
        for transition in state.transitions
        for event in transition.events
    )

    for state, transition, event in candidates:
        if event in found:
            continue

        cond_kwargs = {
            **kwargs,
            "machine": machine,
            "model": machine.model,
            "event": getattr(machine, event),
            "source": transition.source,
            "target": transition.target,
            "state": state,
            "transition": transition,
        }

        try:
            passed = await machine._callbacks.async_all(
                transition.cond.key, *args, **cond_kwargs
            )
            if passed:
                found[event] = getattr(machine, event)
        except Exception:
            # A failing condition check is treated as "enabled".
            found[event] = getattr(machine, event)

    return list(found.values())