22
33import io .netty .loom .EventLoopScheduler .SharedRef ;
44
5- import java .util .concurrent .ConcurrentHashMap ;
6-
75/**
86 * Global Netty scheduler proxy for virtual threads.
97 *
@@ -36,8 +34,6 @@ public class NettyScheduler implements Thread.VirtualThreadScheduler {
3634
3735 private final Thread .VirtualThreadScheduler jdkBuildinScheduler ;
3836
39- private final ConcurrentHashMap <Thread , SharedRef > unstartedThreads = new ConcurrentHashMap <>();
40-
4137 private final boolean perCarrierPollers ;
4238
4339 private static NettyScheduler ensureInstalled () {
@@ -71,17 +67,15 @@ Thread.VirtualThreadScheduler jdkBuildinScheduler() {
7167
7268 @ Override
7369 public void onStart (Thread .VirtualThreadTask virtualThreadTask ) {
74- // TODO this is not great for 2 reasons:
75- // 1. we are doing a remove on a concurrent map even for v threads which are not
76- // really interesting to us
77- // 2. if a vThread will never start, it will leak here forever
78- // HINT: if we had a VirtualThreadTask::Of(VirtualThread) method, we could
79- // perform the assignment BEFORE calling this
80- // on the vThread factory
81- // or the vThreadFactory could provide in its build method something to access
82- // the VirtualThreadTask of an unstarted VirtualThread
83- var assignedSchedulerRef = unstartedThreads .remove (virtualThreadTask .thread ());
84- if (assignedSchedulerRef == null ) {
70+ if (virtualThreadTask .attachment () instanceof SharedRef ref ) {
71+ var eventLoop = ref .get ();
72+ if (eventLoop != null && eventLoop .execute (virtualThreadTask )) {
73+ return ;
74+ }
75+ // the v thread has been rejected by its assigned scheduler or its scheduler is
76+ // gone
77+ virtualThreadTask .attach (null );
78+ } else {
8579 if (perCarrierPollers ) {
8680 // Read-Poller threads should always inherit the event loop scheduler from the
8781 // caller thread
@@ -105,31 +99,16 @@ public void onStart(Thread.VirtualThreadTask virtualThreadTask) {
10599 }
106100 }
107101 }
108- } else {
109- var scheduler = assignedSchedulerRef .get ();
110- if (scheduler != null ) {
111- // attach the assigned scheduler to the task
112- virtualThreadTask .attach (assignedSchedulerRef );
113- if (scheduler .execute (virtualThreadTask )) {
114- return ;
115- }
116- }
117- // the v thread has been rejected by its assigned scheduler or its scheduler is
118- // gone
119- virtualThreadTask .attach (null );
120102 }
121103 jdkBuildinScheduler .onStart (virtualThreadTask );
122104 }
123105
124106 @ Override
125107 public void onContinue (Thread .VirtualThreadTask virtualThreadTask ) {
126- var attachment = virtualThreadTask .attachment ();
127- if (attachment instanceof SharedRef ref ) {
128- var assignedScheduler = ref .get ();
129- if (assignedScheduler != null ) {
130- if (assignedScheduler .execute (virtualThreadTask )) {
131- return ;
132- }
108+ if (virtualThreadTask .attachment () instanceof SharedRef ref ) {
109+ var eventLoop = ref .get ();
110+ if (eventLoop != null && eventLoop .execute (virtualThreadTask )) {
111+ return ;
133112 }
134113 // the v thread has been rejected by its assigned scheduler or its scheduler is
135114 // gone
@@ -138,11 +117,6 @@ public void onContinue(Thread.VirtualThreadTask virtualThreadTask) {
138117 jdkBuildinScheduler .onContinue (virtualThreadTask );
139118 }
140119
141- static Thread assignUnstarted (Thread unstarted , SharedRef ref ) {
142- ensureInstalled ().unstartedThreads .put (unstarted , ref );
143- return unstarted ;
144- }
145-
146120 public static boolean perCarrierPollers () {
147121 return ensureInstalled ().perCarrierPollers ;
148122 }
0 commit comments