Skip to content

Commit 1c19c0b

Browse files
committed
Adjust the ScopedValue logic to correctly report the current scheduler in case of inherited scoped values
1 parent b0add67 commit 1c19c0b

4 files changed

Lines changed: 97 additions & 104 deletions

File tree

core/src/main/java/io/netty/loom/EventLoopScheduler.java

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,32 @@ public EventLoopScheduler get() {
2525
}
2626
}
2727

28+
/**
29+
* PerCarrier Read-Sub-Pollers are not created through the vThreadFactory and although the NettyScheduler
30+
* can dynamically assign them to this scheduler, while running, won't see this scheduler as the current one.
31+
* This means that if they try to unpark some virtual thread which belong to this scheduler,
32+
* they can still wakeup the carrier thread. It's more a performance defect, but won't affect correctness.
33+
*/
34+
public record SchedulerContext(long vThreadId, SchedulerRef scheduler) {
35+
36+
/**
37+
* Get the assigned scheduler for the current thread, or null if this thread is not assigned to any scheduler.
38+
*/
39+
@Override
40+
public SchedulerRef scheduler() {
41+
return (Thread.currentThread().threadId() == vThreadId) ? scheduler : null;
42+
}
43+
}
44+
2845
private static final long MAX_WAIT_TASKS_NS = TimeUnit.HOURS.toNanos(1);
2946
// These are the soft-guaranteed yield times for the event loop whilst Thread.yield() is called.
3047
// Based on the status of the event loop (resuming from blocking or non-blocking, controlled by the running flag)
3148
// a different limit is applied.
3249
private static final long RUNNING_YIELD_US = TimeUnit.MICROSECONDS.toNanos(Integer.getInteger("io.netty.loom.running.yield.us", 1));
3350
private static final long IDLE_YIELD_US = TimeUnit.MICROSECONDS.toNanos(Integer.getInteger("io.netty.loom.idle.yield.us", 1));
3451
// This is required to allow sub-pollers to run on the correct scheduler
35-
private static final ScopedValue<SchedulerRef> CURRENT_SCHEDULER = ScopedValue.newInstance();
36-
private static final SchedulerRef EMPTY_REF = new SchedulerRef(null);
52+
private static final ScopedValue<SchedulerContext> CURRENT_SCHEDULER = ScopedValue.newInstance();
53+
private static final SchedulerContext EMPTY_SCHEDULER_CONTEXT = new SchedulerContext(-1, null);
3754
private final MpscUnboundedStream<Runnable> runQueue;
3855
private final ManualIoEventLoop ioEventLoop;
3956
private final Thread eventLoopThread;
@@ -42,17 +59,20 @@ public EventLoopScheduler get() {
4259
private volatile Runnable eventLoopContinuatioToRun;
4360
private final ThreadFactory vThreadFactory;
4461
private final AtomicBoolean eventLoopIsRunning;
45-
private final SchedulerRef publishedReference;
62+
private final SchedulerRef sharedRef;
4663

4764
public EventLoopScheduler(IoEventLoopGroup parent, ThreadFactory threadFactory, IoHandlerFactory ioHandlerFactory, int resumedContinuationsExpectedCount) {
48-
publishedReference = new SchedulerRef(this);
65+
sharedRef = new SchedulerRef(this);
4966
eventLoopIsRunning = new AtomicBoolean(false);
5067
runQueue = new MpscUnboundedStream<>(resumedContinuationsExpectedCount);
5168
carrierThread = threadFactory.newThread(this::virtualThreadSchedulerLoop);
5269
var rawVTFactory = Thread.ofVirtual().factory();
5370
vThreadFactory = runnable ->
5471
NettyScheduler.assignUnstarted(rawVTFactory.newThread(
55-
() -> ScopedValue.where(CURRENT_SCHEDULER, publishedReference).run(runnable)), publishedReference);
72+
() -> // this can be inherited by any thread created in the context of this virtual thread
73+
// but only the original virtual thread will have the correct scheduler context
74+
ScopedValue.where(CURRENT_SCHEDULER, new SchedulerContext(Thread.currentThread().threadId(), sharedRef)).run(runnable)
75+
), sharedRef);
5676
eventLoopThread = vThreadFactory.newThread(() -> FastThreadLocalThread.runWithFastThreadLocal(this::nettyEventLoop));
5777
ioEventLoop = new ManualIoEventLoop(parent, eventLoopThread,
5878
ioExecutor -> new AwakeAwareIoHandler(eventLoopIsRunning, ioHandlerFactory.newHandler(ioExecutor)));
@@ -149,7 +169,7 @@ private void virtualThreadSchedulerLoop() {
149169
runEventLoopContinuation();
150170
}
151171
// the event loop should be fully terminated
152-
publishedReference.ref = null;
172+
sharedRef.ref = null;
153173
runQueue.close();
154174
// StoreLoad barrier
155175
while (!runQueue.isEmpty()) {
@@ -202,11 +222,8 @@ public boolean execute(Thread.VirtualThreadTask task) {
202222
var currentThread = Thread.currentThread();
203223
if (currentThread != eventLoopThread) {
204224
if (currentThread != carrierThread) {
205-
// this is checking for "local" submissions: it assumes that
206-
// currentThreadEventLoopScheduler() is the currently assigned one, and up to date
207-
// WARNING!!!!!
208-
// work-stealing could break this assumption if we don't update the CURRENT_SCHEDULER scoped value accordingly
209-
if (currentThreadEventLoopScheduler() != publishedReference) {
225+
// this is checking for "local" submissions
226+
if (currentThreadSchedulerContext().scheduler() != sharedRef) {
210227
ioEventLoop.wakeup();
211228
LockSupport.unpark(parkedCarrierThread);
212229
}
@@ -220,8 +237,7 @@ public boolean execute(Thread.VirtualThreadTask task) {
220237
return true;
221238
}
222239

223-
public static SchedulerRef currentThreadEventLoopScheduler() {
224-
var ref = CURRENT_SCHEDULER.orElse(EMPTY_REF);
225-
return ref == EMPTY_REF ? null : ref;
240+
public static SchedulerContext currentThreadSchedulerContext() {
241+
return CURRENT_SCHEDULER.orElse(EMPTY_SCHEDULER_CONTEXT);
226242
}
227243
}

core/src/main/java/io/netty/loom/NettyScheduler.java

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -53,20 +53,17 @@ public void onStart(Thread.VirtualThreadTask virtualThreadTask) {
5353
if (currentThread.isVirtual()) {
5454
// TODO https://github.com/openjdk/loom/blob/12ddf39bb59252a8274d8b937bd075b2a6dbc3f8/src/java.base/share/classes/java/lang/VirtualThread.java#L270C18-L270C33
5555
// in theory should be easy to provide a VirtualThreadTask::current method to avoid the ScopedValue lookup
56-
var schedulerRef = EventLoopScheduler.currentThreadEventLoopScheduler();
57-
// TODO per carrier sub-pollers goes here, but we want them to inherit the scheduler from the caller context
56+
var ctx = EventLoopScheduler.currentThreadSchedulerContext();
57+
var schedulerRef = ctx.scheduler();
58+
// See https://github.com/openjdk/loom/blob/12ddf39bb59252a8274d8b937bd075b2a6dbc3f8/src/java.base/share/classes/sun/nio/ch/Poller.java#L723C48-L723C59
5859
if (schedulerRef != null) {
59-
var scheduler = schedulerRef.get();
60-
// See https://github.com/openjdk/loom/blob/12ddf39bb59252a8274d8b937bd075b2a6dbc3f8/src/java.base/share/classes/sun/nio/ch/Poller.java#L723C48-L723C59
61-
if (scheduler != null) {
62-
if (virtualThreadTask.thread().getName().endsWith("-Read-Poller")) {
63-
// attach the assigned scheduler to the task
64-
virtualThreadTask.attach(schedulerRef);
65-
if (scheduler.execute(virtualThreadTask)) {
66-
return;
67-
}
68-
virtualThreadTask.attach(null);
60+
var runningScheduler = schedulerRef.get();
61+
if (runningScheduler != null && virtualThreadTask.thread().getName().endsWith("-Read-Poller")) {
62+
virtualThreadTask.attach(schedulerRef);
63+
if (runningScheduler.execute(virtualThreadTask)) {
64+
return;
6965
}
66+
virtualThreadTask.attach(null);
7067
}
7168
}
7269
}
@@ -89,9 +86,9 @@ public void onStart(Thread.VirtualThreadTask virtualThreadTask) {
8986
public void onContinue(Thread.VirtualThreadTask virtualThreadTask) {
9087
var attachment = virtualThreadTask.attachment();
9188
if (attachment instanceof SchedulerRef ref) {
92-
var scheduler = ref.get();
93-
if (scheduler != null) {
94-
if (scheduler.execute(virtualThreadTask)) {
89+
var assignedScheduler = ref.get();
90+
if (assignedScheduler != null) {
91+
if (assignedScheduler.execute(virtualThreadTask)) {
9592
return;
9693
}
9794
}

core/src/main/java/io/netty/loom/VirtualMultithreadIoEventLoopGroup.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,8 @@ protected IoEventLoop newChild(Executor executor, IoHandlerFactory ioHandlerFact
4848
return customScheduler.ioEventLoop();
4949
}
5050

51+
@Override
52+
public void close() {
53+
shutdownGracefully();
54+
}
5155
}

core/src/test/java/io/netty/loom/VirtualMultithreadIoEventLoopGroupTest.java

Lines changed: 51 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,17 @@
11
package io.netty.loom;
22

3-
import static org.junit.jupiter.api.Assertions.assertEquals;
4-
import static org.junit.jupiter.api.Assertions.assertFalse;
5-
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
6-
import static org.junit.jupiter.api.Assertions.assertNotEquals;
7-
import static org.junit.jupiter.api.Assertions.assertNotSame;
8-
import static org.junit.jupiter.api.Assertions.assertNull;
9-
import static org.junit.jupiter.api.Assertions.assertSame;
10-
import static org.junit.jupiter.api.Assertions.assertTrue;
3+
import static java.util.concurrent.StructuredTaskScope.Joiner.allSuccessfulOrThrow;
4+
import static org.junit.jupiter.api.Assertions.*;
115
import static org.junit.jupiter.api.Assumptions.assumeTrue;
126

13-
import java.io.IOException;
147
import java.net.InetSocketAddress;
15-
import java.net.ServerSocket;
16-
import java.net.Socket;
178
import java.net.URI;
189
import java.net.http.HttpClient;
1910
import java.net.http.HttpRequest;
2011
import java.net.http.HttpResponse;
21-
import java.util.Objects;
22-
import java.util.concurrent.BrokenBarrierException;
23-
import java.util.concurrent.CompletableFuture;
24-
import java.util.concurrent.CountDownLatch;
25-
import java.util.concurrent.CyclicBarrier;
26-
import java.util.concurrent.ExecutionException;
27-
import java.util.concurrent.ExecutorService;
28-
import java.util.concurrent.Executors;
29-
import java.util.concurrent.ThreadFactory;
30-
import java.util.concurrent.TimeUnit;
31-
import java.util.concurrent.TimeoutException;
12+
import java.util.concurrent.*;
3213
import java.util.concurrent.atomic.AtomicBoolean;
3314
import java.util.concurrent.atomic.AtomicInteger;
34-
import java.util.concurrent.locks.LockSupport;
3515

3616
import io.netty.channel.Channel;
3717
import io.netty.channel.ChannelInboundHandlerAdapter;
@@ -331,37 +311,43 @@ public void channelRead(io.netty.channel.ChannelHandlerContext ctx, Object msg)
331311
group.shutdownGracefully();
332312
}
333313

334-
@Test
335-
void schedulerIsInherited() throws InterruptedException, ExecutionException {
336-
var group = new VirtualMultithreadIoEventLoopGroup(1, LocalIoHandler.newFactory());
337-
final Thread expectedCarrier = group.submit(() -> LoomSupport.getCarrierThread(Thread.currentThread())).get();
338-
final CompletableFuture<Thread> vfactoryCarrier = new CompletableFuture<>();
339-
group.execute(() -> {
340-
group.vThreadFactory().newThread(() -> {
341-
vfactoryCarrier.complete(LoomSupport.getCarrierThread(Thread.currentThread()));
342-
}).start();
343-
});
344-
final CompletableFuture<Thread> inheritedCarrier = new CompletableFuture<>();
345-
group.execute(() -> {
346-
group.vThreadFactory().newThread(() -> {
347-
Thread.ofVirtual().start(() -> {
348-
inheritedCarrier.complete(LoomSupport.getCarrierThread(Thread.currentThread()));
349-
});
350-
}).start();
351-
});
352-
final CompletableFuture<Thread> inheritedVFactoryCarrier = new CompletableFuture<>();
353-
group.execute(() -> {
354-
group.vThreadFactory().newThread(() -> {
355-
Thread.ofVirtual().factory().newThread(() -> {
356-
inheritedVFactoryCarrier.complete(LoomSupport.getCarrierThread(Thread.currentThread()));
314+
@Test
315+
void schedulerIsNotInheritedWithThreadOfVirtual() throws InterruptedException, ExecutionException {
316+
try (var group = new VirtualMultithreadIoEventLoopGroup(1, LocalIoHandler.newFactory())) {
317+
final var expectedScheduler = group.submit(() -> EventLoopScheduler.currentThreadSchedulerContext().scheduler().get()).get();
318+
assertNotNull(expectedScheduler);
319+
final var vThreadFactory = group.submit(group::vThreadFactory).get();
320+
var groupEventLoopScheduler = new CompletableFuture<EventLoopScheduler>();
321+
var noEventLoopScheduler = new CompletableFuture<>();
322+
vThreadFactory.newThread(() -> {
323+
Thread.ofVirtual().start(() -> noEventLoopScheduler.complete(EventLoopScheduler.currentThreadSchedulerContext().scheduler()));
324+
groupEventLoopScheduler.complete(EventLoopScheduler.currentThreadSchedulerContext().scheduler().get());
357325
}).start();
358-
}).start();
359-
});
360-
assertEquals(expectedCarrier, vfactoryCarrier.get());
361-
assertEquals(expectedCarrier, inheritedCarrier.get());
362-
assertEquals(expectedCarrier, inheritedVFactoryCarrier.get());
363-
group.shutdownGracefully();
364-
}
326+
assertEquals(expectedScheduler, groupEventLoopScheduler.get());
327+
assertNull(noEventLoopScheduler.get());
328+
}
329+
}
330+
331+
@Test
332+
void schedulerIsInheritedByForkedVTFromTheRightFactory() throws InterruptedException, ExecutionException {
333+
try (var group = new VirtualMultithreadIoEventLoopGroup(1, LocalIoHandler.newFactory())) {
334+
final var expectedEventLoopScheduler = group.submit(() -> EventLoopScheduler.currentThreadSchedulerContext().scheduler().get()).get();
335+
assertNotNull(expectedEventLoopScheduler);
336+
final var vThreadFactory = group.submit(group::vThreadFactory).get();
337+
var forkInheritedScheduler = new CompletableFuture<EventLoopScheduler>();
338+
vThreadFactory.newThread(() -> {
339+
try (var scope = StructuredTaskScope.open(allSuccessfulOrThrow(),
340+
cf -> cf.withThreadFactory(vThreadFactory))) {
341+
var task = scope.fork(() -> EventLoopScheduler.currentThreadSchedulerContext().scheduler().get());
342+
scope.join();
343+
forkInheritedScheduler.complete(task.get());
344+
} catch (InterruptedException e) {
345+
forkInheritedScheduler.completeExceptionally(e);
346+
}
347+
}).start();
348+
assertEquals(expectedEventLoopScheduler, forkInheritedScheduler.get());
349+
}
350+
}
365351

366352
@Test
367353
void eventLoopSchedulerCanMakeProgressIfTheEventLoopIsBlocked() throws BrokenBarrierException, InterruptedException, TimeoutException {
@@ -422,16 +408,20 @@ private static void spinWait(long nanos) {
422408
}
423409
}
424410

425-
/*
426-
427-
@Test
428-
public void testContainsJustBuiltinPollers() throws InterruptedException {
429-
assertContainsJustBuiltinPollers(builtinGlobalScheduler());
430-
}
431-
411+
/*
432412
@Test
433413
public void testPlatformThreadSpawnsVirtualThreads() throws InterruptedException {
434-
testPlatformThreadSpawnsVirtualThreads(builtinGlobalScheduler());
414+
CompletableFuture<Thread.VirtualThreadScheduler> schedulerCompletableFuture = new CompletableFuture<>();
415+
Thread.ofPlatform()
416+
.start(() -> {
417+
Thread.ofVirtual()
418+
.start(() -> {
419+
Thread.VirtualThreadScheduler virtualThreadScheduler = NettyScheduler.internalSchedulerMappings.get(Thread.currentThread());
420+
schedulerCompletableFuture.complete(virtualThreadScheduler);
421+
});
422+
});
423+
Thread.VirtualThreadScheduler virtualThreadScheduler = schedulerCompletableFuture.join();
424+
assertEquals(NettyScheduler.jdkBuildinScheduler, virtualThreadScheduler);
435425
}
436426
437427
@Test
@@ -588,20 +578,6 @@ private static void assertContainsJustBuiltinPollers(NettyScheduler NettySchedul
588578
entry.getValue() == NettyScheduler.getJdkBuildinScheduler()));
589579
}
590580
591-
private void testPlatformThreadSpawnsVirtualThreads(NettyScheduler NettyScheduler) {
592-
CompletableFuture<Thread.VirtualThreadScheduler> schedulerCompletableFuture = new CompletableFuture<>();
593-
Thread.ofPlatform()
594-
.start(() -> {
595-
Thread.ofVirtual()
596-
.start(() -> {
597-
Thread.VirtualThreadScheduler virtualThreadScheduler = NettyScheduler.internalSchedulerMappings.get(Thread.currentThread());
598-
schedulerCompletableFuture.complete(virtualThreadScheduler);
599-
});
600-
});
601-
Thread.VirtualThreadScheduler virtualThreadScheduler = schedulerCompletableFuture.join();
602-
assertEquals(NettyScheduler.jdkBuildinScheduler, virtualThreadScheduler);
603-
}
604-
605581
private void testNettyThreadSpawnsVirtualThreads(NettyScheduler NettyScheduler) throws InterruptedException {
606582
record NettyVirtualThreadContext(Thread ioEventLoopThread,
607583
Thread.VirtualThreadScheduler vtScheduler,Thread.VirtualThreadScheduler vtSchedulerFromScopeValue,

0 commit comments

Comments
 (0)