22from collections import deque
33from copy import deepcopy
44from functools import partial
5+ from threading import Lock
56from typing import TYPE_CHECKING
67from typing import Any
78from typing import Dict
@@ -81,7 +82,7 @@ def __init__(
8182 self .allow_event_without_transition = allow_event_without_transition
8283 self .__initialized = False
8384 self .__rtc = rtc
84- self .__processing : bool = False
85+ self .__processing = Lock ()
8586 self ._external_queue : deque = deque ()
8687 self ._callbacks_registry = CallbacksRegistry ()
8788 self ._states_for_instance : Dict [State , State ] = {}
@@ -117,12 +118,17 @@ def __repr__(self):
117118
118119 def __deepcopy__ (self , memo ):
119120 deepcopy_method = self .__deepcopy__
120- self .__deepcopy__ = None
121- try :
122- cp = deepcopy (self , memo )
123- finally :
124- self .__deepcopy__ = deepcopy_method
125- cp .__deepcopy__ = deepcopy_method
121+ lock = self .__processing
122+ with lock :
123+ self .__deepcopy__ = None
124+ self .__processing = None
125+ try :
126+ cp = deepcopy (self , memo )
127+ cp .__processing = Lock ()
128+ finally :
129+ self .__deepcopy__ = deepcopy_method
130+ cp .__deepcopy__ = deepcopy_method
131+ self .__processing = lock
126132 cp ._callbacks_registry .clear ()
127133 cp ._register_callbacks ([])
128134 cp .add_listener (* cp ._listeners .keys ())
@@ -312,7 +318,11 @@ async def _trigger(self, trigger_data: TriggerData):
312318
313319 return event_data .result if event_data else None
314320
315- async def _process (self , trigger_data : TriggerData ):
321+ def _put_nonblocking (self , trigger_data : TriggerData ):
322+ """Put the trigger on the queue without blocking the caller."""
323+ self ._external_queue .append (trigger_data )
324+
325+ async def _processing_loop (self ):
316326 """Process event triggers.
317327
318328 The simplest implementation is the non-RTC (synchronous),
@@ -331,31 +341,24 @@ async def _process(self, trigger_data: TriggerData):
331341 will be processed sequentially (and not nested).
332342
333343 """
344+
334345 if not self .__rtc :
335346 # The machine is in "synchronous" mode
347+ trigger_data = self ._external_queue .popleft ()
336348 return await self ._trigger (trigger_data )
337349
338- # The machine is in "queued" mode
339- # Add the trigger to queue and start processing in a loop.
340- self ._external_queue .append (trigger_data )
341-
342350 # We make sure that only the first event enters the processing critical section,
343351 # next events will only be put on the queue and processed by the same loop.
344- if self .__processing :
345- return
346-
347- return await self ._processing_loop ()
348-
349- async def _processing_loop (self ):
350- """Execute the triggers in the queue in order until the queue is empty"""
351- self .__processing = True
352+ if not self .__processing .acquire (blocking = False ):
353+ return None
352354
353355 # We will collect the first result as the processing result to keep backwards compatibility
354356 # so we need to use a sentinel object instead of `None` because the first result may
355357 # be also `None`, and on this case the `first_result` may be overridden by another result.
356358 sentinel = object ()
357359 first_result = sentinel
358360 try :
361+ # Execute the triggers in the queue in FIFO order until the queue is empty
359362 while self ._external_queue :
360363 trigger_data = self ._external_queue .popleft ()
361364 try :
@@ -368,7 +371,7 @@ async def _processing_loop(self):
368371 self ._external_queue .clear ()
369372 raise
370373 finally :
371- self .__processing = False
374+ self .__processing . release ()
372375 return first_result if first_result is not sentinel else None
373376
374377 async def _activate (self , event_data : EventData ):
@@ -412,7 +415,7 @@ def send(self, event: str, *args, **kwargs):
412415 coro = self .async_send (event , * args , ** kwargs )
413416 return run_async_from_sync (coro )
414417
415- async def async_send (self , event : str , * args , ** kwargs ):
418+ def async_send (self , event : str , * args , ** kwargs ):
416419 """Send an :ref:`Event` to the state machine.
417420
418421 .. seealso::
@@ -421,7 +424,7 @@ async def async_send(self, event: str, *args, **kwargs):
421424
422425 """
423426 event_instance : Event = Event (event )
424- return await event_instance .trigger (self , * args , ** kwargs )
427+ return event_instance .trigger (self , * args , ** kwargs )
425428
426429 def _get_callbacks (self , key ) -> CallbacksExecutor :
427430 return self ._callbacks_registry [key ]
0 commit comments