Skip to content

Commit ebd531a

Browse files
committed
Fix event loop scheduler leaks
1 parent 17292ac commit ebd531a

2 files changed

Lines changed: 40 additions & 7 deletions

File tree

core/src/main/java/io/netty/loom/EventLoopScheduler.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -66,19 +66,23 @@ public EventLoopScheduler(IoEventLoopGroup parent, ThreadFactory threadFactory,
6666
eventLoopIsRunning = new AtomicBoolean(false);
6767
runQueue = new MpscUnboundedStream<>(resumedContinuationsExpectedCount);
6868
carrierThread = threadFactory.newThread(this::virtualThreadSchedulerLoop);
69-
var rawVTFactory = Thread.ofVirtual().factory();
70-
vThreadFactory = runnable ->
71-
NettyScheduler.assignUnstarted(rawVTFactory.newThread(
72-
() -> // this can be inherited by any thread created in the context of this virtual thread
73-
// but only the original virtual thread will have the correct scheduler context
74-
ScopedValue.where(CURRENT_SCHEDULER, new SchedulingContext(Thread.currentThread().threadId(), sharedRef)).run(runnable)
75-
), sharedRef);
69+
vThreadFactory = newEventLoopSchedulerFactory(sharedRef);
7670
eventLoopThread = vThreadFactory.newThread(() -> FastThreadLocalThread.runWithFastThreadLocal(this::nettyEventLoop));
7771
ioEventLoop = new ManualIoEventLoop(parent, eventLoopThread,
7872
ioExecutor -> new AwakeAwareIoHandler(eventLoopIsRunning, ioHandlerFactory.newHandler(ioExecutor)));
7973
carrierThread.start();
8074
}
8175

76+
private static ThreadFactory newEventLoopSchedulerFactory(SharedRef sharedRef) {
77+
var rawVTFactory = Thread.ofVirtual().factory();
78+
return runnable ->
79+
NettyScheduler.assignUnstarted(rawVTFactory.newThread(
80+
() -> // this can be inherited by any thread created in the context of this virtual thread
81+
// but only the original virtual thread will have the correct scheduler context
82+
ScopedValue.where(CURRENT_SCHEDULER, new SchedulingContext(Thread.currentThread().threadId(), sharedRef)).run(runnable)
83+
), sharedRef);
84+
}
85+
8286
int externalContinuationsCount() {
8387
return runQueue.size();
8488
}

core/src/test/java/io/netty/loom/VirtualMultithreadIoEventLoopGroupTest.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import static java.util.concurrent.StructuredTaskScope.Joiner.allSuccessfulOrThrow;
44
import static org.junit.jupiter.api.Assertions.*;
55

6+
import java.lang.ref.SoftReference;
7+
import java.lang.ref.WeakReference;
68
import java.net.InetSocketAddress;
79
import java.net.URI;
810
import java.net.http.HttpClient;
@@ -39,6 +41,7 @@
3941
import io.netty.util.ReferenceCountUtil;
4042
import io.netty.util.concurrent.EventExecutor;
4143
import io.netty.util.internal.ThreadExecutorMap;
44+
import org.junit.jupiter.api.Timeout;
4245

4346
public class VirtualMultithreadIoEventLoopGroupTest {
4447

@@ -366,6 +369,32 @@ void schedulerIsNotInheritedByForkedVT() throws InterruptedException, ExecutionE
366369
}
367370
}
368371

372+
@Test
373+
@Timeout(10)
374+
void schedulerIsNotLeakingIfItsThreadFactoryOutliveIt() throws InterruptedException, ExecutionException {
375+
ThreadFactory vThreadFactory;
376+
WeakReference<EventLoopScheduler> schedulerWeakRef;
377+
EventLoopScheduler.SharedRef schedulerRef;
378+
try (var group = new VirtualMultithreadIoEventLoopGroup(1, LocalIoHandler.newFactory())) {
379+
vThreadFactory = group.submit(group::vThreadFactory).get();
380+
schedulerRef = group.submit(() -> EventLoopScheduler.currentThreadSchedulerContext().scheduler()).get();
381+
schedulerWeakRef = new WeakReference<>(schedulerRef.get());
382+
}
383+
while (schedulerRef.get() != null) {
384+
Thread.yield();
385+
}
386+
while (schedulerWeakRef.get() != null) {
387+
System.gc();
388+
System.runFinalization();
389+
Thread.sleep(100);
390+
}
391+
// we can still run virtual threads from the factory without the scheduler
392+
var schedulerRefPromise = new CompletableFuture<EventLoopScheduler.SharedRef>();
393+
vThreadFactory.newThread(() -> schedulerRefPromise.complete(EventLoopScheduler.currentThreadSchedulerContext().scheduler())).start();
394+
assertSame(schedulerRef, schedulerRefPromise.get());
395+
assertNull(schedulerRefPromise.get().get());
396+
}
397+
369398
@Test
370399
void virtualThreadCanMakeProgressEvenIfEventLoopIsClosed() throws InterruptedException, ExecutionException, BrokenBarrierException, TimeoutException {
371400
var group = new VirtualMultithreadIoEventLoopGroup(1, LocalIoHandler.newFactory());

0 commit comments

Comments
 (0)