3636import io .netty .channel .uring .IoUring ;
3737import io .netty .channel .uring .IoUringIoHandler ;
3838import io .netty .channel .uring .IoUringServerSocketChannel ;
39- import org .junit .jupiter .api .Test ;
4039import org .junit .jupiter .params .ParameterizedTest ;
4140import org .junit .jupiter .params .provider .MethodSource ;
4241
5756import io .netty .util .internal .ThreadExecutorMap ;
5857import org .junit .jupiter .api .Timeout ;
5958
59+ @ Timeout (10 )
6060public class VirtualMultithreadIoEventLoopGroupTest {
6161
6262 // Transport enumeration to drive tests across available Netty transports.
@@ -68,49 +68,34 @@ boolean isLocal() {
6868 }
6969
7070 boolean isAvailable () {
71- switch (this ) {
72- case NIO :
73- return true ;
74- case EPOLL :
75- return Epoll .isAvailable ();
76- case IO_URING :
77- return IoUring .isAvailable ();
78- case LOCAL :
79- return true ;
80- default :
81- return false ;
82- }
71+ return switch (this ) {
72+ case NIO -> true ;
73+ case EPOLL -> Epoll .isAvailable ();
74+ case IO_URING -> IoUring .isAvailable ();
75+ case LOCAL -> true ;
76+ default -> false ;
77+ };
8378 }
8479
8580 IoHandlerFactory handlerFactory () {
86- switch (this ) {
87- case NIO :
88- return NioIoHandler .newFactory ();
89- case EPOLL :
90- return EpollIoHandler .newFactory ();
91- case IO_URING :
92- return IoUringIoHandler .newFactory ();
93- case LOCAL :
94- return LocalIoHandler .newFactory ();
95- default :
96- throw new IllegalStateException ();
97- }
81+ return switch (this ) {
82+ case NIO -> NioIoHandler .newFactory ();
83+ case EPOLL -> EpollIoHandler .newFactory ();
84+ case IO_URING -> IoUringIoHandler .newFactory ();
85+ case LOCAL -> LocalIoHandler .newFactory ();
86+ default -> throw new IllegalStateException ();
87+ };
9888 }
9989
10090 Class <? extends io .netty .channel .ServerChannel > serverChannelClass () {
101- switch (this ) {
102- case NIO :
103- return NioServerSocketChannel .class ;
104- case EPOLL :
105- return EpollServerSocketChannel .class ;
106- case IO_URING :
107- return IoUringServerSocketChannel .class ;
108- case LOCAL :
109- throw new IllegalStateException (
110- "LOCAL transport does not provide a ServerChannel class for real networking" );
111- default :
112- throw new IllegalStateException ();
113- }
91+ return switch (this ) {
92+ case NIO -> NioServerSocketChannel .class ;
93+ case EPOLL -> EpollServerSocketChannel .class ;
94+ case IO_URING -> IoUringServerSocketChannel .class ;
95+ case LOCAL -> throw new IllegalStateException (
96+ "LOCAL transport does not provide a ServerChannel class for real networking" );
97+ default -> throw new IllegalStateException ();
98+ };
11499 }
115100 }
116101
@@ -181,9 +166,11 @@ public void channelRead(io.netty.channel.ChannelHandlerContext ctx, Object msg)
181166 group .shutdownGracefully ();
182167 }
183168
184- @ Test
185- void virtualEventExecutorGroupCorrectlySetEventExecutor () throws ExecutionException , InterruptedException {
186- var group = new VirtualMultithreadIoEventLoopGroup (1 , NioIoHandler .newFactory ());
169+ @ ParameterizedTest (name = "{index} => transport={0}" )
170+ @ MethodSource ("transportsAllowLocal" )
171+ void virtualEventExecutorGroupCorrectlySetEventExecutor (Transport transport )
172+ throws ExecutionException , InterruptedException {
173+ var group = new VirtualMultithreadIoEventLoopGroup (1 , transport .handlerFactory ());
187174 var ioEventLoop = group .next ();
188175 assertInstanceOf (EventExecutor .class , ioEventLoop );
189176 assertTrue (group .submit (() -> ThreadExecutorMap .currentExecutor () == ioEventLoop ).get ());
@@ -281,12 +268,15 @@ public void channelRead(io.netty.channel.ChannelHandlerContext ctx, Object msg)
281268 group .shutdownGracefully ();
282269 }
283270
284- @ Test
285- void saveWakeupsOnVirtualThreads () throws InterruptedException , ExecutionException {
271+ @ ParameterizedTest (name = "{index} => transport={0}" )
272+ @ MethodSource ("transportsAllowLocal" )
273+ void saveWakeupsOnVirtualThreads (Transport transport ) throws InterruptedException , ExecutionException {
274+ assumeTrue (transport .isAvailable ());
275+ assumeTrue (!transport .isLocal ());
286276 var wakeupCounter = new AtomicInteger ();
287- var nioFactory = NioIoHandler . newFactory ();
277+ IoHandlerFactory baseFactory = transport . handlerFactory ();
288278 IoHandlerFactory counterHandlerFactory = ioExecutor -> {
289- var ioHandler = nioFactory .newHandler (ioExecutor );
279+ var ioHandler = baseFactory .newHandler (ioExecutor );
290280 return new IoHandler () {
291281
292282 @ Override
@@ -331,7 +321,7 @@ public boolean isCompatible(Class<? extends IoHandle> handleType) {
331321 InetSocketAddress inetAddress = new InetSocketAddress (8080 );
332322 var innerVThreadCreationFromVThread = new CompletableFuture <Integer >();
333323 var innerWriteFromVThread = new CompletableFuture <Integer >();
334- var bootstrap = new ServerBootstrap ().group (group ).channel (NioServerSocketChannel . class )
324+ var bootstrap = new ServerBootstrap ().group (group ).channel (transport . serverChannelClass () )
335325 .childHandler (new ChannelInitializer <SocketChannel >() {
336326
337327 @ Override
@@ -382,9 +372,11 @@ public void channelRead(io.netty.channel.ChannelHandlerContext ctx, Object msg)
382372 group .shutdownGracefully ();
383373 }
384374
385- @ Test
386- void schedulerIsNotInheritedWithThreadOfVirtual () throws InterruptedException , ExecutionException {
387- try (var group = new VirtualMultithreadIoEventLoopGroup (1 , LocalIoHandler .newFactory ())) {
375+ @ ParameterizedTest (name = "{index} => transport={0}" )
376+ @ MethodSource ("transportsAllowLocal" )
377+ void schedulerIsNotInheritedWithThreadOfVirtual (Transport transport )
378+ throws InterruptedException , ExecutionException {
379+ try (var group = new VirtualMultithreadIoEventLoopGroup (1 , transport .handlerFactory ())) {
388380 final var expectedScheduler = group
389381 .submit (() -> EventLoopScheduler .currentThreadSchedulerContext ().scheduler ().get ()).get ();
390382 assertNotNull (expectedScheduler );
@@ -401,9 +393,11 @@ void schedulerIsNotInheritedWithThreadOfVirtual() throws InterruptedException, E
401393 }
402394 }
403395
404- @ Test
405- void schedulerIsInheritedByForkedVTFromTheRightFactory () throws InterruptedException , ExecutionException {
406- try (var group = new VirtualMultithreadIoEventLoopGroup (1 , LocalIoHandler .newFactory ())) {
396+ @ ParameterizedTest (name = "{index} => transport={0}" )
397+ @ MethodSource ("transportsAllowLocal" )
398+ void schedulerIsInheritedByForkedVTFromTheRightFactory (Transport transport )
399+ throws InterruptedException , ExecutionException {
400+ try (var group = new VirtualMultithreadIoEventLoopGroup (1 , transport .handlerFactory ())) {
407401 final var expectedEventLoopScheduler = group
408402 .submit (() -> EventLoopScheduler .currentThreadSchedulerContext ().scheduler ().get ()).get ();
409403 assertNotNull (expectedEventLoopScheduler );
@@ -423,9 +417,10 @@ void schedulerIsInheritedByForkedVTFromTheRightFactory() throws InterruptedExcep
423417 }
424418 }
425419
426- @ Test
427- void schedulerIsNotInheritedByForkedVT () throws InterruptedException , ExecutionException {
428- try (var group = new VirtualMultithreadIoEventLoopGroup (1 , LocalIoHandler .newFactory ())) {
420+ @ ParameterizedTest (name = "{index} => transport={0}" )
421+ @ MethodSource ("transportsAllowLocal" )
422+ void schedulerIsNotInheritedByForkedVT (Transport transport ) throws InterruptedException , ExecutionException {
423+ try (var group = new VirtualMultithreadIoEventLoopGroup (1 , transport .handlerFactory ())) {
429424 final var vThreadFactory = group .submit (group ::vThreadFactory ).get ();
430425 var schedulerRef = new CompletableFuture <EventLoopScheduler .SharedRef >();
431426 vThreadFactory .newThread (() -> {
@@ -441,13 +436,14 @@ void schedulerIsNotInheritedByForkedVT() throws InterruptedException, ExecutionE
441436 }
442437 }
443438
444- @ Test
445- @ Timeout (10 )
446- void schedulerIsNotLeakingIfItsThreadFactoryOutliveIt () throws InterruptedException , ExecutionException {
439+ @ ParameterizedTest (name = "{index} => transport={0}" )
440+ @ MethodSource ("transportsAllowLocal" )
441+ void schedulerIsNotLeakingIfItsThreadFactoryOutliveIt (Transport transport )
442+ throws InterruptedException , ExecutionException {
447443 ThreadFactory vThreadFactory ;
448444 WeakReference <EventLoopScheduler > schedulerWeakRef ;
449445 EventLoopScheduler .SharedRef schedulerRef ;
450- try (var group = new VirtualMultithreadIoEventLoopGroup (1 , LocalIoHandler . newFactory ())) {
446+ try (var group = new VirtualMultithreadIoEventLoopGroup (1 , transport . handlerFactory ())) {
451447 vThreadFactory = group .submit (group ::vThreadFactory ).get ();
452448 schedulerRef = group .submit (() -> EventLoopScheduler .currentThreadSchedulerContext ().scheduler ()).get ();
453449 schedulerWeakRef = new WeakReference <>(schedulerRef .get ());
@@ -471,10 +467,11 @@ void schedulerIsNotLeakingIfItsThreadFactoryOutliveIt() throws InterruptedExcept
471467 assertNull (schedulerRefPromise .get ().get ());
472468 }
473469
474- @ Test
475- void virtualThreadCanMakeProgressEvenIfEventLoopIsClosed ()
476- throws InterruptedException , ExecutionException , BrokenBarrierException , TimeoutException {
477- var group = new VirtualMultithreadIoEventLoopGroup (1 , LocalIoHandler .newFactory ());
470+ @ ParameterizedTest (name = "{index} => transport={0}" )
471+ @ MethodSource ("transportsAllowLocal" )
472+ void virtualThreadCanMakeProgressEvenIfEventLoopIsClosed (Transport transport )
473+ throws InterruptedException , ExecutionException , TimeoutException , BrokenBarrierException {
474+ var group = new VirtualMultithreadIoEventLoopGroup (1 , transport .handlerFactory ());
478475 final var barrier = new CyclicBarrier (2 );
479476 final var vThreadFactory = group .submit (group ::vThreadFactory ).get ();
480477 vThreadFactory .newThread (() -> {
@@ -488,10 +485,11 @@ void virtualThreadCanMakeProgressEvenIfEventLoopIsClosed()
488485 barrier .await (5 , TimeUnit .SECONDS );
489486 }
490487
491- @ Test
492- void eventLoopSchedulerCanMakeProgressIfTheEventLoopIsBlocked ()
488+ @ ParameterizedTest (name = "{index} => transport={0}" )
489+ @ MethodSource ("transportsAllowLocal" )
490+ void eventLoopSchedulerCanMakeProgressIfTheEventLoopIsBlocked (Transport transport )
493491 throws BrokenBarrierException , InterruptedException , TimeoutException {
494- var group = new VirtualMultithreadIoEventLoopGroup (1 , NioIoHandler . newFactory ());
492+ var group = new VirtualMultithreadIoEventLoopGroup (1 , transport . handlerFactory ());
495493 var allBlocked = new CyclicBarrier (3 );
496494 group .execute (() -> {
497495 group .vThreadFactory ().newThread (() -> {
@@ -512,11 +510,12 @@ void eventLoopSchedulerCanMakeProgressIfTheEventLoopIsBlocked()
512510 }
513511 }
514512
515- @ Test
516- void testFairness () throws ExecutionException , InterruptedException {
513+ @ ParameterizedTest (name = "{index} => transport={0}" )
514+ @ MethodSource ("transportsAllowLocal" )
515+ void testFairness (Transport transport ) throws ExecutionException , InterruptedException {
517516 final long V_TASK_DURATION_NS = TimeUnit .MILLISECONDS .toNanos (100 );
518517 int tasks = 4 ;
519- var group = new VirtualMultithreadIoEventLoopGroup (1 , NioIoHandler . newFactory ());
518+ var group = new VirtualMultithreadIoEventLoopGroup (1 , transport . handlerFactory ());
520519 var interleavingVirtualThreads = new AtomicBoolean (false );
521520
522521 var nonBlockingTasksCompleted = new CountDownLatch (tasks );
@@ -547,19 +546,21 @@ private static void spinWait(long nanos) {
547546 }
548547 }
549548
550- @ Test
551- void testPlatformThreadSpawnsVirtualThreads () throws ExecutionException , InterruptedException {
552- try (var group = new VirtualMultithreadIoEventLoopGroup (1 , LocalIoHandler .newFactory ());
549+ @ ParameterizedTest (name = "{index} => transport={0}" )
550+ @ MethodSource ("transportsAllowLocal" )
551+ void testPlatformThreadSpawnsVirtualThreads (Transport transport ) throws ExecutionException , InterruptedException {
552+ try (var group = new VirtualMultithreadIoEventLoopGroup (1 , transport .handlerFactory ());
553553 var executor = Executors .newVirtualThreadPerTaskExecutor ()) {
554554 var scheduler = executor .submit (() -> EventLoopScheduler .currentThreadSchedulerContext ().scheduler ());
555555 assertNull (scheduler .get ());
556556 }
557557 }
558558
559- @ Test
560- void testBlockingIO () throws IOException , InterruptedException , ExecutionException {
559+ @ ParameterizedTest (name = "{index} => transport={0}" )
560+ @ MethodSource ("transportsAllowLocal" )
561+ void testBlockingIO (Transport transport ) throws IOException , InterruptedException , ExecutionException {
561562 assumeTrue (NettyScheduler .perCarrierPollers ());
562- try (var group = new VirtualMultithreadIoEventLoopGroup (1 , NioIoHandler . newFactory ());
563+ try (var group = new VirtualMultithreadIoEventLoopGroup (1 , transport . handlerFactory ());
563564 var serverAcceptor = new ServerSocket (0 )) {
564565 var serverSocketPromise = new CompletableFuture <Socket >();
565566 Thread .ofVirtual ().start (() -> {
@@ -602,10 +603,12 @@ void testBlockingIO() throws IOException, InterruptedException, ExecutionExcepti
602603 }
603604 }
604605
605- @ Test
606- void testShutdownSchedulerOnBlockingIO () throws IOException , InterruptedException , ExecutionException {
606+ @ ParameterizedTest (name = "{index} => transport={0}" )
607+ @ MethodSource ("transportsAllowLocal" )
608+ void testShutdownSchedulerOnBlockingIO (Transport transport )
609+ throws IOException , InterruptedException , ExecutionException {
607610 assumeTrue (NettyScheduler .perCarrierPollers ());
608- try (var group = new VirtualMultithreadIoEventLoopGroup (1 , LocalIoHandler . newFactory ());
611+ try (var group = new VirtualMultithreadIoEventLoopGroup (1 , transport . handlerFactory ());
609612 var serverAcceptor = new ServerSocket (0 )) {
610613 var serverSocketPromise = new CompletableFuture <Socket >();
611614 Thread .ofVirtual ().start (() -> {
@@ -667,11 +670,13 @@ void testShutdownSchedulerOnBlockingIO() throws IOException, InterruptedExceptio
667670 }
668671 }
669672
670- @ Test
671- void testShutdownSchedulerOnLongBlockingIO () throws IOException , InterruptedException , ExecutionException {
673+ @ ParameterizedTest (name = "{index} => transport={0}" )
674+ @ MethodSource ("transportsAllowLocal" )
675+ void testShutdownSchedulerOnLongBlockingIO (Transport transport )
676+ throws IOException , InterruptedException , ExecutionException {
672677 assumeTrue (NettyScheduler .perCarrierPollers ());
673678 int bytesToWrite = 16 ;
674- try (var group = new VirtualMultithreadIoEventLoopGroup (1 , LocalIoHandler . newFactory ());
679+ try (var group = new VirtualMultithreadIoEventLoopGroup (1 , transport . handlerFactory ());
675680 var serverAcceptor = new ServerSocket (0 )) {
676681 var serverSocketPromise = new CompletableFuture <Socket >();
677682 Thread .ofVirtual ().start (() -> {
0 commit comments