-
-
Notifications
You must be signed in to change notification settings - Fork 103
Expand file tree
/
Copy pathstatemachine.py
More file actions
484 lines (398 loc) · 17.2 KB
/
statemachine.py
File metadata and controls
484 lines (398 loc) · 17.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
import warnings
from inspect import isawaitable
from typing import TYPE_CHECKING
from typing import Any
from typing import Dict
from typing import List
from typing import MutableSet
from statemachine.orderedset import OrderedSet
from .callbacks import SPECS_ALL
from .callbacks import SPECS_SAFE
from .callbacks import CallbacksRegistry
from .callbacks import SpecReference
from .dispatcher import Listener
from .dispatcher import Listeners
from .engines.async_ import AsyncEngine
from .engines.sync import SyncEngine
from .event import BoundEvent
from .event_data import TriggerData
from .exceptions import InvalidDefinition
from .exceptions import InvalidStateValue
from .exceptions import TransitionNotAllowed
from .factory import StateMachineMetaclass
from .graph import iterate_states_and_transitions
from .i18n import _
from .model import Model
from .utils import run_async_from_sync
if TYPE_CHECKING:
from .event import Event
from .state import State
class StateChart(metaclass=StateMachineMetaclass):
    """Runtime object of a state chart: it ties together the model that stores the
    state value, the callbacks registry, the listeners and the processing engine.

    Args:
        model: An optional external object to store state. See :ref:`domain models`.
        state_field (str): The model's field which stores the current state.
            Default: ``state``.
        start_value: An optional start state value if there's no current state assigned
            on the :ref:`domain models`. Default: ``None``.
        listeners: An optional list of objects that provides attributes to be used as
            callbacks. See :ref:`listeners` for more details.

    Raises:
        InvalidDefinition: If the class is abstract (no states or transitions), or if
            a callback cannot be resolved while registering the listeners.
    """

    TransitionNotAllowed = TransitionNotAllowed
    """Shortcut alias for easy exception handling.

    Example::

        try:
            sm.send("an-inexistent-event")
        except sm.TransitionNotAllowed:
            pass
    """

    _loop_sleep_in_ms = 0.001

    allow_event_without_transition: bool = True
    """If ``False`` when an event does not result in a transition, an exception
    ``TransitionNotAllowed`` will be raised. If ``True`` the state machine allows triggering
    events that may not lead to a state :ref:`transition`, including tolerance to unknown
    :ref:`event` triggers. Default: ``True``."""

    enable_self_transition_entries: bool = True
    """If `True` (default), when a self-transition is selected, the state entry actions
    will be executed, which is conformant with the SCXML spec. If `False`,
    the state entry/exit actions will not be executed.
    """

    atomic_configuration_update: bool = False
    """If `False` (default), the state machine will follow the SCXML
    specification, that means in a microstep, it will first exit and execute exit callbacks
    for all the states in the exit set in reversed document order, then execute the
    transition content (on callbacks), then enter all the states in the enter set in
    document order.

    If `True`, the state machine will execute the exit callbacks, the on transition
    callbacks, then atomically update the configuration of exited and entered states, then
    execute the enter callbacks.
    """

    error_on_execution: bool = True
    """If ``True`` (default), runtime exceptions in callbacks (guards, actions, entry/exit)
    produce an ``error.execution`` internal event instead of propagating, as mandated by the
    SCXML specification. If ``False``, exceptions propagate normally."""

    start_configuration_values: List[Any] = []
    """Default state values to be entered when the state machine starts.
    If empty (default), the root ``initial`` state will be used.
    """

    def __init__(
        self,
        model: Any = None,
        state_field: str = "state",
        start_value: Any = None,
        listeners: "List[object] | None" = None,
    ):
        self.model = model if model is not None else Model()
        # Mapping of compound states to last active state(s).
        self.history_values: Dict[str, List[State]] = {}
        self.state_field = state_field
        # Always store a per-instance list: either the explicit start value or a copy of
        # the class-level default, so the shared class attribute is never mutated.
        self.start_configuration_values = (
            [start_value] if start_value is not None else list(self.start_configuration_values)
        )
        self._callbacks = CallbacksRegistry()
        self._states_for_instance: Dict[State, State] = {}

        # Listeners that provide attributes to be used as callbacks, keyed by ``id()``.
        self._listeners: Dict[int, Any] = {}

        if self._abstract:
            raise InvalidDefinition(_("There are no states or transitions."))

        self._register_callbacks(listeners or [])

        # Activate the initial state, this only works if the outer scope is sync code.
        # for async code, the user should manually call `await sm.activate_initial_state()`
        # after state machine creation.
        self._engine = self._get_engine()
        self._engine.start()

    def _get_engine(self):
        """Return an ``AsyncEngine`` when the registry reports async callbacks,
        otherwise a ``SyncEngine``."""
        if self._callbacks.has_async_callbacks:
            return AsyncEngine(self)

        return SyncEngine(self)

    def activate_initial_state(self):
        """Ask the engine to activate the initial state.

        If the engine returns an awaitable, it is handed to ``run_async_from_sync``;
        otherwise the engine result is returned directly.
        """
        result = self._engine.activate_initial_state()
        if not isawaitable(result):
            return result
        return run_async_from_sync(result)

    def _processing_loop(self):
        """Run the engine's processing loop, bridging awaitable results through
        ``run_async_from_sync``."""
        result = self._engine.processing_loop()
        if not isawaitable(result):
            return result
        return run_async_from_sync(result)

    def __init_subclass__(cls, strict_states: bool = False):
        """Record the ``strict_states`` class keyword on the subclass."""
        cls._strict_states = strict_states
        super().__init_subclass__()

    if TYPE_CHECKING:
        # Makes mypy happy with dynamic created attributes.
        def __getattr__(self, attribute: str) -> Any: ...

    def __repr__(self):
        configuration_ids = [s.id for s in self.configuration]
        return (
            f"{type(self).__name__}(model={self.model!r}, state_field={self.state_field!r}, "
            f"configuration={configuration_ids!r})"
        )

    def __getstate__(self):
        """Return the instance ``__dict__`` without the callbacks registry, the
        per-instance states cache and the engine; these are rebuilt by
        :meth:`__setstate__`."""
        state = self.__dict__.copy()
        del state["_callbacks"]
        del state["_states_for_instance"]
        del state["_engine"]
        return state

    def __setstate__(self, state):
        """Restore the instance and rebuild what :meth:`__getstate__` dropped.

        The saved listeners are re-added through :meth:`add_listener`, which re-keys
        them by the ``id()`` of the restored objects.
        """
        listeners = state.pop("_listeners")
        self.__dict__.update(state)
        self._callbacks = CallbacksRegistry()
        self._states_for_instance: Dict[State, State] = {}

        self._listeners: Dict[int, Any] = {}

        self._register_callbacks([])
        self.add_listener(*listeners.values())
        self._engine = self._get_engine()
        self._engine.start()

    def _get_initial_configuration(self):
        """Return the states for ``start_configuration_values``, falling back to the
        root initial state when that list is empty.

        Raises:
            InvalidStateValue: If any of the values is not a key of ``states_map``.
        """
        initial_state_values = (
            self.start_configuration_values
            if self.start_configuration_values
            else [self.initial_state.value]
        )
        try:
            return [self.states_map[value] for value in initial_state_values]
        except KeyError as err:
            raise InvalidStateValue(initial_state_values) from err

    def bind_events_to(self, *targets):
        """Bind the state machine events to the target objects.

        A target that already has an attribute with the event's name is left untouched
        and a ``UserWarning`` is emitted instead.
        """
        for event in self.events:
            trigger = getattr(self, event)
            for target in targets:
                if hasattr(target, event):
                    warnings.warn(
                        f"Attribute '{event}' already exists on {target!r}. Skipping binding.",
                        UserWarning,
                        stacklevel=2,
                    )
                    continue
                setattr(target, event, trigger)

    def _add_listener(self, listeners: "Listeners", allowed_references: SpecReference = SPECS_ALL):
        """Resolve the machine's own specs, and the specs of every state and transition,
        against ``listeners`` into this instance's callbacks registry.

        Returns:
            ``self``, so the call can be chained.
        """
        registry = self._callbacks
        listeners.resolve(self._specs, registry=registry, allowed_references=allowed_references)
        for visited in iterate_states_and_transitions(self.states):
            listeners.resolve(
                visited._specs,
                registry=registry,
                allowed_references=allowed_references,
            )

        return self

    def _register_callbacks(self, listeners: List[object]):
        """Register the machine itself, its model and the given listeners as callback
        providers, then validate the resolved callbacks.

        Raises:
            InvalidDefinition: If checking the callbacks of a state or transition fails.
        """
        self._listeners.update({id(listener): listener for listener in listeners})
        self._add_listener(
            Listeners.from_listeners(
                (
                    Listener.from_obj(self, skip_attrs=self._protected_attrs),
                    # The state field holds the state value, so it is skipped on the model.
                    Listener.from_obj(self.model, skip_attrs={self.state_field}),
                    *(Listener.from_obj(listener) for listener in listeners),
                )
            )
        )

        check_callbacks = self._callbacks.check
        for visited in iterate_states_and_transitions(self.states):
            try:
                check_callbacks(visited._specs)
            except Exception as err:
                raise InvalidDefinition(
                    f"Error on {visited!s} when resolving callbacks: {err}"
                ) from err

        self._callbacks.async_or_sync()

    def add_listener(self, *listeners):
        """Add a listener.

        Listeners are a way to generically add behavior to a :ref:`StateMachine` without
        changing its internal implementation.

        Listeners added here are resolved with ``SPECS_SAFE`` as the allowed references.

        .. seealso::

            :ref:`listeners`.
        """
        self._listeners.update({id(listener): listener for listener in listeners})
        return self._add_listener(
            Listeners.from_listeners(Listener.from_obj(listener) for listener in listeners),
            allowed_references=SPECS_SAFE,
        )

    def _repr_html_(self):
        return f'<div class="statemachine">{self._repr_svg_()}</div>'

    def _repr_svg_(self):
        return self._graph().create_svg().decode()

    def _graph(self):
        # Imported lazily so the diagram module is only loaded when a graph is requested.
        from .contrib.diagram import DotGraphMachine

        return DotGraphMachine(self).get_graph()

    @property
    def configuration_values(self) -> OrderedSet[Any]:
        """The state configuration values is the set of currently active states' values
        (or ids if no custom value is defined)."""
        if isinstance(self.current_state_value, OrderedSet):
            return self.current_state_value
        return OrderedSet([self.current_state_value])

    @property
    def configuration(self) -> OrderedSet["State"]:
        """The set of currently active states."""
        current_value = self.current_state_value
        if current_value is None:
            return OrderedSet()

        # A single (non-set) value is treated as a configuration with one state.
        values = current_value if isinstance(current_value, MutableSet) else [current_value]
        return OrderedSet(
            [
                self.states_map[value].for_instance(
                    machine=self,
                    cache=self._states_for_instance,
                )
                for value in values
            ]
        )

    @configuration.setter
    def configuration(self, new_configuration: OrderedSet["State"]):
        size = len(new_configuration)
        if size == 0:
            self.current_state_value = None
        elif size == 1:
            # Read the only element without popping it, so the caller's set is not
            # emptied as a side effect of the assignment.
            self.current_state_value = next(iter(new_configuration)).value
        else:
            self.current_state_value = OrderedSet(s.value for s in new_configuration)

    @property
    def current_state_value(self):
        """Get/Set the current :ref:`state` value.

        This is a low level API, that can be used to assign any valid state value
        completely bypassing all the hooks and validations.
        """
        return getattr(self.model, self.state_field, None)

    @current_state_value.setter
    def current_state_value(self, value):
        # ``None`` (no active state) and sets of values are stored as-is; only a single
        # value is checked against the known states.
        if (
            value is not None
            and not isinstance(value, MutableSet)
            and value not in self.states_map
        ):
            raise InvalidStateValue(value)
        setattr(self.model, self.state_field, value)

    @property
    def current_state(self) -> "State | MutableSet[State]":
        """Get/Set the current :ref:`state`.

        This is a low level API, that can be used to assign any valid state
        completely bypassing all the hooks and validations.

        Deprecated in favor of :attr:`configuration`; reading it emits a
        ``DeprecationWarning``.

        Raises:
            InvalidStateValue: If the stored value (or one of the stored values) is not
                a known state, including when no state is set yet.
        """
        warnings.warn(
            """Property `current_state` is deprecated in favor of `configuration`.""",
            DeprecationWarning,
            stacklevel=2,
        )
        current_value = self.current_state_value

        try:
            # The ``configuration`` setter stores several active states as an
            # ``OrderedSet`` (a ``MutableSet``), so sets must be handled here as well
            # as plain lists.
            if isinstance(current_value, (list, MutableSet)):
                return OrderedSet(
                    [
                        self.states_map[value].for_instance(
                            machine=self,
                            cache=self._states_for_instance,
                        )
                        for value in current_value
                    ]
                )

            state: State = self.states_map[current_value].for_instance(
                machine=self,
                cache=self._states_for_instance,
            )
            return state
        except KeyError as err:
            if current_value is None:
                raise InvalidStateValue(
                    current_value,
                    _(
                        "There's no current state set. In async code, "
                        "did you activate the initial state? "
                        "(e.g., `await sm.activate_initial_state()`)"
                    ),
                ) from err
            raise InvalidStateValue(current_value) from err

    @current_state.setter
    def current_state(self, value):  # pragma: no cover
        self.current_state_value = value.value

    @property
    def events(self) -> "List[Event]":
        """List of the events declared on the class, looked up on this instance."""
        return [getattr(self, event) for event in self.__class__._events]

    @property
    def allowed_events(self) -> "List[Event]":
        """List of the current allowed events."""
        return [
            getattr(self, event)
            for state in self.configuration
            for event in state.transitions.unique_events
        ]

    def enabled_events(self, *args, **kwargs):
        """List of the current enabled events, considering guard conditions.

        An event is **enabled** if at least one of its transitions from the current
        state has all ``cond``/``unless`` guards satisfied.

        Args:
            *args: Positional arguments forwarded to condition callbacks.
            **kwargs: Keyword arguments forwarded to condition callbacks.

        Returns:
            A list of enabled :ref:`Event` instances.
        """
        result = self._engine.enabled_events(*args, **kwargs)
        if not isawaitable(result):
            return result
        return run_async_from_sync(result)

    def _put_nonblocking(self, trigger_data: TriggerData, internal: bool = False):
        """Put the trigger on the queue without blocking the caller."""
        self._engine.put(trigger_data, internal=internal)

    def send(
        self,
        event: str,
        *args,
        delay: float = 0,
        send_id: "str | None" = None,
        internal: bool = False,
        **kwargs,
    ):
        """Send an :ref:`Event` to the state machine.

        :param event: The trigger for the state machine, specified as an event id string.
        :param args: Additional positional arguments to pass to the event.
        :param delay: A time delay in milliseconds to process the event. Default is 0.
            When not given, the delay declared on the event (if any) is used.
        :param send_id: An identifier for the event, used with ``cancel_event()`` to cancel
            delayed events.
        :param internal: If ``True``, the event is sent to the internal event queue
            (see :meth:`raise_`). Default is ``False``.
        :param kwargs: Additional keyword arguments to pass to the event.

        .. seealso::

            See: :ref:`triggering events`.
        """
        known_event = getattr(self, event, None)
        event_name = known_event.name if known_event else event
        if not delay:
            # first the param, then the event, or 0
            delay = (known_event.delay if known_event else 0) or 0

        event_instance = BoundEvent(
            id=event, name=event_name, delay=delay, internal=internal, _sm=self
        )
        result = event_instance(*args, send_id=send_id, **kwargs)
        if not isawaitable(result):
            return result
        return run_async_from_sync(result)

    def raise_(self, event: str, *args, delay: float = 0, send_id: "str | None" = None, **kwargs):
        """Send an :ref:`Event` to the state machine in the internal event queue.

        Events on the internal queue are processed immediately within the current
        macrostep, before any pending external events. This is equivalent to calling
        ``send(..., internal=True)``.

        .. seealso::

            See: :ref:`triggering-events`.
        """
        return self.send(event, *args, delay=delay, send_id=send_id, internal=True, **kwargs)

    def cancel_event(self, send_id: str):
        """Cancel all the delayed events with the given ``send_id``."""
        self._engine.cancel_event(send_id)

    @property
    def is_terminated(self):
        """Whether the state machine has reached a final state.

        Returns ``True`` when a top-level final state has been entered and the
        engine is no longer running. This is the recommended way to check for
        completion -- it works for flat, compound, and parallel topologies.
        """
        return not self._engine.running
class StateMachine(StateChart):
    """A :class:`StateChart` with different defaults for its behavior flags.

    Each flag below is documented on :class:`StateChart`; this subclass only
    changes the default values.
    """

    # Exceptions raised by callbacks propagate instead of becoming ``error.execution``.
    error_on_execution: bool = False
    # Exit, transition and enter callbacks run around one atomic configuration update.
    atomic_configuration_update: bool = True
    # Self-transitions do not run the state entry/exit actions.
    enable_self_transition_entries: bool = False
    # An event that results in no transition raises ``TransitionNotAllowed``.
    allow_event_without_transition: bool = False