Skip to content

Commit b0add67

Browse files
committed
Change names (again, sorry!) and disable tests to fix
1 parent f16624a commit b0add67

6 files changed

Lines changed: 89 additions & 93 deletions

File tree

benchmarks/src/main/java/io/netty/loom/benchmark/NettySchedulerBenchmark.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import io.netty.channel.nio.NioIoHandler;
44
import io.netty.loom.LoomSupport;
5-
import io.netty.loom.MultithreadVirtualEventExecutorGroup;
5+
import io.netty.loom.VirtualMultithreadIoEventLoopGroup;
66
import org.openjdk.jmh.annotations.Benchmark;
77
import org.openjdk.jmh.annotations.BenchmarkMode;
88
import org.openjdk.jmh.annotations.Fork;
@@ -33,13 +33,13 @@ public class NettySchedulerBenchmark {
3333
@Param({"1000", "100000"})
3434
private int tasks;
3535

36-
private MultithreadVirtualEventExecutorGroup executorGroup;
36+
private VirtualMultithreadIoEventLoopGroup executorGroup;
3737

3838
private ThreadFactory vtFactory;
3939

4040
@Setup
4141
public void setup() throws ExecutionException, InterruptedException {
42-
executorGroup = new MultithreadVirtualEventExecutorGroup(1, NioIoHandler.newFactory());
42+
executorGroup = new VirtualMultithreadIoEventLoopGroup(1, NioIoHandler.newFactory());
4343
vtFactory = executorGroup.submit(executorGroup::vThreadFactory).get();
4444
}
4545

benchmarks/src/main/java/io/netty/loom/benchmark/SchedulerBenchmark.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import io.netty.channel.local.LocalIoHandler;
3232
import io.netty.channel.nio.NioIoHandler;
3333
import io.netty.loom.LoomSupport;
34-
import io.netty.loom.MultithreadVirtualEventExecutorGroup;
34+
import io.netty.loom.VirtualMultithreadIoEventLoopGroup;
3535

3636
@BenchmarkMode(Mode.Throughput)
3737
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@@ -82,7 +82,7 @@ public void setup() throws ExecutionException, InterruptedException {
8282
durationNs = TimeUnit.MICROSECONDS.toNanos(durationUs);
8383
switch (scheduler) {
8484
case Netty:
85-
var virtualGroup = new MultithreadVirtualEventExecutorGroup(1, factory);
85+
var virtualGroup = new VirtualMultithreadIoEventLoopGroup(1, factory);
8686
executorGroup = virtualGroup;
8787
LoomSupport.checkSupported();
8888
vtFactory = executorGroup.submit(virtualGroup::vThreadFactory).get();

core/src/main/java/io/netty/loom/EventLoopScheduler.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ private int runExternalContinuations(long deadlineNs) {
180180
return runContinuations;
181181
}
182182

183-
private boolean eventLoopContinuation(Thread.VirtualThreadTask task) {
183+
private boolean rescheduleEventLoop(Thread.VirtualThreadTask task) {
184184
if (eventLoopContinuatioToRun != null) {
185185
assert task.thread() != eventLoopThread;
186186
return false;
@@ -193,22 +193,25 @@ private boolean eventLoopContinuation(Thread.VirtualThreadTask task) {
193193
}
194194

195195
public boolean execute(Thread.VirtualThreadTask task) {
196-
boolean isEventLoopContinuation = eventLoopContinuation(task);
197-
if (!isEventLoopContinuation) {
196+
boolean eventLoopTask = rescheduleEventLoop(task);
197+
if (!eventLoopTask) {
198198
if (!runQueue.offer(task)) {
199199
return false;
200200
}
201201
}
202-
if (!ioEventLoop.inEventLoop(Thread.currentThread())) {
203-
// this is checking for "local" submissions: it assumes that
204-
// currentThreadEventLoopScheduler() is the currently assigned one, and up to date
205-
// WARNING!!!!!
206-
// work-stealing could break this assumption if we don't update the CURRENT_SCHEDULER scoped value accordingly
207-
if (currentThreadEventLoopScheduler() != publishedReference) {
208-
ioEventLoop.wakeup();
209-
LockSupport.unpark(parkedCarrierThread);
202+
var currentThread = Thread.currentThread();
203+
if (currentThread != eventLoopThread) {
204+
if (currentThread != carrierThread) {
205+
// this is checking for "local" submissions: it assumes that
206+
// currentThreadEventLoopScheduler() is the currently assigned one, and up to date
207+
// WARNING!!!!!
208+
// work-stealing could break this assumption if we don't update the CURRENT_SCHEDULER scoped value accordingly
209+
if (currentThreadEventLoopScheduler() != publishedReference) {
210+
ioEventLoop.wakeup();
211+
LockSupport.unpark(parkedCarrierThread);
212+
}
210213
}
211-
} else if (!isEventLoopContinuation && !eventLoopIsRunning.get()) {
214+
} else if (!eventLoopTask && !eventLoopIsRunning.get()) {
212215
// the event loop thread is allowed to give up cycles to consume external continuations
213216
// whilst is just woken up for I/O
214217
assert eventLoopContinuatioToRun == null;

core/src/main/java/io/netty/loom/NettyScheduler.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
import java.lang.invoke.VarHandle;
66
import java.util.concurrent.ConcurrentHashMap;
7-
import java.util.concurrent.atomic.AtomicReference;
87

98
/**
109
* Global scheduler — provides execution for virtual threads that do not specify their own scheduler.
@@ -34,6 +33,10 @@ public NettyScheduler(Thread.VirtualThreadScheduler jdkBuildinScheduler) {
3433
VarHandle.storeStoreFence();
3534
}
3635

36+
Thread.VirtualThreadScheduler jdkBuildinScheduler() {
37+
return jdkBuildinScheduler;
38+
}
39+
3740
@Override
3841
public void onStart(Thread.VirtualThreadTask virtualThreadTask) {
3942
// TODO this is not great for 2 reasons:
@@ -85,10 +88,7 @@ public void onStart(Thread.VirtualThreadTask virtualThreadTask) {
8588
@Override
8689
public void onContinue(Thread.VirtualThreadTask virtualThreadTask) {
8790
var attachment = virtualThreadTask.attachment();
88-
if (attachment instanceof AtomicReference<?>) {
89-
// TODO create a EventLoopSchedulerRef type
90-
@SuppressWarnings("unchecked")
91-
var ref = (AtomicReference<EventLoopScheduler>) attachment;
91+
if (attachment instanceof SchedulerRef ref) {
9292
var scheduler = ref.get();
9393
if (scheduler != null) {
9494
if (scheduler.execute(virtualThreadTask)) {
@@ -101,14 +101,13 @@ public void onContinue(Thread.VirtualThreadTask virtualThreadTask) {
101101
jdkBuildinScheduler.onContinue(virtualThreadTask);
102102
}
103103

104-
// just for benchmark
105-
public Thread.VirtualThreadScheduler getJdkBuildinScheduler() {
106-
return jdkBuildinScheduler;
107-
}
108-
109104
static Thread assignUnstarted(Thread unstarted, SchedulerRef ref) {
110105
INSTANCE.unstartedThreads.put(unstarted, ref);
111106
return unstarted;
112107
}
108+
109+
public static boolean isAvailable() {
110+
return INSTANCE != null;
111+
}
113112
}
114113

core/src/main/java/io/netty/loom/MultithreadVirtualEventExecutorGroup.java renamed to core/src/main/java/io/netty/loom/VirtualMultithreadIoEventLoopGroup.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import io.netty.channel.MultiThreadIoEventLoopGroup;
1010
import io.netty.util.concurrent.FastThreadLocal;
1111

12-
public class MultithreadVirtualEventExecutorGroup extends MultiThreadIoEventLoopGroup {
12+
public class VirtualMultithreadIoEventLoopGroup extends MultiThreadIoEventLoopGroup {
1313

1414
private static final int RESUMED_CONTINUATIONS_EXPECTED_COUNT = Integer.getInteger("io.netty.loom.resumed.continuations", 1024);
1515
private ThreadFactory threadFactory;
@@ -21,10 +21,13 @@ protected EventLoopScheduler initialValue() {
2121
}
2222
};
2323

24-
public MultithreadVirtualEventExecutorGroup(int nThreads, IoHandlerFactory ioHandlerFactory) {
24+
public VirtualMultithreadIoEventLoopGroup(int nThreads, IoHandlerFactory ioHandlerFactory) {
2525
super(nThreads, (Executor) command -> {
2626
throw new UnsupportedOperationException("this executor is not supposed to be used");
2727
}, ioHandlerFactory);
28+
if (!NettyScheduler.isAvailable()) {
29+
throw new IllegalStateException("-Djdk.virtualThreadScheduler.implClass=io.netty.loom.NettyScheduler is required to use VirtualMultithreadIoEventLoopGroup");
30+
}
2831
}
2932

3033
public ThreadFactory vThreadFactory() {

0 commit comments

Comments
 (0)