-
-
Notifications
You must be signed in to change notification settings - Fork 103
Expand file tree
/
Copy pathsync.py
More file actions
200 lines (171 loc) · 8.85 KB
/
sync.py
File metadata and controls
200 lines (171 loc) · 8.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
import logging
from time import sleep
from time import time
from typing import TYPE_CHECKING
from statemachine.event import BoundEvent
from statemachine.orderedset import OrderedSet
from ..event_data import TriggerData
from ..exceptions import InvalidDefinition
from ..exceptions import TransitionNotAllowed
from .base import BaseEngine
if TYPE_CHECKING:
from ..transition import Transition
logger = logging.getLogger(__name__)
class SyncEngine(BaseEngine):
    """Synchronous engine: drains the internal and external event queues in-line."""

    def _run_microstep(self, enabled_transitions, trigger_data):
        """Run a microstep for internal/eventless transitions with error handling.

        Note: microstep() handles its own errors internally, so this try/except
        is a safety net that is not expected to be reached in normal operation.

        :param enabled_transitions: iterable of transitions to execute; it is
            materialized into a list before being handed to ``microstep``.
        :param trigger_data: the trigger that enabled these transitions.
        :raises InvalidDefinition: always propagated untouched.
        """
        try:
            self.microstep(list(enabled_transitions), trigger_data)
        except InvalidDefinition:
            # Definition errors must never be routed through the error handler.
            raise
        except Exception as e:  # pragma: no cover
            self._handle_error(e, trigger_data)

    def start(self):
        """Activate the initial state unless the machine already has a state value."""
        if self.sm.current_state_value is not None:
            return

        self.activate_initial_state()

    def activate_initial_state(self):
        """
        Activate the initial state.

        Called automatically on state machine creation from sync code, but in
        async code, the user must call this method explicitly.

        Given how async works on python, there's no built-in way to activate the initial state that
        may depend on async code from the StateMachine.__init__ method.

        :returns: whatever :meth:`processing_loop` returns.
        """
        if self.sm.current_state_value is None:
            trigger_data = BoundEvent("__initial__", _sm=self.sm).build_trigger(machine=self.sm)
            transitions = self._initial_transitions(trigger_data)

            # Try to hold the processing flag while the initial states are entered.
            # The non-blocking acquire can fail when processing is already in
            # progress; in that case we must NOT release in ``finally``, otherwise
            # we would release an acquisition that belongs to someone else.
            # NOTE(review): assumes ``_processing`` follows ``threading.Lock``
            # semantics (``acquire`` returns a bool) -- confirm in BaseEngine.
            acquired = self._processing.acquire(blocking=False)
            try:
                self._enter_states(transitions, trigger_data, OrderedSet(), OrderedSet())
            finally:
                if acquired:
                    self._processing.release()
        return self.processing_loop()

    def processing_loop(self, caller_future=None):  # noqa: C901
        """Process event triggers.

        The event is put on a queue, and only the first event will have the result collected.

        .. note::
            While processing the queue items, if others events are generated, they
            will be processed sequentially (and not nested).

        :param caller_future: not used by this synchronous implementation.
        :returns: the result of the first external-event microstep executed by
            this call, or ``None`` if there was none or if another call is
            already running the loop.
        :raises TransitionNotAllowed: when an external event enables no
            transition and ``sm.allow_event_without_transition`` is falsy.
        :raises Exception: anything raised by the microstep of an external
            event is re-raised after ``self.clear()`` is called.
        """
        # We make sure that only the first event enters the processing critical section,
        # next events will only be put on the queue and processed by the same loop.
        if not self._processing.acquire(blocking=False):
            return None

        # We will collect the first result as the processing result to keep backwards compatibility
        # so we need to use a sentinel object instead of `None` because the first result may
        # be also `None`, and on this case the `first_result` may be overridden by another result.
        logger.debug("Processing loop started: %s", self.sm.current_state_value)
        first_result = self._sentinel
        try:
            took_events = True
            while took_events:
                self.clear_cache()
                took_events = False
                # Execute the triggers in the queue in FIFO order until the queue is empty
                # while self._running and not self.external_queue.is_empty():
                macrostep_done = False
                enabled_transitions: "OrderedSet[Transition] | None" = None

                # handles eventless transitions and internal events
                while not macrostep_done:
                    logger.debug("Macrostep: eventless/internal queue")

                    self.clear_cache()
                    internal_event = TriggerData(
                        self.sm, event=None
                    )  # this one is a "null object"
                    enabled_transitions = self.select_eventless_transitions(internal_event)
                    if not enabled_transitions:
                        if self.internal_queue.is_empty():
                            macrostep_done = True
                        else:
                            internal_event = self.internal_queue.pop()
                            enabled_transitions = self.select_transitions(internal_event)
                    if enabled_transitions:
                        logger.debug("Enabled transitions: %s", enabled_transitions)
                        took_events = True
                        self._run_microstep(enabled_transitions, internal_event)

                # TODO: Invoke platform-specific logic
                # for state in sorted(self.states_to_invoke, key=self.entry_order):
                #     for inv in sorted(state.invoke, key=self.document_order):
                #         self.invoke(inv)
                # self.states_to_invoke.clear()

                # Process remaining internal events before external events.
                # Note: the macrostep loop above already drains the internal queue,
                # so this is a safety net per SCXML spec for invoke-generated events.
                while not self.internal_queue.is_empty():  # pragma: no cover
                    internal_event = self.internal_queue.pop()
                    enabled_transitions = self.select_transitions(internal_event)
                    if enabled_transitions:
                        self._run_microstep(enabled_transitions, internal_event)

                # Process external events
                logger.debug("Macrostep: external queue")
                while not self.external_queue.is_empty():
                    self.clear_cache()
                    took_events = True
                    external_event = self.external_queue.pop()
                    current_time = time()
                    if external_event.execution_time > current_time:
                        # Not due yet: put it back on the queue and wait a bit.
                        self.put(external_event, _delayed=True)
                        sleep(self.sm._loop_sleep_in_ms)
                        # Break to Phase 1 so internal events and eventless
                        # transitions can be processed while we wait.
                        break

                    logger.debug("External event: %s", external_event.event)
                    # # TODO: Handle cancel event
                    # if self.is_cancel_event(external_event):
                    #     self.running = False
                    #     return

                    # TODO: Invoke states
                    # for state in self.configuration:
                    #     for inv in state.invoke:
                    #         if inv.invokeid == external_event.invokeid:
                    #             self.apply_finalize(inv, external_event)
                    #         if inv.autoforward:
                    #             self.send(inv.id, external_event)

                    enabled_transitions = self.select_transitions(external_event)
                    logger.debug("Enabled transitions: %s", enabled_transitions)
                    if enabled_transitions:
                        try:
                            result = self.microstep(list(enabled_transitions), external_event)
                            # Only the first collected result is reported to the caller.
                            if first_result is self._sentinel:
                                first_result = result
                        except Exception:
                            # We clear the queue as we don't have an expected behavior
                            # and cannot keep processing
                            self.clear()
                            raise
                    elif not self.sm.allow_event_without_transition:
                        raise TransitionNotAllowed(external_event.event, self.sm.configuration)
        finally:
            self._processing.release()

        return first_result if first_result is not self._sentinel else None

    def enabled_events(self, *args, **kwargs):
        """Return the machine's event attributes whose transition conditions pass.

        Walks every transition of every state in ``sm.configuration`` and
        evaluates the transition's conditions through ``sm._callbacks.all``.
        Each event name is reported at most once; the first transition whose
        conditions pass (or raise) decides it.

        :param args: extra positional arguments forwarded to the conditions.
        :param kwargs: extra keyword arguments forwarded to the conditions;
            ``machine``, ``model``, ``event``, ``source``, ``target``, ``state``
            and ``transition`` are overridden with the values of the current
            iteration.
        :returns: list of ``getattr(sm, event)`` values, in first-seen order.
        """
        sm = self.sm
        enabled = {}
        for state in sm.configuration:
            for transition in state.transitions:
                for event in transition.events:
                    if event in enabled:
                        continue
                    bound_event = getattr(sm, event)
                    extended_kwargs = {
                        **kwargs,
                        "machine": sm,
                        "model": sm.model,
                        "event": bound_event,
                        "source": transition.source,
                        "target": transition.target,
                        "state": state,
                        "transition": transition,
                    }
                    try:
                        if sm._callbacks.all(transition.cond.key, *args, **extended_kwargs):
                            enabled[event] = bound_event
                    except Exception:
                        # Deliberate best effort: a condition that fails to
                        # evaluate does not hide the event; it is reported
                        # as enabled.
                        enabled[event] = bound_event
        return list(enabled.values())