Skip to content

Commit 4216bdb

Browse files
committed
use closeable mpsc stream
1 parent 7ebbeec commit 4216bdb

3 files changed

Lines changed: 675 additions & 21 deletions

File tree

Lines changed: 358 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,358 @@
1+
package io.netty.loom;
2+
3+
import java.lang.invoke.MethodHandles;
4+
import java.lang.invoke.VarHandle;
5+
6+
/**
7+
* A Multi-Producer Single-Consumer (MPSC) unbounded array queue implementation.
8+
* This queue uses VarHandle for memory operations and stores the CLOSED state
9+
* in the MSB of the producer sequence.
10+
* <p>
11+
* Based on JCTools MpscUnboundedArrayQueue but simplified:
12+
* <ul>
13+
* <li>No Unsafe usage</li>
14+
* <li>No padding fields</li>
15+
* <li>No capacity limits (truly unbounded)</li>
16+
* <li>Uses VarHandle for atomic operations</li>
17+
* <li>MSB of producer sequence stores CLOSED information</li>
18+
* <li>Real producer index = (sequence &amp; ~CLOSED_BIT) &gt;&gt; 1</li>
19+
* </ul>
20+
*
21+
* @param <E> the type of elements held in this queue
22+
*/
23+
public class MpscUnboundedStream<E> implements AutoCloseable {
24+
25+
private static final VarHandle PRODUCER_INDEX;
26+
private static final VarHandle CONSUMER_INDEX;
27+
private static final VarHandle PRODUCER_LIMIT;
28+
private static final VarHandle ARRAY;
29+
30+
static {
31+
try {
32+
MethodHandles.Lookup lookup = MethodHandles.lookup();
33+
PRODUCER_INDEX = lookup.findVarHandle(MpscUnboundedStream.class, "producerIndex", long.class);
34+
CONSUMER_INDEX = lookup.findVarHandle(MpscUnboundedStream.class, "consumerIndex", long.class);
35+
PRODUCER_LIMIT = lookup.findVarHandle(MpscUnboundedStream.class, "producerLimit", long.class);
36+
ARRAY = MethodHandles.arrayElementVarHandle(Object[].class);
37+
} catch (ReflectiveOperationException e) {
38+
throw new ExceptionInInitializerError(e);
39+
}
40+
}
41+
42+
// Sentinel object used to mark a jump to the next buffer
43+
private static final Object JUMP = new Object();
44+
// Marker to indicate buffer has been consumed and can be GC'd
45+
private static final Object BUFFER_CONSUMED = new Object();
46+
47+
// MSB is used to store the CLOSED flag
48+
private static final long CLOSED_BIT = 1L << 63;
49+
// LSB is used during resize (bit 0)
50+
private static final long RESIZE_BIT = 1L;
51+
52+
@SuppressWarnings("FieldMayBeFinal") // Modified via VarHandle
53+
private volatile long producerIndex;
54+
@SuppressWarnings("FieldMayBeFinal") // Modified via VarHandle
55+
private volatile long consumerIndex;
56+
@SuppressWarnings("FieldMayBeFinal") // Modified via VarHandle
57+
private volatile long producerLimit;
58+
59+
private long producerMask;
60+
private volatile Object[] producerBuffer;
61+
private Object[] consumerBuffer;
62+
63+
public MpscUnboundedStream(int initialCapacity) {
64+
// Round up to next power of 2
65+
int capacity = roundToPowerOfTwo(initialCapacity);
66+
this.producerMask = capacity - 1;
67+
this.producerBuffer = new Object[capacity];
68+
this.consumerBuffer = this.producerBuffer;
69+
this.producerIndex = 0;
70+
this.consumerIndex = 0;
71+
// Initialize producer limit to capacity * 2 (since we increment by 2)
72+
this.producerLimit = capacity * 2L;
73+
}
74+
75+
private static int roundToPowerOfTwo(int value) {
76+
if (value <= 1) return 2;
77+
return 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
78+
}
79+
80+
/**
81+
* Offers an element to the queue. This method can be called by multiple producers.
82+
*
83+
* @param element the element to add
84+
* @return true if the element was added, false if the queue is closed
85+
*/
86+
public boolean offer(E element) {
87+
if (element == null) {
88+
throw new NullPointerException("Null elements are not allowed");
89+
}
90+
91+
long mask;
92+
Object[] buffer;
93+
long pIndex;
94+
95+
while (true) {
96+
long producerLimit = (long) PRODUCER_LIMIT.getVolatile(this);
97+
pIndex = (long) PRODUCER_INDEX.getVolatile(this);
98+
99+
// Check if queue is closed (MSB set)
100+
if ((pIndex & CLOSED_BIT) != 0) {
101+
return false;
102+
}
103+
104+
// Lower bit is indicative of resize, if we see it we spin until it's cleared
105+
if ((pIndex & RESIZE_BIT) == 1) {
106+
continue;
107+
}
108+
109+
// pIndex is even (lower bit is 0) -> actual index is (pIndex >> 1)
110+
// mask/buffer may get changed by resizing -> capture before CAS
111+
// A successful CAS ties the ordering: lv(pIndex) - [mask/buffer] -> cas(pIndex)
112+
mask = this.producerMask;
113+
buffer = this.producerBuffer;
114+
115+
// Assumption: queue is almost always empty or near empty
116+
// Check producer limit before expensive operations
117+
if (producerLimit <= pIndex) {
118+
int result = offerSlowPath(buffer, pIndex, producerLimit, element);
119+
if (result != 0) { // 0 = CONTINUE_TO_P_INDEX_CAS
120+
return result > 0; // QUEUE_RESIZE returns true
121+
}
122+
}
123+
124+
// Try to claim this index by incrementing producer index by 2
125+
// (we use increments of 2 to have space for the resize bit)
126+
long nextPIndex = pIndex + 2;
127+
if (PRODUCER_INDEX.compareAndSet(this, pIndex, nextPIndex)) {
128+
break;
129+
}
130+
}
131+
132+
// INDEX visible before ELEMENT
133+
// Calculate offset: (pIndex >> 1) & mask, but need to account for CLOSED_BIT
134+
final long offset = ((pIndex & ~CLOSED_BIT) >> 1) & mask;
135+
ARRAY.setRelease(buffer, (int) offset, element); // release element
136+
return true;
137+
}
138+
139+
// Return values
140+
private static final int CONTINUE_TO_P_INDEX_CAS = 0;
141+
private static final int RETRY = -1;
142+
private static final int QUEUE_RESIZE = 1;
143+
144+
/**
145+
* We do not inline resize into offer because we do not resize on fill.
146+
*/
147+
private int offerSlowPath(Object[] buffer, long pIndex, long producerLimit, E element) {
148+
long cIndex = (long) CONSUMER_INDEX.getVolatile(this);
149+
long bufferCapacity = buffer.length;
150+
151+
// Calculate actual producer and consumer indices
152+
long actualPIndex = (pIndex & ~CLOSED_BIT) >> 1;
153+
154+
// Check if we have capacity with current consumer position
155+
if (cIndex + bufferCapacity > actualPIndex) {
156+
// We have capacity, update producer limit
157+
long newLimit = (cIndex + bufferCapacity) * 2; // multiply by 2 since we use pIndex * 2
158+
if (!PRODUCER_LIMIT.compareAndSet(this, producerLimit, newLimit)) {
159+
// CAS failed, retry from top
160+
return RETRY;
161+
} else {
162+
// Continue to pIndex CAS
163+
return CONTINUE_TO_P_INDEX_CAS;
164+
}
165+
}
166+
// Need to resize (unbounded queue, so we always resize when full)
167+
else if (PRODUCER_INDEX.compareAndSet(this, pIndex, pIndex + 1)) {
168+
// Successfully set resize bit, trigger resize
169+
resize(buffer, pIndex, element);
170+
return QUEUE_RESIZE;
171+
} else {
172+
// Failed to set resize bit, retry from top
173+
return RETRY;
174+
}
175+
}
176+
177+
private void resize(Object[] oldBuffer, long pIndex, E element) {
178+
// At this point, pIndex has resize bit set (LSB = 1)
179+
int oldCapacity = oldBuffer.length;
180+
int newCapacity = oldCapacity * 2;
181+
Object[] newBuffer = new Object[newCapacity];
182+
183+
// Update producer mask and buffer
184+
this.producerMask = newCapacity - 1;
185+
this.producerBuffer = newBuffer;
186+
187+
// Calculate where to put the element in the new buffer
188+
// Only strip CLOSED_BIT, the shift handles RESIZE_BIT
189+
long actualIndex = (pIndex & ~CLOSED_BIT) >> 1;
190+
long newOffset = actualIndex & (newCapacity - 1);
191+
ARRAY.setRelease(newBuffer, (int) newOffset, element);
192+
193+
// Link old buffer to new buffer using JUMP
194+
long linkOffset = oldCapacity - 1; // Last slot in old buffer
195+
ARRAY.setRelease(oldBuffer, (int) linkOffset, newBuffer);
196+
ARRAY.setRelease(oldBuffer, (int) linkOffset, JUMP);
197+
198+
// Update producer limit to new capacity
199+
long cIndex = (long) CONSUMER_INDEX.getVolatile(this);
200+
long newLimit = (cIndex + newCapacity) * 2; // multiply by 2 since we use pIndex * 2
201+
PRODUCER_LIMIT.setVolatile(this, newLimit);
202+
203+
// Clear resize bit and advance index by 2
204+
long newPIndex = (pIndex & ~RESIZE_BIT) + 2;
205+
PRODUCER_INDEX.setVolatile(this, newPIndex);
206+
}
207+
208+
/**
209+
* Polls an element from the queue. This method should only be called by a single consumer.
210+
*
211+
* @return the element at the head of the queue, or null if empty
212+
*/
213+
@SuppressWarnings("unchecked")
214+
public E poll() {
215+
long cIndex = this.consumerIndex;
216+
Object[] buffer = this.consumerBuffer;
217+
long mask = buffer.length - 1;
218+
long offset = cIndex & mask;
219+
220+
Object element = ARRAY.getAcquire(buffer, (int) offset);
221+
222+
if (element == null) {
223+
// Queue is empty
224+
return null;
225+
}
226+
227+
if (element == JUMP) {
228+
// Need to jump to the next buffer
229+
return pollNextBuffer(buffer, cIndex, mask);
230+
}
231+
232+
// Clear the slot
233+
ARRAY.setRelease(buffer, (int) offset, null);
234+
235+
// Update consumer index
236+
CONSUMER_INDEX.setRelease(this, cIndex + 1);
237+
238+
return (E) element;
239+
}
240+
241+
@SuppressWarnings("unchecked")
242+
private E pollNextBuffer(Object[] buffer, long cIndex, long mask) {
243+
// Read the next buffer reference from the last slot
244+
long linkOffset = mask; // Last slot (mask is length - 1)
245+
Object[] nextBuffer = (Object[]) ARRAY.getAcquire(buffer, (int) linkOffset);
246+
247+
if (nextBuffer == null) {
248+
// Next buffer not ready yet (should not happen with proper JUMP logic)
249+
return null;
250+
}
251+
252+
// Update consumer buffer and mask
253+
this.consumerBuffer = nextBuffer;
254+
long newMask = nextBuffer.length - 1;
255+
256+
// Mark the link slot as consumed to allow GC of old buffer
257+
ARRAY.setRelease(buffer, (int) linkOffset, BUFFER_CONSUMED);
258+
259+
// Read element from new buffer at same consumer index
260+
long offset = cIndex & newMask;
261+
Object element = ARRAY.getAcquire(nextBuffer, (int) offset);
262+
263+
if (element == null) {
264+
return null;
265+
}
266+
267+
// Clear the slot
268+
ARRAY.setRelease(nextBuffer, (int) offset, null);
269+
270+
// Update consumer index
271+
CONSUMER_INDEX.setRelease(this, cIndex + 1);
272+
273+
return (E) element;
274+
}
275+
276+
/**
277+
* Returns the current size of the queue.
278+
* Uses a stability loop similar to JCTools to ensure consistent reads
279+
* of producer and consumer indices in the face of concurrent modifications.
280+
*
281+
* @return the approximate number of elements in the queue
282+
*/
283+
public int size() {
284+
long cIndexBefore, cIndexAfter;
285+
long pIndex;
286+
287+
// Loop until we get a stable read of consumer index
288+
// (i.e., consumer index doesn't change while we read producer index)
289+
do {
290+
cIndexBefore = (long) CONSUMER_INDEX.getVolatile(this);
291+
pIndex = (long) PRODUCER_INDEX.getVolatile(this);
292+
cIndexAfter = (long) CONSUMER_INDEX.getVolatile(this);
293+
} while (cIndexBefore != cIndexAfter);
294+
295+
// Only strip MSB (CLOSED_BIT), LSB is handled by the shift
296+
long realProducerIndex = (pIndex & ~CLOSED_BIT) >> 1;
297+
298+
long size = realProducerIndex - cIndexAfter;
299+
300+
// Protect against overflow
301+
if (size > Integer.MAX_VALUE) {
302+
return Integer.MAX_VALUE;
303+
}
304+
305+
return (int) size;
306+
}
307+
308+
/**
309+
* Checks if the queue is empty.
310+
* Order matters! Loading consumer before producer allows for producer increments after consumer index is read.
311+
* This ensures this method is conservative in its estimate.
312+
*
313+
* @return true if the queue appears to be empty
314+
*/
315+
public boolean isEmpty() {
316+
// Order matters: consumer before producer
317+
long cIndex = (long) CONSUMER_INDEX.getVolatile(this);
318+
long pIndex = (long) PRODUCER_INDEX.getVolatile(this);
319+
320+
// Only strip MSB (CLOSED_BIT), LSB is handled by the shift
321+
// (cIndex - pIndex) / 2 == 0
322+
return ((cIndex - ((pIndex & ~CLOSED_BIT) >> 1)) == 0);
323+
}
324+
325+
/**
326+
* Marks the queue as closed. After this, no more elements can be offered.
327+
* This is done by setting the MSB of the producer index.
328+
*/
329+
@Override
330+
public void close() {
331+
long pIndex;
332+
while (true) {
333+
pIndex = (long) PRODUCER_INDEX.getVolatile(this);
334+
335+
// Check if already closed
336+
if ((pIndex & CLOSED_BIT) != 0) {
337+
return;
338+
}
339+
340+
// Set the CLOSED bit
341+
long closedPIndex = pIndex | CLOSED_BIT;
342+
if (PRODUCER_INDEX.compareAndSet(this, pIndex, closedPIndex)) {
343+
return;
344+
}
345+
}
346+
}
347+
348+
/**
349+
* Checks if the queue is closed.
350+
*
351+
* @return true if the queue is closed
352+
*/
353+
public boolean isClosed() {
354+
long pIndex = (long) PRODUCER_INDEX.getVolatile(this);
355+
return (pIndex & CLOSED_BIT) != 0;
356+
}
357+
}
358+

0 commit comments

Comments
 (0)