11package io .netty .loom ;
22
3+ import java .util .ArrayList ;
34import java .util .IdentityHashMap ;
45import java .util .concurrent .Executor ;
56import java .util .concurrent .ThreadFactory ;
7+ import java .util .concurrent .atomic .AtomicLong ;
68
79import io .netty .channel .IoEventLoop ;
810import io .netty .channel .IoHandlerFactory ;
911import io .netty .channel .MultiThreadIoEventLoopGroup ;
10- import io .netty .util .concurrent .FastThreadLocal ;
1112
1213public class VirtualMultithreadIoEventLoopGroup extends MultiThreadIoEventLoopGroup {
1314
1415 private static final int RESUMED_CONTINUATIONS_EXPECTED_COUNT = Integer .getInteger ("io.netty.loom.resumed.continuations" , 1024 );
16+ private ArrayList <EventLoopScheduler > eventLoopSchedulers ;
17+ private AtomicLong nextScheduler ;
1518 private ThreadFactory threadFactory ;
16- private IdentityHashMap <Thread , EventLoopScheduler > schedulers ;
17- private final FastThreadLocal <EventLoopScheduler > v_thread_factory = new FastThreadLocal <>() {
18- @ Override
19- protected EventLoopScheduler initialValue () {
20- return schedulers .get (Thread .currentThread ());
21- }
22- };
2319
2420 public VirtualMultithreadIoEventLoopGroup (int nThreads , IoHandlerFactory ioHandlerFactory ) {
2521 super (nThreads , (Executor ) command -> {
@@ -30,21 +26,46 @@ public VirtualMultithreadIoEventLoopGroup(int nThreads, IoHandlerFactory ioHandl
3026 }
3127 }
3228
29+ /**
30+ * Return a {@link ThreadFactory} that creates virtual threads tied to an
31+ * {@link EventLoopScheduler} of this group.
32+ *
33+ * <p>If the current thread has an associated {@link EventLoopScheduler} whose
34+ * {@link io.netty.channel.IoEventLoop#parent()} is this group, that scheduler's
35+ * {@code virtualThreadFactory()} is returned so newly created virtual threads
36+ * are associated with the current event loop.</p>
37+ *
38+ * <p>Otherwise a randomly assigned scheduler from this group is used and its
39+ * {@code virtualThreadFactory()} is returned.</p>
40+ *
41+ * @return a {@link ThreadFactory} producing virtual threads backed by an
42+ * {@link EventLoopScheduler} of this group
43+ **/
3344 public ThreadFactory vThreadFactory () {
34- return v_thread_factory .get ().virtualThreadFactory ();
45+ var schedulerRef = EventLoopScheduler .currentThreadSchedulerContext ().scheduler ();
46+ if (schedulerRef != null ) {
47+ var scheduler = schedulerRef .get ();
48+ if (scheduler != null && scheduler .ioEventLoop ().parent () == this ) {
49+ return scheduler .virtualThreadFactory ();
50+ }
51+ }
52+ // assign a random one
53+ int schedulerIndex = (int ) (nextScheduler .getAndIncrement () % executorCount ());
54+ return eventLoopSchedulers .get (schedulerIndex ).virtualThreadFactory ();
3555 }
3656
3757 @ Override
3858 protected IoEventLoop newChild (Executor executor , IoHandlerFactory ioHandlerFactory ,
3959 @ SuppressWarnings ("unused" ) Object ... args ) {
40- if (schedulers == null ) {
41- schedulers = new IdentityHashMap <>();
60+ if (eventLoopSchedulers == null ) {
61+ eventLoopSchedulers = new ArrayList <>(executorCount ());
62+ nextScheduler = new AtomicLong ();
4263 }
4364 if (threadFactory == null ) {
4465 threadFactory = newDefaultThreadFactory ();
4566 }
4667 var customScheduler = new EventLoopScheduler (this , threadFactory , ioHandlerFactory , RESUMED_CONTINUATIONS_EXPECTED_COUNT );
47- schedulers . put ( customScheduler . eventLoopThread (), customScheduler );
68+ eventLoopSchedulers . add ( customScheduler );
4869 return customScheduler .ioEventLoop ();
4970 }
5071
0 commit comments