3737import io .netty .handler .codec .http .HttpServerCodec ;
3838import io .netty .handler .codec .http .HttpUtil ;
3939import io .netty .handler .codec .http .HttpVersion ;
40- import io .netty .loom .EventLoopSchedulerType ;
4140import io .netty .loom .VirtualMultithreadIoEventLoopGroup ;
41+ import io .netty .loom .VirtualMultithreadManualIoEventLoopGroup ;
4242import io .netty .util .AsciiString ;
4343import io .netty .util .CharsetUtil ;
4444import org .apache .hc .client5 .http .ConnectionKeepAliveStrategy ;
7272 * <p>
7373 * Usage: java -cp benchmark-runner.jar
7474 * io.netty.loom.benchmark.runner.HandoffHttpServer \ --port 8081 \ --mock-url
75- * http://localhost:8080/fruits \ --threads 2 \ --use-custom-scheduler true \
7675 * --io epoll
7776 */
7877public class HandoffHttpServer {
@@ -81,54 +80,53 @@ public enum IO {
8180 EPOLL , NIO , IO_URING
8281 }
8382
83+ public enum Mode {
84+ NON_VIRTUAL_NETTY , REACTIVE , VIRTUAL_NETTY , NETTY_SCHEDULER
85+ }
86+
8487 private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper ();
8588 private static final ByteBuf HEALTH_RESPONSE = Unpooled
8689 .unreleasableBuffer (Unpooled .copiedBuffer ("OK" , CharsetUtil .UTF_8 ));
8790
91+ // Same JSON as MockHttpServer.CACHED_RESPONSE — used in mockless mode
92+ private static final byte [] CACHED_JSON_BYTES = """
93+ {
94+ "fruits": [
95+ {"name": "Apple", "color": "Red", "price": 1.20},
96+ {"name": "Banana", "color": "Yellow", "price": 0.50},
97+ {"name": "Orange", "color": "Orange", "price": 0.80},
98+ {"name": "Grape", "color": "Purple", "price": 2.00},
99+ {"name": "Mango", "color": "Yellow", "price": 1.50},
100+ {"name": "Strawberry", "color": "Red", "price": 3.00},
101+ {"name": "Blueberry", "color": "Blue", "price": 4.00},
102+ {"name": "Pineapple", "color": "Yellow", "price": 2.50},
103+ {"name": "Watermelon", "color": "Green", "price": 5.00},
104+ {"name": "Kiwi", "color": "Brown", "price": 1.00}
105+ ]
106+ }
107+ """ .getBytes (java .nio .charset .StandardCharsets .UTF_8 );
108+
88109 private final int port ;
89110 private final String mockUrl ;
90111 private final int threads ;
91- private final boolean useCustomScheduler ;
92112 private final IO io ;
93113 private final boolean silent ;
94- private final boolean noTimeout ;
95- private final boolean useReactive ;
96- private final EventLoopSchedulerType schedulerType ;
114+ private final boolean mockless ;
115+ private final Mode mode ;
97116
98117 private MultiThreadIoEventLoopGroup workerGroup ;
99118 private Channel serverChannel ;
100119 private Supplier <ThreadFactory > threadFactorySupplier ;
101120
102- public HandoffHttpServer (int port , String mockUrl , int threads , boolean useCustomScheduler , IO io ) {
103- this (port , mockUrl , threads , useCustomScheduler , io , false , false , false );
104- }
105-
106- public HandoffHttpServer (int port , String mockUrl , int threads , boolean useCustomScheduler , IO io , boolean silent ) {
107- this (port , mockUrl , threads , useCustomScheduler , io , silent , false , false );
108- }
109-
110- public HandoffHttpServer (int port , String mockUrl , int threads , boolean useCustomScheduler , IO io , boolean silent ,
111- boolean noTimeout ) {
112- this (port , mockUrl , threads , useCustomScheduler , io , silent , noTimeout , false );
113- }
114-
115- public HandoffHttpServer (int port , String mockUrl , int threads , boolean useCustomScheduler , IO io , boolean silent ,
116- boolean noTimeout , boolean useReactive ) {
117- this (port , mockUrl , threads , useCustomScheduler , io , silent , noTimeout , useReactive ,
118- EventLoopSchedulerType .FIFO );
119- }
120-
121- public HandoffHttpServer (int port , String mockUrl , int threads , boolean useCustomScheduler , IO io , boolean silent ,
122- boolean noTimeout , boolean useReactive , EventLoopSchedulerType schedulerType ) {
121+ public HandoffHttpServer (int port , String mockUrl , int threads , IO io , boolean silent , boolean mockless ,
122+ Mode mode ) {
123123 this .port = port ;
124124 this .mockUrl = mockUrl ;
125125 this .threads = threads ;
126- this .useCustomScheduler = useCustomScheduler ;
127126 this .io = io ;
128127 this .silent = silent ;
129- this .noTimeout = noTimeout ;
130- this .useReactive = useReactive ;
131- this .schedulerType = schedulerType == null ? EventLoopSchedulerType .FIFO : schedulerType ;
128+ this .mockless = mockless ;
129+ this .mode = mode ;
132130 }
133131
134132 public void start () throws InterruptedException {
@@ -138,26 +136,48 @@ public void start() throws InterruptedException {
138136 case IO_URING -> IoUringIoHandler .newFactory ();
139137 };
140138
141- Class <? extends ServerSocketChannel > serverChannelClass = switch (io ) {
142- case NIO -> NioServerSocketChannel .class ;
143- case EPOLL -> EpollServerSocketChannel .class ;
144- case IO_URING -> IoUringServerSocketChannel .class ;
145- };
146-
147- Class <? extends io .netty .channel .socket .SocketChannel > clientChannelClass = switch (io ) {
148- case NIO -> io .netty .channel .socket .nio .NioSocketChannel .class ;
149- case EPOLL -> io .netty .channel .epoll .EpollSocketChannel .class ;
150- case IO_URING -> io .netty .channel .uring .IoUringSocketChannel .class ;
151- };
152-
153- if (useCustomScheduler ) {
154- var group = new VirtualMultithreadIoEventLoopGroup (threads , ioHandlerFactory , schedulerType );
155- threadFactorySupplier = group ::vThreadFactory ;
156- workerGroup = group ;
157- } else {
158- workerGroup = new MultiThreadIoEventLoopGroup (threads , ioHandlerFactory );
159- var defaultFactory = Thread .ofVirtual ().factory ();
160- threadFactorySupplier = () -> defaultFactory ;
139+ final Class <? extends ServerSocketChannel > serverChannelClass ;
140+ final Class <? extends io .netty .channel .socket .SocketChannel > clientChannelClass ;
141+
142+ switch (mode ) {
143+ case VIRTUAL_NETTY -> {
144+ var group = new VirtualMultithreadManualIoEventLoopGroup (threads , NioIoHandler .newFactory ());
145+ workerGroup = group ;
146+ var defaultFactory = Thread .ofVirtual ().factory ();
147+ threadFactorySupplier = () -> defaultFactory ;
148+ serverChannelClass = NioServerSocketChannel .class ;
149+ clientChannelClass = io .netty .channel .socket .nio .NioSocketChannel .class ;
150+ }
151+ case NETTY_SCHEDULER -> {
152+ serverChannelClass = switch (io ) {
153+ case NIO -> NioServerSocketChannel .class ;
154+ case EPOLL -> EpollServerSocketChannel .class ;
155+ case IO_URING -> IoUringServerSocketChannel .class ;
156+ };
157+ clientChannelClass = switch (io ) {
158+ case NIO -> io .netty .channel .socket .nio .NioSocketChannel .class ;
159+ case EPOLL -> io .netty .channel .epoll .EpollSocketChannel .class ;
160+ case IO_URING -> io .netty .channel .uring .IoUringSocketChannel .class ;
161+ };
162+ var group = new VirtualMultithreadIoEventLoopGroup (threads , ioHandlerFactory );
163+ threadFactorySupplier = group ::vThreadFactory ;
164+ workerGroup = group ;
165+ }
166+ default -> {
167+ serverChannelClass = switch (io ) {
168+ case NIO -> NioServerSocketChannel .class ;
169+ case EPOLL -> EpollServerSocketChannel .class ;
170+ case IO_URING -> IoUringServerSocketChannel .class ;
171+ };
172+ clientChannelClass = switch (io ) {
173+ case NIO -> io .netty .channel .socket .nio .NioSocketChannel .class ;
174+ case EPOLL -> io .netty .channel .epoll .EpollSocketChannel .class ;
175+ case IO_URING -> io .netty .channel .uring .IoUringSocketChannel .class ;
176+ };
177+ workerGroup = new MultiThreadIoEventLoopGroup (threads , ioHandlerFactory );
178+ var defaultFactory = Thread .ofVirtual ().factory ();
179+ threadFactorySupplier = () -> defaultFactory ;
180+ }
161181 }
162182 ServerBootstrap b = new ServerBootstrap ();
163183 b .group (workerGroup ).channel (serverChannelClass ).childOption (ChannelOption .TCP_NODELAY , true )
@@ -167,8 +187,8 @@ protected void initChannel(SocketChannel ch) {
167187 ChannelPipeline p = ch .pipeline ();
168188 p .addLast (new HttpServerCodec ());
169189 p .addLast (new HttpObjectAggregator (65536 ));
170- if (useReactive ) {
171- p .addLast (new ReactiveHandoffHandler (mockUrl , noTimeout , clientChannelClass ));
190+ if (mode == Mode . REACTIVE ) {
191+ p .addLast (new ReactiveHandoffHandler (mockUrl , clientChannelClass ));
172192 } else {
173193 p .addLast (new HandoffHandler ());
174194 }
@@ -178,17 +198,18 @@ protected void initChannel(SocketChannel ch) {
178198 serverChannel = b .bind (port ).sync ().channel ();
179199 if (!silent ) {
180200 System .out .printf ("Handoff HTTP Server started on port %d%n" , port );
181- System .out .printf (" Mode: %s%n" , useReactive ? "Reactive (Project Reactor)" : "Virtual Thread" );
182- System .out .printf (" Mock URL: %s%n" , mockUrl );
183- System .out .printf (" Threads: %d%n" , threads );
184- if (!useReactive ) {
185- System .out .printf (" Custom Scheduler: %s%n" , useCustomScheduler );
186- if (useCustomScheduler ) {
187- System .out .printf (" Scheduler Type: %s%n" , schedulerType );
188- }
201+ System .out .printf (" Mode: %s%n" , switch (mode ) {
202+ case NON_VIRTUAL_NETTY -> "Non-Virtual Netty (platform IO + VT blocking)" ;
203+ case REACTIVE -> "Reactive (pure async, no VTs)" ;
204+ case VIRTUAL_NETTY -> "Virtual Netty (IO loops as VTs on ForkJoinPool)" ;
205+ case NETTY_SCHEDULER -> "Netty Scheduler (platform IO + Netty VT scheduler)" ;
206+ });
207+ System .out .printf (" Mockless: %s%n" , mockless );
208+ if (!mockless ) {
209+ System .out .printf (" Mock URL: %s%n" , mockUrl );
189210 }
190- System .out .printf (" I/O : %s %n" , io );
191- System .out .printf (" No Timeout : %s%n" , noTimeout );
211+ System .out .printf (" Threads : %d %n" , threads );
212+ System .out .printf (" I/O : %s%n" , mode == Mode . VIRTUAL_NETTY ? "NIO (forced)" : io );
192213 }
193214 }
194215
@@ -218,12 +239,16 @@ private class HandoffHandler extends SimpleChannelInboundHandler<FullHttpRequest
218239 private ExecutorService orderedExecutorService ;
219240
220241 HandoffHandler () {
221- ConnectionKeepAliveStrategy keepAliveStrategy = (HttpResponse response ,
222- HttpContext context ) -> TimeValue .NEG_ONE_MILLISECOND ;
223- BasicHttpClientConnectionManager cm = new BasicHttpClientConnectionManager ();
224- RequestConfig requestConfig = noTimeout ? NO_TIMEOUT_HTTP_REQUEST_CONFIG : RequestConfig .DEFAULT ;
225- httpClient = HttpClientBuilder .create ().setConnectionManager (cm ).setDefaultRequestConfig (requestConfig )
226- .setConnectionManagerShared (false ).setKeepAliveStrategy (keepAliveStrategy ).build ();
242+ if (mockless ) {
243+ httpClient = null ;
244+ } else {
245+ ConnectionKeepAliveStrategy keepAliveStrategy = (HttpResponse response ,
246+ HttpContext context ) -> TimeValue .NEG_ONE_MILLISECOND ;
247+ BasicHttpClientConnectionManager cm = new BasicHttpClientConnectionManager ();
248+ httpClient = HttpClientBuilder .create ().setConnectionManager (cm )
249+ .setDefaultRequestConfig (NO_TIMEOUT_HTTP_REQUEST_CONFIG ).setConnectionManagerShared (false )
250+ .setKeepAliveStrategy (keepAliveStrategy ).build ();
251+ }
227252 }
228253
229254 @ Override
@@ -244,10 +269,16 @@ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request)
244269 }
245270
246271 if (uri .equals ("/" ) || uri .startsWith ("/fruits" )) {
247- // Hand off to virtual thread for blocking processing
248- orderedExecutorService .execute (() -> {
249- doBlockingProcessing (ctx , eventLoop , keepAlive );
250- });
272+ // Hand off to virtual thread for processing
273+ if (mockless ) {
274+ orderedExecutorService .execute (() -> {
275+ doMocklessProcessing (ctx , eventLoop , keepAlive );
276+ });
277+ } else {
278+ orderedExecutorService .execute (() -> {
279+ doBlockingProcessing (ctx , eventLoop , keepAlive );
280+ });
281+ }
251282 return ;
252283 }
253284
@@ -294,6 +325,28 @@ private void doBlockingProcessing(ChannelHandlerContext ctx, IoEventLoop eventLo
294325 }
295326 }
296327
328+ private void doMocklessProcessing (ChannelHandlerContext ctx , IoEventLoop eventLoop , boolean keepAlive ) {
329+ try {
330+ // Same Jackson work as doBlockingProcessing, without the HTTP call
331+ FruitsResponse fruitsResponse = OBJECT_MAPPER .readValue (CACHED_JSON_BYTES , FruitsResponse .class );
332+ byte [] responseBytes = OBJECT_MAPPER .writeValueAsBytes (fruitsResponse );
333+ eventLoop .execute (() -> {
334+ ByteBuf content = Unpooled .wrappedBuffer (responseBytes );
335+ sendResponse (ctx , content , HttpHeaderValues .APPLICATION_JSON , keepAlive );
336+ });
337+ } catch (Throwable e ) {
338+ eventLoop .execute (() -> {
339+ ByteBuf content = Unpooled .copiedBuffer ("{\" error\" :\" " + e .getMessage () + "\" }" ,
340+ CharsetUtil .UTF_8 );
341+ FullHttpResponse response = new DefaultFullHttpResponse (HttpVersion .HTTP_1_1 ,
342+ HttpResponseStatus .INTERNAL_SERVER_ERROR , content );
343+ response .headers ().set (HttpHeaderNames .CONTENT_TYPE , HttpHeaderValues .APPLICATION_JSON );
344+ response .headers ().setInt (HttpHeaderNames .CONTENT_LENGTH , content .readableBytes ());
345+ ctx .writeAndFlush (response ).addListener (ChannelFutureListener .CLOSE );
346+ });
347+ }
348+ }
349+
297350 private void sendResponse (ChannelHandlerContext ctx , ByteBuf content , AsciiString contentType ,
298351 boolean keepAlive ) {
299352 FullHttpResponse response = new DefaultFullHttpResponse (HttpVersion .HTTP_1_1 , HttpResponseStatus .OK ,
@@ -319,7 +372,9 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
319372 try {
320373 orderedExecutorService .execute (() -> {
321374 try {
322- httpClient .close ();
375+ if (httpClient != null ) {
376+ httpClient .close ();
377+ }
323378 } catch (IOException e ) {
324379 } finally {
325380 orderedExecutorService .shutdown ();
@@ -332,37 +387,31 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
332387 }
333388
334389 public static void main (String [] args ) throws InterruptedException {
335- // Parse arguments
336390 int port = 8081 ;
337391 String mockUrl = "http://localhost:8080/fruits" ;
338392 int threads = 1 ;
339- boolean useCustomScheduler = false ;
340393 IO io = IO .EPOLL ;
341394 boolean silent = false ;
342- boolean noTimeout = false ;
343- boolean useReactive = false ;
344- EventLoopSchedulerType schedulerType = EventLoopSchedulerType .FIFO ;
395+ boolean mockless = false ;
396+ Mode mode = Mode .NON_VIRTUAL_NETTY ;
345397
346398 for (int i = 0 ; i < args .length ; i ++) {
347399 switch (args [i ]) {
348400 case "--port" -> port = Integer .parseInt (args [++i ]);
349401 case "--mock-url" -> mockUrl = args [++i ];
350402 case "--threads" -> threads = Integer .parseInt (args [++i ]);
351- case "--use-custom-scheduler" -> useCustomScheduler = Boolean .parseBoolean (args [++i ]);
352- case "--scheduler-type" -> schedulerType = EventLoopSchedulerType .valueOf (args [++i ].toUpperCase ());
353403 case "--io" -> io = IO .valueOf (args [++i ].toUpperCase ());
354404 case "--silent" -> silent = true ;
355- case "--no-timeout " -> noTimeout = Boolean . parseBoolean ( args [++ i ]) ;
356- case "--reactive " -> useReactive = Boolean . parseBoolean (args [++i ]);
405+ case "--mockless " -> mockless = true ;
406+ case "--mode " -> mode = Mode . valueOf (args [++i ]. toUpperCase () );
357407 case "--help" -> {
358408 printUsage ();
359409 return ;
360410 }
361411 }
362412 }
363413
364- HandoffHttpServer server = new HandoffHttpServer (port , mockUrl , threads , useCustomScheduler , io , silent ,
365- noTimeout , useReactive , schedulerType );
414+ HandoffHttpServer server = new HandoffHttpServer (port , mockUrl , threads , io , silent , mockless , mode );
366415 server .start ();
367416
368417 // Shutdown hook
@@ -372,25 +421,24 @@ public static void main(String[] args) throws InterruptedException {
372421 }
373422
374423 private static void printUsage () {
375- System .out .println (
376- """
377- Usage: java -cp benchmark-runner.jar io.netty.loom.benchmark.runner.HandoffHttpServer [options]
378-
379- Options:
380- --port <port> HTTP port (default: 8081)
381- --mock-url <url> Mock server URL (default: http://localhost:8080/fruits)
382- --threads <n> Number of event loop threads (default: 1)
383- --use-custom-scheduler <bool> Use custom Netty scheduler (default: false, ignored if --reactive is true)
384- --scheduler-type <fifo> Scheduler type for custom scheduler (default: fifo)
385- --io <epoll|nio|io_uring> I/O type (default: epoll)
386- --no-timeout <true|false> Disable HTTP client timeout (default: false)
387- --reactive <true|false> Use reactive handler with Reactor (default: false)
388- --silent Suppress output messages
389- --help Show this help
390-
391- Modes:
392- Virtual Thread (default): Uses virtual threads with blocking Apache HttpClient
393- Reactive (--reactive true): Uses Project Reactor with non-blocking Reactor Netty HTTP client
394- """ );
424+ System .out .println ("""
425+ Usage: java -cp benchmark-runner.jar io.netty.loom.benchmark.runner.HandoffHttpServer [options]
426+
427+ Options:
428+ --port <port> HTTP port (default: 8081)
429+ --mock-url <url> Mock server URL (default: http://localhost:8080/fruits)
430+ --threads <n> Number of event loop threads (default: 1)
431+ --io <epoll|nio|io_uring> I/O type (default: epoll)
432+ --mockless Skip mock server; do Jackson work inline (default: off)
433+ --mode <mode> Server mode (default: virtual_thread)
434+ --silent Suppress output messages
435+ --help Show this help
436+
437+ Modes:
438+ NON_VIRTUAL_NETTY (default): Platform thread IO + virtual thread blocking work
439+ REACTIVE: Pure Netty async, no virtual threads
440+ VIRTUAL_NETTY: Netty IO event loops as VTs on ForkJoinPool
441+ NETTY_SCHEDULER: Platform thread IO + Netty custom VT scheduler
442+ """ );
395443 }
396444}
0 commit comments