33import java .util .concurrent .ThreadFactory ;
44import java .util .concurrent .TimeUnit ;
55import java .util .concurrent .atomic .AtomicBoolean ;
6- import java .util .concurrent .atomic .AtomicReference ;
76import java .util .concurrent .locks .LockSupport ;
87
98import io .netty .channel .IoEventLoopGroup ;
1312
1413public class EventLoopScheduler {
1514
15+ public static final class SchedulerRef {
16+
17+ private volatile EventLoopScheduler ref ;
18+
19+ private SchedulerRef (EventLoopScheduler ref ) {
20+ this .ref = ref ;
21+ }
22+
23+ public EventLoopScheduler get () {
24+ return ref ;
25+ }
26+ }
27+
1628 private static final long MAX_WAIT_TASKS_NS = TimeUnit .HOURS .toNanos (1 );
1729 // These are the soft-guaranteed yield times for the event loop whilst Thread.yield() is called.
1830 // Based on the status of the event loop (resuming from blocking or non-blocking, controlled by the running flag)
1931 // a different limit is applied.
2032 private static final long RUNNING_YIELD_US = TimeUnit .MICROSECONDS .toNanos (Integer .getInteger ("io.netty.loom.running.yield.us" , 1 ));
2133 private static final long IDLE_YIELD_US = TimeUnit .MICROSECONDS .toNanos (Integer .getInteger ("io.netty.loom.idle.yield.us" , 1 ));
2234 // This is required to allow sub-pollers to run on the correct scheduler
23- private static final ScopedValue <AtomicReference < EventLoopScheduler > > CURRENT_SCHEDULER = ScopedValue .newInstance ();
24- private static final AtomicReference < EventLoopScheduler > EMPTY_REFERENCE = new AtomicReference <>( );
35+ private static final ScopedValue <SchedulerRef > CURRENT_SCHEDULER = ScopedValue .newInstance ();
36+ private static final SchedulerRef EMPTY_REF = new SchedulerRef ( null );
2537 private final MpscUnboundedStream <Runnable > runQueue ;
2638 private final ManualIoEventLoop ioEventLoop ;
2739 private final Thread eventLoopThread ;
2840 private final Thread carrierThread ;
2941 private volatile Thread parkedCarrierThread ;
3042 private volatile Runnable eventLoopContinuatioToRun ;
3143 private final ThreadFactory vThreadFactory ;
32- private final AtomicBoolean running ;
33- private final AtomicReference < EventLoopScheduler > schedulerReference ;
44+ private final AtomicBoolean eventLoopIsRunning ;
45+ private final SchedulerRef publishedReference ;
3446
3547 public EventLoopScheduler (IoEventLoopGroup parent , ThreadFactory threadFactory , IoHandlerFactory ioHandlerFactory , int resumedContinuationsExpectedCount ) {
36- schedulerReference = new AtomicReference <> (this );
37- running = new AtomicBoolean (false );
48+ publishedReference = new SchedulerRef (this );
49+ eventLoopIsRunning = new AtomicBoolean (false );
3850 runQueue = new MpscUnboundedStream <>(resumedContinuationsExpectedCount );
3951 carrierThread = threadFactory .newThread (this ::virtualThreadSchedulerLoop );
4052 var rawVTFactory = Thread .ofVirtual ().factory ();
4153 vThreadFactory = runnable ->
4254 NettyScheduler .assignUnstarted (rawVTFactory .newThread (
43- () -> ScopedValue .where (CURRENT_SCHEDULER , schedulerReference ).run (runnable )), schedulerReference );
55+ () -> ScopedValue .where (CURRENT_SCHEDULER , publishedReference ).run (runnable )), publishedReference );
4456 eventLoopThread = vThreadFactory .newThread (() -> FastThreadLocalThread .runWithFastThreadLocal (this ::nettyEventLoop ));
4557 ioEventLoop = new ManualIoEventLoop (parent , eventLoopThread ,
46- ioExecutor -> new AwakeAwareIoHandler (running , ioHandlerFactory .newHandler (ioExecutor )));
58+ ioExecutor -> new AwakeAwareIoHandler (eventLoopIsRunning , ioHandlerFactory .newHandler (ioExecutor )));
4759 carrierThread .start ();
4860 }
4961
@@ -64,7 +76,7 @@ public ManualIoEventLoop ioEventLoop() {
6476 }
6577
6678 private void nettyEventLoop () {
67- running .set (true );
79+ eventLoopIsRunning .set (true );
6880 assert ioEventLoop .inEventLoop (Thread .currentThread ()) && Thread .currentThread ().isVirtual ();
6981 boolean canBlock = false ;
7082 while (!ioEventLoop .isShuttingDown ()) {
@@ -84,16 +96,16 @@ private void nettyEventLoop() {
8496 private boolean runIO (boolean canBlock ) {
8597 if (canBlock ) {
8698 // try to go to sleep waiting for I/O tasks
87- running .set (false );
99+ eventLoopIsRunning .set (false );
88100 // StoreLoad barrier: see https://www.scylladb.com/2018/02/15/memory-barriers-seastar-linux/
89101 if (canBlock ()) {
90102 try {
91103 return ioEventLoop .run (MAX_WAIT_TASKS_NS , RUNNING_YIELD_US ) == 0 ;
92104 } finally {
93- running .set (true );
105+ eventLoopIsRunning .set (true );
94106 }
95107 } else {
96- running .set (true );
108+ eventLoopIsRunning .set (true );
97109 }
98110 }
99111 return ioEventLoop .runNow (RUNNING_YIELD_US ) == 0 ;
@@ -118,7 +130,7 @@ private void virtualThreadSchedulerLoop() {
118130 // we keep on running until the event loop is shutting-down
119131 while (!eventLoop .isTerminated ()) {
120132 // if the event loop was idle, we apply a different limit to the yield time
121- final boolean eventLoopRunning = running .get ();
133+ final boolean eventLoopRunning = eventLoopIsRunning .get ();
122134 final long yieldDurationNs = eventLoopRunning ? RUNNING_YIELD_US : IDLE_YIELD_US ;
123135 int count = runExternalContinuations (yieldDurationNs );
124136 if (!runEventLoopContinuation () && count == 0 ) {
@@ -136,7 +148,8 @@ private void virtualThreadSchedulerLoop() {
136148 runExternalContinuations (RUNNING_YIELD_US );
137149 runEventLoopContinuation ();
138150 }
139- schedulerReference .set (null );
151+ // the event loop should be fully terminated
152+ publishedReference .ref = null ;
140153 runQueue .close ();
141154 // StoreLoad barrier
142155 while (!runQueue .isEmpty ()) {
@@ -187,20 +200,25 @@ public boolean execute(Thread.VirtualThreadTask task) {
187200 }
188201 }
189202 if (!ioEventLoop .inEventLoop (Thread .currentThread ())) {
190- // TODO: if we have access to the scheduler brought by the continuation,
191- // we could skip the wakeup if matches with this.
192- ioEventLoop .wakeup ();
193- LockSupport .unpark (parkedCarrierThread );
194- } else if (!isEventLoopContinuation && !running .get ()) {
203+ // this is checking for "local" submissions: it assumes that
204+ // currentThreadEventLoopScheduler() is the currently assigned one, and up to date
205+ // WARNING!!!!!
206+ // work-stealing could break this assumption if we don't update the CURRENT_SCHEDULER scoped value accordingly
207+ if (currentThreadEventLoopScheduler () != publishedReference ) {
208+ ioEventLoop .wakeup ();
209+ LockSupport .unpark (parkedCarrierThread );
210+ }
211+ } else if (!isEventLoopContinuation && !eventLoopIsRunning .get ()) {
212+ // the event loop thread is allowed to give up cycles to consume external continuations
213+ // whilst is just woken up for I/O
195214 assert eventLoopContinuatioToRun == null ;
196- // since the event loop was blocked, it's fine if we try to consume some continuations, if any
197215 Thread .yield ();
198216 }
199217 return true ;
200218 }
201219
202- public static AtomicReference < EventLoopScheduler > currentRef () {
203- var ref = CURRENT_SCHEDULER .orElse (EMPTY_REFERENCE );
204- return ref == EMPTY_REFERENCE ? null : ref ;
220+ public static SchedulerRef currentThreadEventLoopScheduler () {
221+ var ref = CURRENT_SCHEDULER .orElse (EMPTY_REF );
222+ return ref == EMPTY_REF ? null : ref ;
205223 }
206224}
0 commit comments