forked from fgmacedo/python-statemachine
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
40 lines (33 loc) · 1.05 KB
/
utils.py
File metadata and controls
40 lines (33 loc) · 1.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import asyncio
import threading
from typing import Any
_cached_loop = threading.local()
"""Loop that will be used when the SM is running in a synchronous context. One loop per thread."""
def qualname(cls):
"""
Returns a fully qualified name of the class, to avoid name collisions.
"""
return ".".join([cls.__module__, cls.__name__])
def ensure_iterable(obj):
"""
Returns an iterator if obj is not an instance of string or if it
encounters type error, otherwise it returns a list.
"""
if isinstance(obj, str):
return [obj]
try:
return iter(obj)
except TypeError:
return [obj]
def run_async_from_sync(coroutine: Any) -> Any:
"""
Compatibility layer to run an async coroutine from a synchronous context.
"""
global _cached_loop
try:
asyncio.get_running_loop()
return coroutine
except RuntimeError:
if not hasattr(_cached_loop, "loop"):
_cached_loop.loop = asyncio.new_event_loop()
return _cached_loop.loop.run_until_complete(coroutine)