Skip to content

Commit 005213f

Browse files
committed
Expose event loop -> thread factory mapping
1 parent e61c652 commit 005213f

2 files changed

Lines changed: 44 additions & 16 deletions

File tree

core/src/main/java/io/netty/loom/VirtualMultithreadIoEventLoopGroup.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package io.netty.loom;
22

3-
import java.util.ArrayList;
3+
import java.util.*;
44
import java.util.concurrent.Executor;
55
import java.util.concurrent.ThreadFactory;
66
import java.util.concurrent.TimeUnit;
@@ -14,7 +14,8 @@ public class VirtualMultithreadIoEventLoopGroup extends MultiThreadIoEventLoopGr
1414

1515
private static final int RESUMED_CONTINUATIONS_EXPECTED_COUNT = Integer
1616
.getInteger("io.netty.loom.resumed.continuations", 1024);
17-
private ArrayList<EventLoopScheduler> eventLoopSchedulers;
17+
private Map<IoEventLoop, EventLoopScheduler> eventSchedulerMappings;
18+
private List<EventLoopScheduler> schedulers;
1819
private AtomicLong nextScheduler;
1920
private ThreadFactory threadFactory;
2021

@@ -31,6 +32,19 @@ private static void validateNettyAvailability() {
3132
}
3233
}
3334

35+
/**
36+
* Return a {@link ThreadFactory} that creates virtual threads tied to the
37+
* specified {@link IoEventLoop}'s {@link EventLoopScheduler}. Returns
38+
* {@code null} if the provided event loop is not associated with this group.
39+
*/
40+
public ThreadFactory vThreadFactoryOf(IoEventLoop eventLoop) {
41+
EventLoopScheduler scheduler = eventSchedulerMappings.get(eventLoop);
42+
if (scheduler == null) {
43+
return null;
44+
}
45+
return scheduler.virtualThreadFactory();
46+
}
47+
3448
/**
3549
* Return a {@link ThreadFactory} that creates virtual threads tied to an
3650
* {@link EventLoopScheduler} of this group.
@@ -60,23 +74,25 @@ public ThreadFactory vThreadFactory() {
6074
}
6175
// assign a random one
6276
int schedulerIndex = (int) (nextScheduler.getAndIncrement() % executorCount());
63-
return eventLoopSchedulers.get(schedulerIndex).virtualThreadFactory();
77+
return schedulers.get(schedulerIndex).virtualThreadFactory();
6478
}
6579

6680
@Override
6781
protected IoEventLoop newChild(Executor executor, IoHandlerFactory ioHandlerFactory,
6882
@SuppressWarnings("unused") Object... args) {
6983
validateNettyAvailability();
70-
if (eventLoopSchedulers == null) {
71-
eventLoopSchedulers = new ArrayList<>(executorCount());
84+
if (eventSchedulerMappings == null) {
85+
eventSchedulerMappings = new IdentityHashMap<>(executorCount());
86+
schedulers = new ArrayList<>(executorCount());
7287
nextScheduler = new AtomicLong();
7388
}
7489
if (threadFactory == null) {
7590
threadFactory = newDefaultThreadFactory();
7691
}
7792
var customScheduler = new EventLoopScheduler(this, threadFactory, ioHandlerFactory,
7893
RESUMED_CONTINUATIONS_EXPECTED_COUNT);
79-
eventLoopSchedulers.add(customScheduler);
94+
eventSchedulerMappings.put(customScheduler.ioEventLoop(), customScheduler);
95+
schedulers.add(customScheduler);
8096
return customScheduler.ioEventLoop();
8197
}
8298

core/src/test/java/io/netty/loom/VirtualMultithreadIoEventLoopGroupTest.java

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,15 @@
2020
import java.util.concurrent.atomic.AtomicReference;
2121
import java.util.stream.Stream;
2222

23-
import io.netty.channel.Channel;
24-
import io.netty.channel.ChannelInboundHandlerAdapter;
25-
import io.netty.channel.ChannelInitializer;
26-
import io.netty.channel.IoHandle;
27-
import io.netty.channel.IoHandler;
28-
import io.netty.channel.IoHandlerContext;
29-
import io.netty.channel.IoHandlerFactory;
30-
import io.netty.channel.IoRegistration;
31-
import io.netty.channel.epoll.Epoll;
23+
import io.netty.channel.*;
3224
import io.netty.channel.local.LocalIoHandler;
3325
import io.netty.channel.epoll.EpollIoHandler;
3426
import io.netty.channel.epoll.EpollServerSocketChannel;
3527

3628
import io.netty.channel.uring.IoUring;
3729
import io.netty.channel.uring.IoUringIoHandler;
3830
import io.netty.channel.uring.IoUringServerSocketChannel;
31+
import org.junit.jupiter.api.Test;
3932
import org.junit.jupiter.params.ParameterizedTest;
4033
import org.junit.jupiter.params.provider.MethodSource;
4134

@@ -475,7 +468,7 @@ void schedulerIsNotLeakingIfItsThreadFactoryOutliveIt(Transport transport)
475468
@ParameterizedTest(name = "{index} => transport={0}")
476469
@MethodSource("transportsAllowLocal")
477470
void virtualThreadCanMakeProgressEvenIfEventLoopIsClosed(Transport transport)
478-
throws InterruptedException, ExecutionException, TimeoutException, BrokenBarrierException {
471+
throws InterruptedException, ExecutionException, BrokenBarrierException {
479472
var group = new VirtualMultithreadIoEventLoopGroup(1, transport.handlerFactory());
480473
final var barrier = new CyclicBarrier(2);
481474
final var vThreadFactory = group.submit(group::vThreadFactory).get();
@@ -736,4 +729,23 @@ void testShutdownSchedulerOnLongBlockingIO(Transport transport)
736729
}
737730
}
738731
}
732+
733+
@Test
734+
void vThreadFactoryMappingShouldReturnNullIfNoneIsFound() throws InterruptedException, ExecutionException {
735+
try (var otherGroup = new VirtualMultithreadIoEventLoopGroup(1, LocalIoHandler.newFactory());
736+
var group = new VirtualMultithreadIoEventLoopGroup(1, NioIoHandler.newFactory())) {
737+
var otherEventLoop = otherGroup.next();
738+
assertNull(group.vThreadFactoryOf(otherEventLoop));
739+
}
740+
}
741+
742+
@Test
743+
void vThreadFactoryMappingShouldReturnTheRightOne() throws InterruptedException, ExecutionException {
744+
try (var group = new VirtualMultithreadIoEventLoopGroup(1, NioIoHandler.newFactory())) {
745+
var eventLoopScheduler = group
746+
.submit(() -> EventLoopScheduler.currentThreadSchedulerContext().scheduler().get()).get();
747+
assertSame(eventLoopScheduler.virtualThreadFactory(),
748+
group.vThreadFactoryOf(eventLoopScheduler.ioEventLoop()));
749+
}
750+
}
739751
}

0 commit comments

Comments
 (0)