Skip to content

Commit 7e0b275

Browse files
committed
add concurrent close + drain test
1 parent 7746a8c commit 7e0b275

1 file changed

Lines changed: 189 additions & 1 deletion

File tree

core/src/test/java/io/netty/loom/MpscUnboundedStreamTest.java

Lines changed: 189 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,14 @@
33
import org.junit.jupiter.api.Test;
44
import org.junit.jupiter.api.BeforeEach;
55

6+
import java.util.ArrayList;
7+
import java.util.Arrays;
8+
import java.util.List;
9+
import java.util.concurrent.CountDownLatch;
10+
import java.util.concurrent.TimeUnit;
11+
import java.util.concurrent.atomic.AtomicBoolean;
12+
import java.util.concurrent.atomic.AtomicReference;
13+
614
import static org.junit.jupiter.api.Assertions.*;
715

816
/**
@@ -174,6 +182,126 @@ void testSizeAfterClose() {
174182
assertEquals(1, queue.size());
175183
}
176184

185+
@Test
186+
void testDrainAfterClose() {
187+
// Offer several elements, close, then drain while checking size and isEmpty
188+
for (int i = 0; i < 5; i++) {
189+
assertTrue(queue.offer("item" + i));
190+
}
191+
192+
// Close the queue; offers should be rejected but existing elements remain
193+
queue.close();
194+
assertTrue(queue.isClosed());
195+
assertEquals(5, queue.size());
196+
assertFalse(queue.isEmpty());
197+
assertFalse(queue.offer("shouldFail"));
198+
199+
// Drain and verify size decrements and isEmpty becomes true at the end
200+
for (int i = 0; i < 5; i++) {
201+
assertEquals("item" + i, queue.poll());
202+
assertEquals(4 - i, queue.size());
203+
if (i < 4) {
204+
assertFalse(queue.isEmpty());
205+
}
206+
}
207+
208+
// Now the queue should be empty and further polls return null
209+
assertNull(queue.poll());
210+
assertTrue(queue.isEmpty());
211+
assertEquals(0, queue.size());
212+
213+
// Offers remain rejected after close
214+
assertFalse(queue.offer("afterClose"));
215+
}
216+
217+
@Test
218+
void testConcurrentCloseAndOffers() throws Exception {
219+
// Deterministic concurrent scenario:
220+
// - Preload two elements
221+
// - Start a "winning" producer that offers immediately
222+
// - Start a "losing" producer that only offers after we closed the queue
223+
// - Consumer waits for the winning producer to complete, then closes and drains
224+
final MpscUnboundedStream<String> q = new MpscUnboundedStream<>(4);
225+
q.offer("init0");
226+
q.offer("init1");
227+
228+
final CountDownLatch start = new CountDownLatch(1);
229+
final CountDownLatch winnerDone = new CountDownLatch(1);
230+
final CountDownLatch closed = new CountDownLatch(1);
231+
232+
final AtomicBoolean winnerSucceeded = new AtomicBoolean(false);
233+
final AtomicBoolean loserSucceeded = new AtomicBoolean(false);
234+
final AtomicReference<List<String>> drainedRef = new AtomicReference<>();
235+
236+
Thread producerWinner = new Thread(() -> {
237+
try {
238+
start.await();
239+
boolean r = q.offer("win");
240+
winnerSucceeded.set(r);
241+
} catch (InterruptedException ex) {
242+
Thread.currentThread().interrupt();
243+
} finally {
244+
winnerDone.countDown();
245+
}
246+
}, "producer-winner");
247+
248+
Thread producerLoser = new Thread(() -> {
249+
try {
250+
start.await();
251+
// wait until the consumer signals it has closed the queue
252+
closed.await();
253+
boolean r = q.offer("lose");
254+
loserSucceeded.set(r);
255+
} catch (InterruptedException ex) {
256+
Thread.currentThread().interrupt();
257+
}
258+
}, "producer-loser");
259+
260+
Thread consumer = new Thread(() -> {
261+
try {
262+
start.await();
263+
// Wait for the winner to attempt an offer so we deterministically close after it.
264+
winnerDone.await(1, TimeUnit.SECONDS);
265+
q.close();
266+
closed.countDown();
267+
268+
List<String> drained = new ArrayList<>();
269+
String s;
270+
while ((s = q.poll()) != null) {
271+
drained.add(s);
272+
}
273+
drainedRef.set(drained);
274+
} catch (InterruptedException ex) {
275+
Thread.currentThread().interrupt();
276+
}
277+
}, "consumer");
278+
279+
producerWinner.start();
280+
producerLoser.start();
281+
consumer.start();
282+
283+
// start the race
284+
start.countDown();
285+
286+
// Join threads with timeout to avoid deadlocks flaking the test
287+
producerWinner.join(2000);
288+
producerLoser.join(2000);
289+
consumer.join(2000);
290+
291+
assertTrue(winnerSucceeded.get(), "expected the winning producer to succeed before close");
292+
assertFalse(loserSucceeded.get(), "expected the losing producer to fail after close");
293+
294+
List<String> drained = drainedRef.get();
295+
assertNotNull(drained, "consumer must have drained the queue");
296+
297+
// Preloaded elements should be drained first, then the winning offer
298+
assertEquals(Arrays.asList("init0", "init1", "win"), drained);
299+
300+
assertTrue(q.isClosed());
301+
assertTrue(q.isEmpty());
302+
assertEquals(0, q.size());
303+
}
304+
177305
@Test
178306
void testAlternatingOfferAndPoll() {
179307
for (int i = 0; i < 20; i++) {
@@ -298,5 +426,65 @@ void testSizeConsistency() {
298426
assertTrue(queue.isEmpty());
299427
}
300428
}
301-
}
302429

430+
@Test
431+
void testStressConcurrentProducersCloseAndDrain() throws Exception {
432+
final int producers = 64;
433+
final MpscUnboundedStream<String> q = new MpscUnboundedStream<>(8);
434+
435+
// Each producer will try to add one element labeled by its id
436+
final CountDownLatch start = new CountDownLatch(1);
437+
final CountDownLatch done = new CountDownLatch(producers);
438+
final AtomicBoolean[] succeeded = new AtomicBoolean[producers];
439+
for (int i = 0; i < producers; i++) succeeded[i] = new AtomicBoolean(false);
440+
441+
for (int i = 0; i < producers; i++) {
442+
final int id = i;
443+
Thread t = new Thread(() -> {
444+
try {
445+
start.await();
446+
// small randomized busy spin to increase contention
447+
if ((id & 7) == 0) Thread.yield();
448+
boolean r = q.offer("p" + id);
449+
succeeded[id].set(r);
450+
} catch (InterruptedException ex) {
451+
Thread.currentThread().interrupt();
452+
} finally {
453+
done.countDown();
454+
}
455+
}, "stress-producer-" + i);
456+
t.start();
457+
}
458+
459+
// start producers
460+
start.countDown();
461+
462+
// Wait a tiny bit to let many offers happen concurrently, then close and drain
463+
Thread.sleep(30);
464+
q.close();
465+
466+
// Wait for producers to finish (with timeout)
467+
done.await(5, TimeUnit.SECONDS);
468+
469+
List<String> drained = new ArrayList<>();
470+
String s;
471+
while ((s = q.poll()) != null) {
472+
drained.add(s);
473+
}
474+
475+
// Count successes
476+
for (AtomicBoolean b : succeeded) if (b.get()) ;
477+
478+
// All successful offers must appear in the drained list
479+
for (int i = 0; i < producers; i++) {
480+
if (succeeded[i].get()) {
481+
assertTrue(drained.remove("p" + i), "drained must contain successful producer element p" + i);
482+
}
483+
}
484+
485+
// The queue should be closed and empty after drain
486+
assertTrue(q.isClosed());
487+
assertTrue(q.isEmpty());
488+
assertEquals(0, q.size());
489+
}
490+
}

0 commit comments

Comments
 (0)