Skip to content

Commit 1da6cd9

Browse files
committed
Enhance EventLoopScheduler and NettyScheduler with improved thread handling and validation checks
1 parent c357017 commit 1da6cd9

5 files changed

Lines changed: 189 additions & 21 deletions

File tree

core/src/main/java/io/netty/loom/EventLoopScheduler.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@
1212

1313
public class EventLoopScheduler {
1414

15+
/**
16+
* A shared reference to an EventLoopScheduler, which can be cleared when the
17+
* scheduler is fully terminated.
18+
*/
1519
public static final class SharedRef {
1620

1721
private volatile EventLoopScheduler ref;
@@ -103,8 +107,8 @@ public ThreadFactory virtualThreadFactory() {
103107
return vThreadFactory;
104108
}
105109

106-
public Thread eventLoopThread() {
107-
return eventLoopThread;
110+
Thread carrierThread() {
111+
return carrierThread;
108112
}
109113

110114
public ManualIoEventLoop ioEventLoop() {
@@ -257,6 +261,10 @@ public boolean execute(Thread.VirtualThreadTask task) {
257261
return true;
258262
}
259263

264+
/**
265+
* Get the current thread's associated EventLoopScheduler context, or an empty
266+
* one if none is associated.
267+
*/
260268
public static SchedulingContext currentThreadSchedulerContext() {
261269
return CURRENT_SCHEDULER.orElse(EMPTY_SCHEDULER_CONTEXT);
262270
}

core/src/main/java/io/netty/loom/NettyScheduler.java

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,33 @@
2222

2323
public class NettyScheduler implements Thread.VirtualThreadScheduler {
2424

25-
private static NettyScheduler INSTANCE;
25+
private static volatile NettyScheduler INSTANCE;
2626

2727
private final Thread.VirtualThreadScheduler jdkBuildinScheduler;
2828

2929
private final ConcurrentHashMap<Thread, SharedRef> unstartedThreads = new ConcurrentHashMap<>();
3030

3131
private final boolean perCarrierPollers;
3232

33+
private static NettyScheduler ensureInstalled() {
34+
var instance = INSTANCE;
35+
if (instance != null) {
36+
return instance;
37+
}
38+
Thread.ofVirtual().unstarted(new Runnable() {
39+
@Override
40+
public void run() {
41+
42+
}
43+
});
44+
// we expect VirtualThread clinit to have loaded it by now
45+
return INSTANCE;
46+
}
47+
3348
public NettyScheduler(Thread.VirtualThreadScheduler jdkBuildinScheduler) {
3449
this.jdkBuildinScheduler = jdkBuildinScheduler;
35-
INSTANCE = this;
3650
perCarrierPollers = Integer.getInteger("jdk.pollerMode", -1) == 3;
37-
VarHandle.storeStoreFence();
51+
INSTANCE = this;
3852
}
3953

4054
public boolean expectsPerCarrierPollers() {
@@ -59,24 +73,21 @@ public void onStart(Thread.VirtualThreadTask virtualThreadTask) {
5973
var assignedSchedulerRef = unstartedThreads.remove(virtualThreadTask.thread());
6074
if (assignedSchedulerRef == null) {
6175
if (perCarrierPollers) {
62-
// Read-Poller threads are special: if we run from a VThread managed by a
63-
// VirtualThreadNettyScheduler,
64-
// we want should continue using that same scheduler for the Read-Poller thread.
65-
var currentThread = Thread.currentThread();
66-
if (currentThread.isVirtual()) {
76+
// Read-Poller threads should always inherit the event loop scheduler from the
77+
// caller thread
78+
if (Thread.currentThread().isVirtual()) {
6779
// TODO
6880
// https://github.com/openjdk/loom/blob/12ddf39bb59252a8274d8b937bd075b2a6dbc3f8/src/java.base/share/classes/java/lang/VirtualThread.java#L270C18-L270C33
6981
// in theory should be easy to provide a VirtualThreadTask::current method to
7082
// avoid the ScopedValue lookup
71-
var ctx = EventLoopScheduler.currentThreadSchedulerContext();
72-
var schedulerRef = ctx.scheduler();
83+
var schedulerRef = EventLoopScheduler.currentThreadSchedulerContext().scheduler();
7384
// See
7485
// https://github.com/openjdk/loom/blob/12ddf39bb59252a8274d8b937bd075b2a6dbc3f8/src/java.base/share/classes/sun/nio/ch/Poller.java#L723C48-L723C59
7586
if (schedulerRef != null) {
76-
var runningScheduler = schedulerRef.get();
77-
if (runningScheduler != null && virtualThreadTask.thread().getName().endsWith("-Read-Poller")) {
87+
var scheduler = schedulerRef.get();
88+
if (scheduler != null && virtualThreadTask.thread().getName().endsWith("-Read-Poller")) {
7889
virtualThreadTask.attach(schedulerRef);
79-
if (runningScheduler.execute(virtualThreadTask)) {
90+
if (scheduler.execute(virtualThreadTask)) {
8091
return;
8192
}
8293
virtualThreadTask.attach(null);
@@ -118,15 +129,15 @@ public void onContinue(Thread.VirtualThreadTask virtualThreadTask) {
118129
}
119130

120131
static Thread assignUnstarted(Thread unstarted, SharedRef ref) {
121-
INSTANCE.unstartedThreads.put(unstarted, ref);
132+
ensureInstalled().unstartedThreads.put(unstarted, ref);
122133
return unstarted;
123134
}
124135

125136
public static boolean perCarrierPollers() {
126-
return INSTANCE.perCarrierPollers;
137+
return ensureInstalled().perCarrierPollers;
127138
}
128139

129140
public static boolean isAvailable() {
130-
return INSTANCE != null;
141+
return ensureInstalled() != null;
131142
}
132143
}

core/src/main/java/io/netty/loom/VirtualMultithreadIoEventLoopGroup.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package io.netty.loom;
22

33
import java.util.ArrayList;
4-
import java.util.IdentityHashMap;
54
import java.util.concurrent.Executor;
65
import java.util.concurrent.ThreadFactory;
76
import java.util.concurrent.atomic.AtomicLong;
@@ -22,6 +21,9 @@ public VirtualMultithreadIoEventLoopGroup(int nThreads, IoHandlerFactory ioHandl
2221
super(nThreads, (Executor) command -> {
2322
throw new UnsupportedOperationException("this executor is not supposed to be used");
2423
}, ioHandlerFactory);
24+
}
25+
26+
private static void validateNettyAvailability() {
2527
if (!NettyScheduler.isAvailable()) {
2628
throw new IllegalStateException(
2729
"-Djdk.virtualThreadScheduler.implClass=io.netty.loom.NettyScheduler is required to use VirtualMultithreadIoEventLoopGroup");
@@ -63,6 +65,7 @@ public ThreadFactory vThreadFactory() {
6365
@Override
6466
protected IoEventLoop newChild(Executor executor, IoHandlerFactory ioHandlerFactory,
6567
@SuppressWarnings("unused") Object... args) {
68+
validateNettyAvailability();
6669
if (eventLoopSchedulers == null) {
6770
eventLoopSchedulers = new ArrayList<>(executorCount());
6871
nextScheduler = new AtomicLong();
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package io.netty.loom;
2+
3+
import org.junit.jupiter.api.Test;
4+
5+
import static org.junit.jupiter.api.Assertions.assertTrue;
6+
7+
public class NettySchedulerClassInitTest {
8+
9+
@Test
10+
public void testNettySchedulerClassInitialization() {
11+
assertTrue(NettyScheduler.isAvailable());
12+
}
13+
}

core/src/test/java/io/netty/loom/VirtualMultithreadIoEventLoopGroupTest.java

Lines changed: 135 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,11 @@
1313
import java.net.http.HttpClient;
1414
import java.net.http.HttpRequest;
1515
import java.net.http.HttpResponse;
16+
import java.time.Duration;
1617
import java.util.concurrent.*;
1718
import java.util.concurrent.atomic.AtomicBoolean;
1819
import java.util.concurrent.atomic.AtomicInteger;
20+
import java.util.concurrent.atomic.AtomicReference;
1921

2022
import io.netty.channel.Channel;
2123
import io.netty.channel.ChannelInboundHandlerAdapter;
@@ -494,14 +496,16 @@ void testBlockingIO() throws IOException, InterruptedException, ExecutionExcepti
494496
var clientOut = clientSocket.getOutputStream();
495497
var serverIn = serverSocket.getInputStream()) {
496498
var readerVThreadPromise = new CompletableFuture<Thread>();
499+
var readCompleted = new CompletableFuture<Void>();
497500
group.execute(() -> {
498501
var readerThread = group.vThreadFactory().newThread(() -> {
499502
var eventLoopScheduler = EventLoopScheduler.currentThreadSchedulerContext().scheduler().get();
500503
assertEquals(0, eventLoopScheduler.externalContinuationsCount());
501504
try {
502505
serverIn.read();
503-
} catch (IOException e) {
504-
throw new RuntimeException(e);
506+
readCompleted.complete(null);
507+
} catch (Throwable e) {
508+
readCompleted.completeExceptionally(e);
505509
}
506510
});
507511
readerVThreadPromise.complete(readerThread);
@@ -518,4 +522,133 @@ void testBlockingIO() throws IOException, InterruptedException, ExecutionExcepti
518522
}
519523
}
520524
}
525+
526+
@Test
527+
void testShutdownSchedulerOnBlockingIO() throws IOException, InterruptedException, ExecutionException {
528+
assumeTrue(NettyScheduler.perCarrierPollers());
529+
try (var group = new VirtualMultithreadIoEventLoopGroup(1, LocalIoHandler.newFactory());
530+
var serverAcceptor = new ServerSocket(0)) {
531+
var serverSocketPromise = new CompletableFuture<Socket>();
532+
Thread.ofVirtual().start(() -> {
533+
try {
534+
serverSocketPromise.complete(serverAcceptor.accept());
535+
} catch (Throwable e) {
536+
// complete exceptionally
537+
serverSocketPromise.completeExceptionally(e);
538+
}
539+
});
540+
try (var clientSocket = new Socket("localhost", serverAcceptor.getLocalPort());
541+
var serverSocket = serverSocketPromise.join();
542+
var clientOut = clientSocket.getOutputStream();
543+
var serverIn = serverSocket.getInputStream()) {
544+
var schedulerRef = new AtomicReference<EventLoopScheduler.SharedRef>();
545+
var firstReadCompleted = new CompletableFuture<Void>();
546+
var secondReadCompleted = new CompletableFuture<Void>();
547+
var readerVThreadPromise = new CompletableFuture<Thread>();
548+
group.execute(() -> {
549+
var readerThread = group.vThreadFactory().newThread(() -> {
550+
schedulerRef.lazySet(EventLoopScheduler.currentThreadSchedulerContext().scheduler());
551+
var eventLoopScheduler = EventLoopScheduler.currentThreadSchedulerContext().scheduler().get();
552+
assertEquals(0, eventLoopScheduler.externalContinuationsCount());
553+
try {
554+
serverIn.read();
555+
firstReadCompleted.complete(null);
556+
try {
557+
serverIn.read();
558+
secondReadCompleted.complete(null);
559+
} catch (Throwable e) {
560+
secondReadCompleted.completeExceptionally(e);
561+
}
562+
} catch (Throwable e) {
563+
firstReadCompleted.completeExceptionally(e);
564+
}
565+
});
566+
readerVThreadPromise.complete(readerThread);
567+
readerThread.start();
568+
569+
});
570+
var readerVThread = readerVThreadPromise.get();
571+
// it has to be waiting on read
572+
while (readerVThread.getState() != Thread.State.WAITING) {
573+
Thread.sleep(1);
574+
}
575+
Thread carrier = schedulerRef.get().get().carrierThread();
576+
group.shutdownGracefully().get();
577+
assertTrue(carrier.join(Duration.MAX));
578+
// unblock the client and expect the read to complete
579+
clientOut.write(1);
580+
firstReadCompleted.join();
581+
// it has to be waiting on read
582+
while (readerVThread.getState() != Thread.State.WAITING) {
583+
Thread.sleep(1);
584+
}
585+
clientOut.write(1);
586+
secondReadCompleted.join();
587+
}
588+
}
589+
}
590+
591+
@Test
592+
void testShutdownSchedulerOnLongBlockingIO() throws IOException, InterruptedException, ExecutionException {
593+
assumeTrue(NettyScheduler.perCarrierPollers());
594+
int bytesToWrite = 16;
595+
try (var group = new VirtualMultithreadIoEventLoopGroup(1, LocalIoHandler.newFactory());
596+
var serverAcceptor = new ServerSocket(0)) {
597+
var serverSocketPromise = new CompletableFuture<Socket>();
598+
Thread.ofVirtual().start(() -> {
599+
try {
600+
serverSocketPromise.complete(serverAcceptor.accept());
601+
} catch (Throwable e) {
602+
// complete exceptionally
603+
serverSocketPromise.completeExceptionally(e);
604+
}
605+
});
606+
try (var clientSocket = new Socket("localhost", serverAcceptor.getLocalPort());
607+
var serverSocket = serverSocketPromise.join();
608+
var clientOut = clientSocket.getOutputStream();
609+
var serverIn = serverSocket.getInputStream()) {
610+
var schedulerRef = new AtomicReference<EventLoopScheduler.SharedRef>();
611+
var readCompleted = new CompletableFuture<byte[]>();;
612+
var readerVThreadPromise = new CompletableFuture<Thread>();
613+
group.execute(() -> {
614+
var readerThread = group.vThreadFactory().newThread(() -> {
615+
schedulerRef.lazySet(EventLoopScheduler.currentThreadSchedulerContext().scheduler());
616+
var eventLoopScheduler = EventLoopScheduler.currentThreadSchedulerContext().scheduler().get();
617+
assertEquals(0, eventLoopScheduler.externalContinuationsCount());
618+
try {
619+
byte[] data = serverIn.readNBytes(bytesToWrite);
620+
readCompleted.complete(data);
621+
} catch (Throwable e) {
622+
readCompleted.completeExceptionally(e);
623+
}
624+
});
625+
readerVThreadPromise.complete(readerThread);
626+
readerThread.start();
627+
628+
});
629+
var readerVThread = readerVThreadPromise.get();
630+
byte[] toWrite = new byte[bytesToWrite];
631+
int shutDownAt = toWrite.length / 2;
632+
for (int i = 0; i < toWrite.length; i++) {
633+
toWrite[i] = (byte) i;
634+
}
635+
// it has to be waiting on read
636+
while (readerVThread.getState() != Thread.State.WAITING) {
637+
Thread.sleep(1);
638+
}
639+
for (int i = 0; i < toWrite.length; i++) {
640+
Thread.sleep(10); // make sure the read is parked
641+
byte b = toWrite[i];
642+
// shutdown whilst the read is parked
643+
if (i == shutDownAt) {
644+
Thread carrier = schedulerRef.get().get().carrierThread();
645+
group.shutdownGracefully().get();
646+
assertTrue(carrier.join(Duration.MAX));
647+
}
648+
clientOut.write(b);
649+
}
650+
assertArrayEquals(toWrite, readCompleted.join());
651+
}
652+
}
653+
}
521654
}

0 commit comments

Comments
 (0)