Skip to content

Commit a29d91a

Browse files
committed
Add VIRTUAL_NETTY support: ManualIoEventLoopTask, VirtualMultithreadManualIoEventLoopGroup, and VT carrier behavior tests
1 parent 34ae521 commit a29d91a

3 files changed

Lines changed: 363 additions & 0 deletions

File tree

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2026 The Netty VirtualThread Scheduler Project
3+
*
4+
* The Netty VirtualThread Scheduler Project licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance with the
6+
* License. You may obtain a copy of the License at:
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software distributed under the
11+
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
12+
* either express or implied. See the License for the specific language governing permissions
13+
* and limitations under the License.
14+
*/
15+
package io.netty.loom;
16+
17+
import io.netty.channel.IoEventLoopGroup;
18+
import io.netty.channel.IoHandlerFactory;
19+
import io.netty.channel.ManualIoEventLoop;
20+
21+
import java.util.concurrent.TimeUnit;
22+
23+
public class ManualIoEventLoopTask extends ManualIoEventLoop implements Runnable {
24+
25+
private static final long RUNNING_YIELD_US = TimeUnit.MICROSECONDS
26+
.toNanos(Integer.getInteger("io.netty.loom.running.yield.us", 1));
27+
28+
public ManualIoEventLoopTask(IoEventLoopGroup parent, Thread owningThread, IoHandlerFactory factory) {
29+
super(parent, owningThread, factory);
30+
}
31+
32+
@Override
33+
public void run() {
34+
while (!isShuttingDown()) {
35+
run(0, RUNNING_YIELD_US);
36+
Thread.yield();
37+
runNonBlockingTasks(RUNNING_YIELD_US);
38+
Thread.yield();
39+
}
40+
while (!isTerminated()) {
41+
runNow();
42+
Thread.yield();
43+
}
44+
}
45+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright 2026 The Netty VirtualThread Scheduler Project
3+
*
4+
* The Netty VirtualThread Scheduler Project licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance with the
6+
* License. You may obtain a copy of the License at:
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software distributed under the
11+
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
12+
* either express or implied. See the License for the specific language governing permissions
13+
* and limitations under the License.
14+
*/
15+
package io.netty.loom;
16+
17+
import io.netty.channel.IoEventLoop;
18+
import io.netty.channel.IoHandlerFactory;
19+
import io.netty.channel.MultiThreadIoEventLoopGroup;
20+
21+
import java.util.concurrent.Executor;
22+
import java.util.concurrent.ThreadFactory;
23+
24+
public class VirtualMultithreadManualIoEventLoopGroup extends MultiThreadIoEventLoopGroup {
25+
26+
private ThreadFactory threadFactory;
27+
28+
public VirtualMultithreadManualIoEventLoopGroup(int nThreads, IoHandlerFactory factory) {
29+
super(nThreads, (Executor) null, factory);
30+
}
31+
32+
@Override
33+
protected IoEventLoop newChild(Executor executor, IoHandlerFactory ioHandlerFactory, Object... args) {
34+
if (threadFactory == null) {
35+
threadFactory = Thread.ofVirtual().factory();
36+
}
37+
var manualTask = new ManualIoEventLoopTask(this, null, ioHandlerFactory);
38+
var newThread = threadFactory.newThread(manualTask);
39+
manualTask.setOwningThread(newThread);
40+
newThread.start();
41+
return manualTask;
42+
}
43+
}
Lines changed: 275 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,275 @@
1+
/*
2+
* Copyright 2026 The Netty VirtualThread Scheduler Project
3+
*
4+
* The Netty VirtualThread Scheduler Project licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance with the
6+
* License. You may obtain a copy of the License at:
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software distributed under the
11+
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
12+
* either express or implied. See the License for the specific language governing permissions
13+
* and limitations under the License.
14+
*/
15+
package io.netty.loom;
16+
17+
import org.junit.jupiter.params.ParameterizedTest;
18+
import org.junit.jupiter.params.provider.Arguments;
19+
import org.junit.jupiter.params.provider.MethodSource;
20+
21+
import java.util.stream.Stream;
22+
23+
import java.lang.invoke.MethodHandle;
24+
import java.lang.invoke.MethodHandles;
25+
import java.lang.invoke.MethodType;
26+
import java.util.ArrayList;
27+
import java.util.IdentityHashMap;
28+
import java.util.List;
29+
import java.util.Map;
30+
import java.util.concurrent.ConcurrentHashMap;
31+
import java.util.concurrent.locks.LockSupport;
32+
import java.util.concurrent.atomic.AtomicBoolean;
33+
import java.util.concurrent.atomic.AtomicLong;
34+
35+
public class AffinityTest {
36+
37+
private static final MethodHandle CURRENT_CARRIER;
38+
39+
static {
40+
try {
41+
CURRENT_CARRIER = MethodHandles.privateLookupIn(Thread.class, MethodHandles.lookup())
42+
.findStatic(Thread.class, "currentCarrierThread", MethodType.methodType(Thread.class));
43+
} catch (NoSuchMethodException | IllegalAccessException e) {
44+
throw new RuntimeException(e);
45+
}
46+
}
47+
48+
private static Thread currentCarrier() {
49+
try {
50+
return (Thread) CURRENT_CARRIER.invokeExact();
51+
} catch (Throwable t) {
52+
throw new RuntimeException(t);
53+
}
54+
}
55+
56+
enum ParkStrategy {
57+
YIELD, TIMED_PARK, MIXED
58+
}
59+
60+
private static void park(ParkStrategy strategy, long iteration) {
61+
switch (strategy) {
62+
case YIELD -> Thread.yield();
63+
case TIMED_PARK -> LockSupport.parkNanos(1);
64+
case MIXED -> {
65+
if ((iteration & 1) == 0) {
66+
Thread.yield();
67+
} else {
68+
LockSupport.parkNanos(1);
69+
}
70+
}
71+
}
72+
}
73+
74+
private record VThreadStats(IdentityHashMap<Thread, AtomicLong> carrierAttempts, AtomicLong totalSwitches,
75+
AtomicLong totalIterations) {
76+
VThreadStats() {
77+
this(new IdentityHashMap<>(), new AtomicLong(), new AtomicLong());
78+
}
79+
}
80+
81+
static Stream<Arguments> parameters() {
82+
var strategies = ParkStrategy.values();
83+
var builder = Stream.<Arguments>builder();
84+
for (var strategy : strategies) {
85+
builder.add(Arguments.of(strategy, true));
86+
builder.add(Arguments.of(strategy, false));
87+
}
88+
return builder.build();
89+
}
90+
91+
@ParameterizedTest(name = "{0} affinity={1}")
92+
@MethodSource("parameters")
93+
public void testAffinityConvergence(ParkStrategy strategy, boolean affinity) throws InterruptedException {
94+
int n = Runtime.getRuntime().availableProcessors();
95+
var builder = Thread.ofVirtual();
96+
if (affinity) {
97+
builder.roundRobinAffinity();
98+
}
99+
var factory = builder.factory();
100+
Map<Thread, VThreadStats> perVThread = new ConcurrentHashMap<>();
101+
var stop = new AtomicBoolean();
102+
List<Thread> threads = new ArrayList<>();
103+
104+
for (int i = 0; i < n; i++) {
105+
threads.add(factory.newThread(() -> {
106+
var stats = new VThreadStats();
107+
perVThread.put(Thread.currentThread(), stats);
108+
for (long iter = 0; !stop.get(); iter++) {
109+
Thread beforePark = currentCarrier();
110+
park(strategy, iter);
111+
long iterations = stats.totalIterations.get() + 1;
112+
stats.totalIterations.lazySet(iterations);
113+
Thread afterPark = currentCarrier();
114+
var counter = stats.carrierAttempts.computeIfAbsent(afterPark, t -> new AtomicLong());
115+
counter.lazySet(counter.get() + 1);
116+
if (afterPark != beforePark) {
117+
stats.totalSwitches.lazySet(stats.totalSwitches.get() + 1);
118+
}
119+
}
120+
}));
121+
}
122+
threads.forEach(Thread::start);
123+
Thread.sleep(10_000);
124+
stop.set(true);
125+
for (var thread : threads) {
126+
thread.join(5_000);
127+
}
128+
129+
System.out.println("Strategy: " + strategy + ", affinity=" + affinity + " (" + n + " virtual threads, "
130+
+ perVThread.size() + " reported)");
131+
long globalSwitches = 0;
132+
long globalIterations = 0;
133+
for (var entry : perVThread.entrySet()) {
134+
var stats = entry.getValue();
135+
long iterations = stats.totalIterations.get();
136+
if (stats.carrierAttempts.isEmpty()) {
137+
System.out.printf("%s => never scheduled%n", entry.getKey().getName());
138+
continue;
139+
}
140+
var dominant = stats.carrierAttempts.entrySet().stream()
141+
.max(Map.Entry.comparingByValue((a, b) -> Long.compare(a.get(), b.get()))).orElseThrow();
142+
long switches = stats.totalSwitches.get();
143+
globalSwitches += switches;
144+
globalIterations += iterations;
145+
System.out.printf(
146+
"%s => dominant: %s (%d/%d = %.4f%% affinity, %d carriers used, switches: %d/%d = %.6f)%n",
147+
entry.getKey().getName(), dominant.getKey().getName(), dominant.getValue().get(), iterations,
148+
iterations > 0 ? 100.0 * dominant.getValue().get() / iterations : 0.0, stats.carrierAttempts.size(),
149+
switches, iterations, iterations > 0 ? (double) switches / iterations : 0.0);
150+
}
151+
System.out.printf("%nGlobal switches: %d / %d = %.6f%n%n", globalSwitches, globalIterations,
152+
globalIterations > 0 ? (double) globalSwitches / globalIterations : 0.0);
153+
System.out.flush();
154+
}
155+
156+
/**
157+
* Tests that short-lived virtual threads spawned by long-running VTs converge
158+
* to the same carrier as their parent. N long-running VTs continuously
159+
* park/yield; on each iteration they spawn a short-lived child VT that records
160+
* its own carrier. We then check whether the child converges to a dominant
161+
* carrier and whether that carrier matches the parent's dominant one.
162+
*/
163+
@ParameterizedTest(name = "{0} affinity={1}")
164+
@MethodSource("parameters")
165+
public void testChildAffinityConvergence(ParkStrategy strategy, boolean affinity) throws InterruptedException {
166+
int n = Runtime.getRuntime().availableProcessors();
167+
var parentBuilder = Thread.ofVirtual();
168+
if (affinity) {
169+
parentBuilder.roundRobinAffinity();
170+
}
171+
var parentFactory = parentBuilder.factory();
172+
// children inherit affinity from the parent VT
173+
var childFactory = affinity ? Thread.ofVirtual().inheritAffinity().factory() : Thread.ofVirtual().factory();
174+
175+
record ParentChildStats(IdentityHashMap<Thread, AtomicLong> parentCarrierCounts,
176+
IdentityHashMap<Thread, AtomicLong> childCarrierCounts, AtomicLong parentIterations,
177+
AtomicLong childSpawned) {
178+
ParentChildStats() {
179+
this(new IdentityHashMap<>(), new IdentityHashMap<>(), new AtomicLong(), new AtomicLong());
180+
}
181+
}
182+
183+
var stop = new AtomicBoolean();
184+
Map<Thread, ParentChildStats> perParent = new ConcurrentHashMap<>();
185+
List<Thread> parents = new ArrayList<>();
186+
187+
for (int i = 0; i < n; i++) {
188+
parents.add(parentFactory.newThread(() -> {
189+
var stats = new ParentChildStats();
190+
perParent.put(Thread.currentThread(), stats);
191+
for (long iter = 0; !stop.get(); iter++) {
192+
// record parent carrier
193+
Thread parentCarrier = currentCarrier();
194+
var parentCounter = stats.parentCarrierCounts.computeIfAbsent(parentCarrier, t -> new AtomicLong());
195+
parentCounter.lazySet(parentCounter.get() + 1);
196+
stats.parentIterations.lazySet(iter + 1);
197+
198+
// spawn a short-lived child every 16 iterations to avoid overwhelming the
199+
// scheduler; join it before proceeding so no concurrent access to
200+
// childCarrierCounts
201+
if ((iter & 0xF) == 0) {
202+
var childThread = childFactory.newThread(() -> {
203+
Thread childCarrier = currentCarrier();
204+
var childCounter = stats.childCarrierCounts.computeIfAbsent(childCarrier,
205+
t -> new AtomicLong());
206+
childCounter.lazySet(childCounter.get() + 1);
207+
});
208+
childThread.start();
209+
try {
210+
childThread.join();
211+
} catch (InterruptedException e) {
212+
Thread.currentThread().interrupt();
213+
return;
214+
}
215+
stats.childSpawned.lazySet(stats.childSpawned.get() + 1);
216+
}
217+
218+
park(strategy, iter);
219+
}
220+
}));
221+
}
222+
223+
parents.forEach(Thread::start);
224+
Thread.sleep(10_000);
225+
stop.set(true);
226+
for (var parent : parents) {
227+
parent.join(5_000);
228+
}
229+
230+
System.out.println("Strategy: " + strategy + ", affinity=" + affinity + " (" + n + " parent VTs)");
231+
int matchCount = 0;
232+
int reportedCount = 0;
233+
for (var entry : perParent.entrySet()) {
234+
var stats = entry.getValue();
235+
long parentIters = stats.parentIterations.get();
236+
long children = stats.childSpawned.get();
237+
238+
if (stats.parentCarrierCounts.isEmpty() || stats.childCarrierCounts.isEmpty()) {
239+
System.out.printf("%s => insufficient data (parentIters=%d, children=%d)%n", entry.getKey().getName(),
240+
parentIters, children);
241+
continue;
242+
}
243+
244+
var parentDominant = stats.parentCarrierCounts.entrySet().stream()
245+
.max(Map.Entry.comparingByValue((a, b) -> Long.compare(a.get(), b.get()))).orElseThrow();
246+
long parentDominantCount = parentDominant.getValue().get();
247+
248+
var childDominant = stats.childCarrierCounts.entrySet().stream()
249+
.max(Map.Entry.comparingByValue((a, b) -> Long.compare(a.get(), b.get()))).orElseThrow();
250+
long childTotal = stats.childCarrierCounts.values().stream().mapToLong(AtomicLong::get).sum();
251+
long childDominantCount = childDominant.getValue().get();
252+
253+
boolean dominantMatch = parentDominant.getKey() == childDominant.getKey();
254+
if (dominantMatch) {
255+
matchCount++;
256+
}
257+
reportedCount++;
258+
259+
System.out.printf(
260+
"%s => parent dominant: %s (%d/%d = %.2f%%, %d carriers)"
261+
+ " | child dominant: %s (%d/%d = %.2f%%, %d carriers) | match=%b%n",
262+
entry.getKey().getName(), parentDominant.getKey().getName(), parentDominantCount, parentIters,
263+
parentIters > 0 ? 100.0 * parentDominantCount / parentIters : 0.0, stats.parentCarrierCounts.size(),
264+
childDominant.getKey().getName(), childDominantCount, childTotal,
265+
childTotal > 0 ? 100.0 * childDominantCount / childTotal : 0.0, stats.childCarrierCounts.size(),
266+
dominantMatch);
267+
}
268+
269+
double matchRate = reportedCount > 0 ? 100.0 * matchCount / reportedCount : 0.0;
270+
System.out.printf("%nParent-child dominant carrier match: %d / %d = %.2f%%%n%n", matchCount, reportedCount,
271+
matchRate);
272+
System.out.flush();
273+
}
274+
275+
}

0 commit comments

Comments
 (0)