11package io .netty .loom ;
22
3- import java .util .concurrent .CountDownLatch ;
4- import java .util .concurrent .Executor ;
5- import java .util .concurrent .RejectedExecutionException ;
3+ import java .util .Queue ;
64import java .util .concurrent .ThreadFactory ;
75import java .util .concurrent .TimeUnit ;
86import java .util .concurrent .atomic .AtomicBoolean ;
7+ import java .util .concurrent .atomic .AtomicReference ;
98import java .util .concurrent .locks .LockSupport ;
109
1110import io .netty .channel .IoEventLoopGroup ;
1413import io .netty .util .concurrent .FastThreadLocalThread ;
1514import io .netty .util .internal .shaded .org .jctools .queues .MpscUnboundedArrayQueue ;
1615
17- public class VirtualThreadNettyScheduler implements Thread . VirtualThreadScheduler {
16+ public class VirtualThreadNettyScheduler {
1817
1918 private static final long MAX_WAIT_TASKS_NS = TimeUnit .HOURS .toNanos (1 );
2019 // These are the soft-guaranteed yield times for the event loop whilst Thread.yield() is called.
2120 // Based on the status of the event loop (resuming from blocking or non-blocking, controlled by the running flag)
2221 // a different limit is applied.
2322 private static final long RUNNING_YIELD_US = TimeUnit .MICROSECONDS .toNanos (Integer .getInteger ("io.netty.loom.running.yield.us" , 1 ));
2423 private static final long IDLE_YIELD_US = TimeUnit .MICROSECONDS .toNanos (Integer .getInteger ("io.netty.loom.idle.yield.us" , 1 ));
25- private static final ScopedValue <Thread .VirtualThreadScheduler > CURRENT_SCHEDULER = ScopedValue .newInstance ();
26- private static final Thread .VirtualThreadScheduler EMPTY = new Thread .VirtualThreadScheduler () {
27- @ Override
28- public void execute (Thread vthread , Runnable command ) {
29- throw new RejectedExecutionException ("VirtualThreadScheduler is empty" );
30- }
31- };
32-
33- private final MpscUnboundedArrayQueue <Runnable > externalContinuations ;
24+ // This is required to allow sub-pollers to run on the correct scheduler
25+ private static final ScopedValue <AtomicReference <VirtualThreadNettyScheduler >> CURRENT_SCHEDULER = ScopedValue .newInstance ();
26+ private volatile MpscUnboundedArrayQueue <Runnable > submissionQueue ;
27+ private final MpscUnboundedArrayQueue <Runnable > runQueue ;
3428 private final ManualIoEventLoop ioEventLoop ;
3529 private final Thread eventLoopThread ;
3630 private final Thread carrierThread ;
3731 private volatile Thread parkedCarrierThread ;
38- private volatile Runnable eventLoopContinuation ;
39- private volatile boolean submittedEventLoopContinuation ;
40- private final CountDownLatch eventLoopContinuationAvailable ;
32+ private volatile Runnable eventLoopContinuatioToRun ;
4133 private final ThreadFactory vThreadFactory ;
4234 private final AtomicBoolean running ;
35+ private final AtomicReference <VirtualThreadNettyScheduler > schedulerReference ;
4336
4437 public VirtualThreadNettyScheduler (IoEventLoopGroup parent , ThreadFactory threadFactory , IoHandlerFactory ioHandlerFactory , int resumedContinuationsExpectedCount ) {
45- this .running = new AtomicBoolean (false );
46- this .externalContinuations = new MpscUnboundedArrayQueue <>(resumedContinuationsExpectedCount );
47- this .carrierThread = threadFactory .newThread (this ::virtualThreadSchedulerLoop );
48- var builder = Thread .ofVirtual ().scheduler (this );
49- ThreadFactory rawVTFactory = builder .factory ();
50- this .vThreadFactory = (r ) -> rawVTFactory .newThread (() -> ScopedValue .where (CURRENT_SCHEDULER , this ).run (r ));
51- this .eventLoopThread = vThreadFactory .newThread (
52- // this is enabling the adaptive allocator to use unshared magazines
53- () -> FastThreadLocalThread .runWithFastThreadLocal (this ::nettyEventLoop ));
54- this .ioEventLoop = new ManualIoEventLoop (parent , eventLoopThread ,
38+ schedulerReference = new AtomicReference <>(this );
39+ running = new AtomicBoolean (false );
40+ runQueue = new MpscUnboundedArrayQueue <>(resumedContinuationsExpectedCount );
41+ submissionQueue = runQueue ;
42+ carrierThread = threadFactory .newThread (this ::virtualThreadSchedulerLoop );
43+ var rawVTFactory = Thread .ofVirtual ().factory ();
44+ vThreadFactory = runnable ->
45+ GlobalDelegateThreadNettyScheduler .assignUnstarted (rawVTFactory .newThread (
46+ () -> ScopedValue .where (CURRENT_SCHEDULER , schedulerReference ).run (runnable )), schedulerReference );
47+ eventLoopThread = vThreadFactory .newThread (() -> FastThreadLocalThread .runWithFastThreadLocal (this ::nettyEventLoop ));
48+ ioEventLoop = new ManualIoEventLoop (parent , eventLoopThread ,
5549 ioExecutor -> new AwakeAwareIoHandler (running , ioHandlerFactory .newHandler (ioExecutor )));
56- // we can start the carrier only after all the fields are initialized
57- eventLoopContinuationAvailable = new CountDownLatch (1 );
5850 carrierThread .start ();
59- // TODO we cannot make the virtual thread factory available until the event loop v thread is started:
60- // we can save this if we can query the virtual thread associated with a continuation
61- try {
62- eventLoopContinuationAvailable .await ();
63- } catch (InterruptedException e ) {
64- throw new RuntimeException (e );
65- }
6651 }
6752
6853 int externalContinuationsCount () {
69- return externalContinuations .size ();
54+ return runQueue .size ();
7055 }
7156
7257 public ThreadFactory virtualThreadFactory () {
@@ -111,29 +96,35 @@ private boolean runIO(boolean canBlock) {
11196 running .set (true );
11297 }
11398 } else {
114- running .set (false );
99+ running .set (true );
115100 }
116101 }
117102 return ioEventLoop .runNow (RUNNING_YIELD_US ) == 0 ;
118103 }
119104
105+ private boolean runEventLoopContinuation () {
106+ assert Thread .currentThread () == carrierThread ;
107+ var eventLoopContinuation = this .eventLoopContinuatioToRun ;
108+ if (eventLoopContinuation != null ) {
109+ this .eventLoopContinuatioToRun = null ;
110+ eventLoopContinuation .run ();
111+ return true ;
112+ }
113+ return false ;
114+ }
115+
120116 private void virtualThreadSchedulerLoop () {
121117 // start the event loop thread
122118 var eventLoop = this .ioEventLoop ;
123119 eventLoopThread .start ();
124- // we expect here the continuation to be set up already
125- var eventLoopContinuation = this .eventLoopContinuation ;
126- assert eventLoopContinuation != null && eventLoopContinuationAvailable .getCount () == 0 ;
120+ assert eventLoopContinuatioToRun != null ;
127121 // we keep on running until the event loop is shutting-down
128122 while (!eventLoop .isTerminated ()) {
129123 // if the event loop was idle, we apply a different limit to the yield time
130124 final boolean eventLoopRunning = running .get ();
131125 final long yieldDurationNs = eventLoopRunning ? RUNNING_YIELD_US : IDLE_YIELD_US ;
132126 int count = runExternalContinuations (yieldDurationNs );
133- if (submittedEventLoopContinuation ) {
134- submittedEventLoopContinuation = false ;
135- eventLoopContinuation .run ();
136- } else if (count == 0 ) {
127+ if (!runEventLoopContinuation () && count == 0 ) {
137128 // nothing to run, including the event loop: we can park
138129 parkedCarrierThread = carrierThread ;
139130 if (canBlock ()) {
@@ -142,24 +133,27 @@ private void virtualThreadSchedulerLoop() {
142133 parkedCarrierThread = null ;
143134 }
144135 }
145- while (!canBlock ()) {
146- // we still have continuations to run, let's run them
136+ // make sure the event loop thread is fully terminated
137+ // TODO verify that isAlive works!
138+ while (eventLoopThread .isAlive ()) {
147139 runExternalContinuations (RUNNING_YIELD_US );
148- if (submittedEventLoopContinuation ) {
149- submittedEventLoopContinuation = false ;
150- eventLoopContinuation .run ();
151- }
140+ runEventLoopContinuation ();
141+ }
142+ schedulerReference .set (null );
143+ submissionQueue = null ;
144+ // StoreLoad barrier
145+ while (!runQueue .isEmpty ()) {
146+ runExternalContinuations (IDLE_YIELD_US );
152147 }
153148 }
154149
155150 private boolean canBlock () {
156- return externalContinuations .isEmpty () && ! submittedEventLoopContinuation ;
151+ return runQueue .isEmpty () && eventLoopContinuatioToRun == null ;
157152 }
158153
159154 private int runExternalContinuations (long deadlineNs ) {
160- assert eventLoopContinuation != null ;
161155 final long startDrainingNs = System .nanoTime ();
162- var ready = this .externalContinuations ;
156+ var ready = this .runQueue ;
163157 int runContinuations = 0 ;
164158 for (; ; ) {
165159 var continuation = ready .poll ();
@@ -176,51 +170,48 @@ private int runExternalContinuations(long deadlineNs) {
176170 return runContinuations ;
177171 }
178172
179- @ Override
180- public void execute (Thread vthread , Runnable command ) {
181- // The default scheduler won't shut down, but Netty's event loop can!
182- Runnable eventLoopContinuation = this .eventLoopContinuation ;
183- if (eventLoopContinuation == null ) {
184- eventLoopContinuation = setEventLoopContinuation (command );
173+ private boolean eventLoopContinuation (Thread .VirtualThreadTask task ) {
174+ if (eventLoopContinuatioToRun != null ) {
175+ assert task .thread () != eventLoopThread ;
176+ return false ;
185177 }
186- if (eventLoopContinuation == command ) {
187- submittedEventLoopContinuation = true ;
188- } else {
189- if (ioEventLoop .isTerminated ()) {
190- throw new RejectedExecutionException ("event loop is shutting down" );
191- }
192- externalContinuations .offer (command );
193- }
194- if (!inEventLoop (Thread .currentThread ())) {
195- // TODO: if we have access to the scheduler brought by the continuation,
196- // we could skip the wakeup if matches with this.
197- ioEventLoop .wakeup ();
198- LockSupport .unpark (parkedCarrierThread );
199- } else if (eventLoopContinuation != command && !running .get ()) {
200- // since the event loop was blocked, it's fine if we try to consume some continuations, if any
201- Thread .yield ();
178+ if (task .thread () == eventLoopThread ) {
179+ eventLoopContinuatioToRun = task ;
180+ return true ;
202181 }
182+ return false ;
203183 }
204184
205- private boolean inEventLoop (Thread thread ) {
206- if (!thread .isVirtual ()) {
207- return ioEventLoop .inEventLoop (thread );
208- }
209-
210- return CURRENT_SCHEDULER .orElse (EMPTY ) == this ;
211- }
212-
213- private Runnable setEventLoopContinuation (Runnable command ) {
214- // this is the first command, we need to set the continuation
215- this .eventLoopContinuation = command ;
216- // we need to notify the event loop that we have a continuation
217- eventLoopContinuationAvailable .countDown ();
218- return command ;
219- }
220-
221- public static VirtualThreadNettyScheduler current () {
222- Thread .VirtualThreadScheduler virtualThreadScheduler = CURRENT_SCHEDULER .orElse (EMPTY );
223- return virtualThreadScheduler == EMPTY ? null : (VirtualThreadNettyScheduler ) virtualThreadScheduler ;
185+ public boolean execute (Thread .VirtualThreadTask task ) {
186+ boolean isEventLoopContinuation = eventLoopContinuation (task );
187+ if (!isEventLoopContinuation ) {
188+ var q = submissionQueue ;
189+ if (q == null ) {
190+ return false ;
191+ }
192+ q .offer (task );
193+ // double-check in case we are shutting down
194+ if (submissionQueue == null ) {
195+ // TODO
196+ // What if:
197+ // 1) we added to the queue
198+ // 2) we got paused for long time
199+ // 3) the scheduler found the task and run it
200+ // 4) we are resuming here and return false
201+ // This is not ok, need to be fixed!
202+ return false ;
203+ }
204+ }
205+ if (!ioEventLoop .inEventLoop (Thread .currentThread ())) {
206+ // TODO: if we have access to the scheduler brought by the continuation,
207+ // we could skip the wakeup if matches with this.
208+ ioEventLoop .wakeup ();
209+ LockSupport .unpark (parkedCarrierThread );
210+ } else if (!isEventLoopContinuation && !running .get ()) {
211+ assert eventLoopContinuatioToRun == null ;
212+ // since the event loop was blocked, it's fine if we try to consume some continuations, if any
213+ Thread .yield ();
214+ }
215+ return true ;
224216 }
225-
226217}
0 commit comments