-
-
Notifications
You must be signed in to change notification settings - Fork 103
Expand file tree
/
Copy pathair_conditioner_machine.py
More file actions
69 lines (48 loc) · 1.83 KB
/
air_conditioner_machine.py
File metadata and controls
69 lines (48 loc) · 1.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
"""
Air Conditioner machine
=======================
A StateMachine that exercises reading from a stream of events.
"""
import random
from statemachine.utils import run_async_from_sync
from statemachine import State
from statemachine import StateMachine
def sensor_temperature_reader(seed: int, lower: int = 15, higher: int = 35):
    """Infinitely generate pseudo-random temperature readings.

    Args:
        seed: Seed of the pseudo-random sequence. The same seed always
            produces the same sequence of readings.
        lower: Smallest reading that can be produced (inclusive).
        higher: Largest reading that can be produced (inclusive).

    Yields:
        An integer ``t`` such that ``lower <= t <= higher``.
    """
    # Use a private generator rather than ``random.seed``: seeding the shared
    # module-level RNG would reset global random state for the whole process
    # and let any other consumer of ``random`` (including a second reader)
    # perturb this sequence between two readings.
    rng = random.Random(seed)
    while True:
        yield rng.randint(lower, higher)
class AirConditioner(StateMachine):
    """Air conditioner driven by a stream of temperature readings.

    The machine starts in ``off``. Every ``sensor_updated`` event is sent with
    a ``temperature`` keyword argument, which the guard conditions inspect:

    * ``off`` -> ``cooling`` when the reading is hot (above 25);
    * ``cooling`` -> ``standby`` when the reading is good (below 20);
    * ``standby`` -> ``cooling`` when the reading is hot;
    * ``standby`` -> ``off`` when the reading is cool (below 18);
    * every state additionally declares an unconditional internal
      self-transition, listed after the guarded ones.

    NOTE(review): the self-transitions presumably act as a fallback for
    readings that satisfy none of the guards -- confirm the evaluation order
    against the python-statemachine documentation before reordering anything.
    """

    off = State(initial=True)
    cooling = State()
    standby = State()

    # Declaration order is kept deliberately: guarded transitions first,
    # unconditional internal self-transitions last.
    sensor_updated = (
        off.to(cooling, cond="is_hot")
        | cooling.to(standby, cond="is_good")
        | standby.to(cooling, cond="is_hot")
        | standby.to(off, cond="is_cool")
        | off.to.itself(internal=True)
        | cooling.to.itself(internal=True)
        | standby.to.itself(internal=True)
    )

    async def is_hot(self, temperature: int):
        """Return whether ``temperature`` is above 25."""
        return temperature > 25

    async def is_good(self, temperature: int):
        """Return whether ``temperature`` is below 20."""
        return temperature < 20

    async def is_cool(self, temperature: int):
        """Return whether ``temperature`` is below 18."""
        return temperature < 18

    async def after_transition(self, event: str, source: State, target: State, event_data):
        """Print the event, its source and target states, and the kwargs it was sent with."""
        message = f"Running {event} from {source!s} to {target!s}: {event_data.trigger_data.kwargs!r}"
        print(message)
# %%
# Testing
async def main():
    """Send 20 seeded sensor readings to a freshly created ``AirConditioner``."""
    readings = sensor_temperature_reader(123456)
    print("Will create AirConditioner machine")
    machine = AirConditioner()
    for _ in range(20):
        # Each reading is pulled from the sensor immediately before it is sent.
        await machine.send("sensor_updated", temperature=next(readings))
if __name__ == "__main__":
    # `run_async_from_sync` is used here for better integration with an
    # already running event loop.
    # In real-life code you should use `asyncio.run(main())` instead.
    run_async_from_sync(main())