1+ <?php
2+
3+ namespace Voryx \React \EventLoop ;
4+
5+ use Interop \Async \Loop ;
6+ use React \EventLoop \LoopInterface ;
7+ use React \EventLoop \Timer \TimerInterface ;
8+
9+ class ReactAsyncInteropLoop implements LoopInterface
10+ {
11+ private $ readStreams = [];
12+ private $ writeStreams = [];
13+
14+ public function addReadStream ($ stream , callable $ listener )
15+ {
16+ $ key = (int )$ stream ;
17+ if (isset ($ this ->readStreams [$ key ])) {
18+ throw new \Exception ('key set twice ' );
19+ }
20+ $ this ->readStreams [$ key ] = Loop::get ()->onReadable ($ stream , function () use ($ listener , $ stream ) {
21+ $ listener ($ stream );
22+ });
23+ }
24+
25+ public function addWriteStream ($ stream , callable $ listener )
26+ {
27+ $ key = (int )$ stream ;
28+
29+ if (isset ($ this ->writeStreams [$ key ])) {
30+ throw new \Exception ('key set twice ' );
31+ }
32+
33+ $ this ->writeStreams [$ key ] = Loop::get ()->onWritable ($ stream , function () use ($ listener , $ stream ) {
34+ $ listener ($ stream );
35+ });
36+ }
37+
38+ public function removeReadStream ($ stream )
39+ {
40+ $ key = (int )$ stream ;
41+ if (isset ($ this ->readStreams [$ key ])) {
42+ Loop::get ()->cancel ($ this ->readStreams [$ key ]);
43+ unset($ this ->readStreams [$ key ]);
44+ }
45+ }
46+
47+ public function removeWriteStream ($ stream )
48+ {
49+ $ key = (int )$ stream ;
50+ if (isset ($ this ->writeStreams [$ key ])) {
51+ Loop::get ()->cancel ($ this ->writeStreams [$ key ]);
52+ unset($ this ->writeStreams [$ key ]);
53+ }
54+ }
55+
56+ public function removeStream ($ stream )
57+ {
58+ $ this ->removeReadStream ($ stream );
59+ $ this ->removeWriteStream ($ stream );
60+ }
61+
62+ private function addWrappedTimer ($ interval , callable $ callback , $ isPeriodic = false )
63+ {
64+ $ wrappedCallback = function () use (&$ timer , $ callback ) {
65+ $ callback ($ timer );
66+ };
67+ $ millis = $ interval * 1000 ;
68+ if ($ isPeriodic ) {
69+ $ timerKey = Loop::get ()->repeat ($ millis , $ wrappedCallback );
70+ } else {
71+ $ timerKey = Loop::get ()->delay ($ millis , $ wrappedCallback );
72+ }
73+ $ timer = new ReactAsyncInteropTimer (
74+ $ timerKey ,
75+ $ interval ,
76+ $ callback ,
77+ $ this ,
78+ false
79+ );
80+ return $ timer ;
81+ }
82+
83+ public function addTimer ($ interval , callable $ callback )
84+ {
85+ return $ this ->addWrappedTimer ($ interval , $ callback );
86+ }
87+
88+ public function addPeriodicTimer ($ interval , callable $ callback )
89+ {
90+ return $ this ->addWrappedTimer ($ interval , $ callback , true );
91+ }
92+
93+ public function cancelTimer (TimerInterface $ timer )
94+ {
95+ $ timer ->cancel ();
96+ }
97+
98+ public function isTimerActive (TimerInterface $ timer )
99+ {
100+ return $ timer ->isActive ();
101+ }
102+
103+ public function nextTick (callable $ listener )
104+ {
105+ Loop::get ()->defer (function () use ($ listener ) {
106+ $ listener ($ this );
107+ });
108+ }
109+
110+ public function futureTick (callable $ listener )
111+ {
112+ $ this ->nextTick ($ listener );
113+ }
114+
115+ public function tick ()
116+ {
117+ $ loop = Loop::get ();
118+
119+ $ loop ->defer (function () use ($ loop ) {
120+ $ loop ->stop ();
121+ });
122+
123+ $ loop ->run ();
124+ }
125+
126+ public function run ()
127+ {
128+ Loop::get ()->run ();
129+ }
130+
131+ public function stop ()
132+ {
133+ Loop::get ()->stop ();
134+ }
135+ }
0 commit comments