-
-
Notifications
You must be signed in to change notification settings - Fork 103
Expand file tree
/
Copy pathfactory.py
More file actions
368 lines (316 loc) · 13.6 KB
/
factory.py
File metadata and controls
368 lines (316 loc) · 13.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from . import registry
from .callbacks import CallbackGroup
from .callbacks import CallbackPriority
from .callbacks import CallbackSpecList
from .event import Event
from .exceptions import InvalidDefinition
from .graph import disconnected_states
from .graph import iterate_states
from .graph import iterate_states_and_transitions
from .graph import states_without_path_to_final_states
from .i18n import _
from .state import State
from .states import States
from .transition import Transition
from .transition_list import TransitionList
class StateMachineMetaclass(type):
"Metaclass for constructing StateMachine classes"
validate_disconnected_states: bool = True
"""If `True`, the state machine will validate that there are no unreachable states."""
validate_trap_states: bool = True
"""If ``True``, non-final states without outgoing transitions raise ``InvalidDefinition``."""
validate_final_reachability: bool = True
"""If ``True`` and final states exist, non-final states without a path to any final
state raise ``InvalidDefinition``."""
def __init__(
cls,
name: str,
bases: Tuple[type],
attrs: Dict[str, Any],
) -> None:
super().__init__(name, bases, attrs)
registry.register(cls)
cls.name = cls.__name__
cls.id = cls.name.lower()
# TODO: Experiment with the IDEA of a root state
# cls.root = State(id=cls.id, name=cls.name)
cls.states: States = States()
cls.states_map: Dict[Any, State] = {}
"""Map of ``state.value`` to the corresponding :ref:`state`."""
cls._abstract = True
cls._events: Dict[Event, None] = {} # used Dict to preserve order and avoid duplicates
cls._protected_attrs: set = set()
cls._events_to_update: Dict[Event, Optional[Event]] = {}
cls._specs = CallbackSpecList()
cls.prepare = cls._specs.grouper(CallbackGroup.PREPARE).add(
"prepare_event", priority=CallbackPriority.GENERIC, is_convention=True
)
cls.add_inherited(bases)
cls.add_from_attributes(attrs)
cls._collect_class_listeners(attrs, bases)
cls._unpack_builders_callbacks()
cls._update_event_references()
if not cls.states:
return
cls._initials_by_document_order(list(cls.states), parent=None)
initials = [s for s in cls.states if s.initial]
parallels = [s.id for s in cls.states if s.parallel]
root_only_has_parallels = len(cls.states) == len(parallels)
if len(initials) != 1 and not root_only_has_parallels:
raise InvalidDefinition(
_(
"There should be one and only one initial state. "
"Your currently have these: {0}"
).format(", ".join(s.id for s in initials))
)
if initials:
cls.initial_state = initials[0]
else: # pragma: no cover
cls.initial_state = None
cls.final_states: List[State] = [state for state in cls.states if state.final]
cls._check()
cls._setup()
def _initials_by_document_order( # noqa: C901
cls, states: List[State], parent: "State | None" = None, order: int = 1
):
"""Set initial state by document order if no explicit initial state is set"""
initials: List[State] = []
for s in states:
s.document_order = order
order += 1
if s.states:
cls._initials_by_document_order(s.states, s, order)
if s.initial:
initials.append(s)
if not initials and states:
initial = states[0]
initial._initial = True
initials.append(initial)
if not parent:
return
# If parent already has a multi-target initial transition (e.g., from SCXML initial
# attribute targeting multiple parallel regions), don't create default initial transitions.
if any(t for t in parent.transitions if t.initial and len(t.targets) > 1):
return
for initial in initials:
if not any(t for t in parent.transitions if t.initial and t.target == initial):
parent.to(initial, initial=True)
if not parent.parallel:
return
for state in states:
state._initial = True
if not any(t for t in parent.transitions if t.initial and t.target == state):
parent.to(state, initial=True) # pragma: no cover
def _unpack_builders_callbacks(cls):
callbacks = {}
for state in iterate_states(cls.states):
if state._callbacks:
callbacks.update(state._callbacks)
del state._callbacks
for key, value in callbacks.items():
setattr(cls, key, value)
def _check(cls):
has_states = bool(cls.states)
cls._abstract = not has_states
# do not validate the base abstract classes
if cls._abstract: # pragma: no cover
return
cls._check_initial_state()
cls._check_final_states()
cls._check_disconnected_state()
cls._check_trap_states()
cls._check_reachable_final_states()
def _check_initial_state(cls):
initials = [s for s in cls.states if s.initial]
if len(initials) != 1: # pragma: no cover
raise InvalidDefinition(
_(
"There should be one and only one initial state. "
"You currently have these: {!r}"
).format([s.id for s in initials])
)
# TODO: Check if this is still needed
# if not initials[0].transitions.transitions:
# raise InvalidDefinition(_("There are no transitions."))
def _check_final_states(cls):
final_state_with_invalid_transitions = [
state for state in cls.final_states if state.transitions
]
if final_state_with_invalid_transitions:
raise InvalidDefinition(
_("Cannot declare transitions from final state. Invalid state(s): {}").format(
[s.id for s in final_state_with_invalid_transitions]
)
)
def _check_trap_states(cls):
if not cls.validate_trap_states:
return
trap_states = [s for s in cls.states if not s.final and not s.transitions]
if trap_states:
raise InvalidDefinition(
_(
"All non-final states should have at least one outgoing transition. "
"These states have no outgoing transition: {!r}"
).format([s.id for s in trap_states])
)
def _check_reachable_final_states(cls):
if not cls.validate_final_reachability:
return
if not any(s.final for s in cls.states):
return # No need to check final reachability
disconnected_states = list(states_without_path_to_final_states(cls.states))
if disconnected_states:
raise InvalidDefinition(
_(
"All non-final states should have at least one path to a final state. "
"These states have no path to a final state: {!r}"
).format([s.id for s in disconnected_states])
)
def _check_disconnected_state(cls):
if not cls.validate_disconnected_states:
return
assert cls.initial_state
states = disconnected_states(cls.initial_state, set(cls.states_map.values()))
if states:
raise InvalidDefinition(
_(
"There are unreachable states. "
"The statemachine graph should have a single component. "
"Disconnected states: {}"
).format([s.id for s in states])
)
def _setup(cls):
for visited in iterate_states_and_transitions(cls.states):
visited._setup()
cls._protected_attrs = {
"_abstract",
"model",
"state_field",
"start_value",
"initial_state",
"final_states",
"states",
"_events",
"states_map",
"send",
} | {s.id for s in cls.states}
def _collect_class_listeners(cls, attrs: Dict[str, Any], bases: Tuple[type]):
"""Collect class-level listener declarations from attrs and MRO.
Listeners declared on parent classes are prepended (MRO order),
unless the child sets ``listeners_inherit = False``.
"""
class_listeners: List[Any] = []
if attrs.get("listeners_inherit", True):
for base in reversed(bases):
class_listeners.extend(getattr(base, "_class_listeners", []))
for entry in attrs.get("listeners", []):
if entry is None or isinstance(entry, (str, int, float, bool)):
raise InvalidDefinition(
_(
"Invalid entry in 'listeners': {!r}. "
"Expected a class, callable, or listener instance."
).format(entry)
)
class_listeners.append(entry)
cls._class_listeners: List[Any] = class_listeners
def add_inherited(cls, bases):
for base in bases:
for state in getattr(base, "states", []):
cls.add_state(state.id, state)
events = getattr(base, "_events", {})
for event in events:
cls.add_event(event=Event(id=event.id, name=event.name))
def add_from_attributes(cls, attrs): # noqa: C901
for key, value in attrs.items():
if isinstance(value, States):
cls._add_states_from_dict(value)
if isinstance(value, State):
cls.add_state(key, value)
elif isinstance(value, (Transition, TransitionList)):
event_id = key
if key.startswith("error_"):
event_id = f"{key} {key.replace('_', '.')}"
elif key.startswith("done_state_"):
suffix = key[len("done_state_") :]
event_id = f"{key} done.state.{suffix}"
cls.add_event(event=Event(transitions=value, id=event_id, name=key))
elif isinstance(value, (Event,)):
if value._has_real_id:
event_id = value.id
elif key.startswith("error_"):
event_id = f"{key} {key.replace('_', '.')}"
elif key.startswith("done_state_"):
suffix = key[len("done_state_") :]
event_id = f"{key} done.state.{suffix}"
else:
event_id = key
new_event = Event(
transitions=value._transitions,
id=event_id,
name=value.name,
)
cls.add_event(event=new_event, old_event=value)
# Ensure the event is accessible by the Python attribute name
if event_id != key:
setattr(cls, key, new_event)
elif getattr(value, "attr_name", None):
cls._add_unbounded_callback(key, value)
def _add_states_from_dict(cls, states):
for state_id, state in states.items():
cls.add_state(state_id, state)
def _add_unbounded_callback(cls, attr_name, func):
# if func is an event, the `attr_name` will be replaced by an event trigger,
# so we'll also give the ``func`` a new unique name to be used by the callback
# machinery that is stored at ``func.attr_name``
setattr(cls, func.attr_name, func)
if func.is_event:
cls.add_event(event=Event(func._transitions, id=attr_name, name=attr_name))
def add_state(cls, id, state: State):
state._set_id(id)
cls.states_map[state.value] = state
if not state.parent:
cls.states.append(state)
if not hasattr(cls, id):
setattr(cls, id, state)
# also register all events associated directly with transitions
for event in state.transitions.unique_events:
cls.add_event(event)
for substate in state.states:
cls.add_state(substate.id, substate)
def add_event(
cls,
event: Event,
old_event: "Event | None" = None,
):
if not event._has_real_id:
if event not in cls._events_to_update:
cls._events_to_update[event] = None
return
transitions = event._transitions
if transitions is not None:
transitions._on_event_defined(event=event, states=list(cls.states))
if event not in cls._events:
cls._events[event] = None
setattr(cls, event.id, event)
if old_event is not None:
cls._events_to_update[old_event] = event
return cls._events[event]
def _update_event_references(cls):
for old_event, new_event in cls._events_to_update.items():
for state in cls.states:
for transition in state.transitions:
if transition._events.match(old_event):
if new_event is None:
raise InvalidDefinition(
_("An event in the '{}' has no id.").format(transition)
)
transition.events._replace(old_event, new_event)
cls._events_to_update = {}
@property
def events(self):
return list(self._events)