Skip to content

Commit 8b0bc0d

Browse files
committed
update global scheduler behaviour
1 parent 60052bd commit 8b0bc0d

2 files changed

Lines changed: 38 additions & 9 deletions

File tree

core/src/main/java/io/netty/loom/GlobalDelegateThreadNettyScheduler.java

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,16 +39,39 @@ public void onStart(Thread.VirtualThreadTask virtualThreadTask) {
3939
// 2. if a vThread will never start, it will leak here forever
4040
var assignedSchedulerRef = unstartedThreads.remove(virtualThreadTask.thread());
4141
if (assignedSchedulerRef == null) {
42-
// TODO per carrier sub-pollers goes here, but we want them to inherit the scheduler from the caller context
43-
}
44-
var scheduler = assignedSchedulerRef != null ? assignedSchedulerRef.get() : null;
45-
if (scheduler != null) {
46-
// attach the assigned scheduler to the task
47-
virtualThreadTask.attach(assignedSchedulerRef);
48-
if (scheduler.execute(virtualThreadTask)) {
49-
return;
42+
// Read-Poller threads are special: if we run from a VThread managed by a VirtualThreadNettyScheduler,
43+
// we want should continue using that same scheduler for the Read-Poller thread.
44+
var currentThread = Thread.currentThread();
45+
if (currentThread.isVirtual()) {
46+
// TODO https://github.com/openjdk/loom/blob/12ddf39bb59252a8274d8b937bd075b2a6dbc3f8/src/java.base/share/classes/java/lang/VirtualThread.java#L270C18-L270C33
47+
// in theory should be easy to provide a VirtualThreadTask::current method to avoid the ScopedValue lookup
48+
var schedulerRef = VirtualThreadNettyScheduler.currentRef();
49+
// TODO per carrier sub-pollers goes here, but we want them to inherit the scheduler from the caller context
50+
if (schedulerRef != null) {
51+
var scheduler = schedulerRef.get();
52+
// See https://github.com/openjdk/loom/blob/12ddf39bb59252a8274d8b937bd075b2a6dbc3f8/src/java.base/share/classes/sun/nio/ch/Poller.java#L723C48-L723C59
53+
if (scheduler != null) {
54+
if (virtualThreadTask.thread().getName().endsWith("-Read-Poller")) {
55+
// attach the assigned scheduler to the task
56+
virtualThreadTask.attach(schedulerRef);
57+
if (scheduler.execute(virtualThreadTask)) {
58+
return;
59+
}
60+
virtualThreadTask.attach(null);
61+
}
62+
}
63+
}
64+
}
65+
} else {
66+
var scheduler = assignedSchedulerRef.get();
67+
if (scheduler != null) {
68+
// attach the assigned scheduler to the task
69+
virtualThreadTask.attach(assignedSchedulerRef);
70+
if (scheduler.execute(virtualThreadTask)) {
71+
return;
72+
}
5073
}
51-
// the v thread has been rejected by its assigned scheduler, clean it up and fallback to JDK
74+
// the v thread has been rejected by its assigned scheduler or its scheduler is gone
5275
virtualThreadTask.attach(null);
5376
}
5477
jdkBuildinScheduler.onStart(virtualThreadTask);

core/src/main/java/io/netty/loom/VirtualThreadNettyScheduler.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ public class VirtualThreadNettyScheduler {
2121
private static final long IDLE_YIELD_US = TimeUnit.MICROSECONDS.toNanos(Integer.getInteger("io.netty.loom.idle.yield.us", 1));
2222
// This is required to allow sub-pollers to run on the correct scheduler
2323
private static final ScopedValue<AtomicReference<VirtualThreadNettyScheduler>> CURRENT_SCHEDULER = ScopedValue.newInstance();
24+
private static final AtomicReference<VirtualThreadNettyScheduler> EMPTY_REFERENCE = new AtomicReference<>();
2425
private final MpscUnboundedStream<Runnable> runQueue;
2526
private final ManualIoEventLoop ioEventLoop;
2627
private final Thread eventLoopThread;
@@ -197,4 +198,9 @@ public boolean execute(Thread.VirtualThreadTask task) {
197198
}
198199
return true;
199200
}
201+
202+
public static AtomicReference<VirtualThreadNettyScheduler> currentRef() {
203+
var ref = CURRENT_SCHEDULER.orElse(EMPTY_REFERENCE);
204+
return ref == EMPTY_REFERENCE? null : ref;
205+
}
200206
}

0 commit comments

Comments
 (0)