@@ -46,6 +46,7 @@ public class MpscUnboundedStream<E> implements AutoCloseable {
4646
4747 // MSB is used to store the CLOSED flag
4848 private static final long CLOSED_BIT = 1L << 63 ;
49+ private static final long CLOSED_MASK = ~CLOSED_BIT ;
4950 // LSB is used during resize (bit 0)
5051 private static final long RESIZE_BIT = 1L ;
5152
@@ -78,10 +79,51 @@ public MpscUnboundedStream(int initialCapacity) {
7879 soProducerLimit (mask ); // we know it's all empty to start with
7980 }
8081
82+ private void soProducerLimit (long v ) {
83+ PRODUCER_LIMIT .setRelease (this , v );
84+ }
85+
86+ private long lvProducerLimit () {
87+ return (long ) PRODUCER_LIMIT .getAcquire (this );
88+ }
89+
90+ private long lvProducerIndex () {
91+ return (long ) PRODUCER_INDEX .getAcquire (this );
92+ }
93+
94+ private boolean casProducerIndex (long expect , long newValue ) {
95+ return PRODUCER_INDEX .compareAndSet (this , expect , newValue );
96+ }
97+
98+ private long lvConsumerIndex () {
99+ return (long ) CONSUMER_INDEX .getAcquire (this );
100+ }
101+
102+ private void soConsumerIndex (long v ) {
103+ CONSUMER_INDEX .setRelease (this , v );
104+ }
105+
106+ private void soProducerIndex (long v ) {
107+ PRODUCER_INDEX .setRelease (this , v );
108+ }
109+
110+ private boolean casProducerLimit (long expect , long newValue ) {
111+ return PRODUCER_LIMIT .compareAndSet (this , expect , newValue );
112+ }
113+
114+ private static <E > void soElement (E [] buffer , int offset , E e ) {
115+ ARRAY .setRelease (buffer , offset , e );
116+ }
117+
118+ @ SuppressWarnings ("unchecked" )
119+ private static <E > E lvElement (E [] buffer , int offset ) {
120+ return (E ) ARRAY .getAcquire (buffer , offset );
121+ }
122+
81123 /**
82124 * Offers an element to the queue. This method can be called by multiple producers.
83125 *
84- * @param element the element to add
126+ * @param e the element to add
85127 * @return true if the element was added, false if the queue is closed
86128 */
87129 public boolean offer (E e ) {
@@ -101,7 +143,7 @@ public boolean offer(E e) {
101143 continue ;
102144 }
103145 // higher bit is indicative of closed
104- if ((pIndex & CLOSED_BIT ) == 1 ) {
146+ if ((pIndex & CLOSED_BIT ) != 0 ) {
105147 return false ;
106148 }
107149 // pIndex is even (lower bit is 0) -> actual index is (pIndex >> 1)
@@ -166,14 +208,15 @@ else if (casProducerIndex(pIndex, pIndex + 1)) {
166208
167209 private void resize (long oldMask , E [] oldBuffer , long pIndex , final E e ) {
168210 int newBufferLength = getNextBufferSize (oldBuffer );
211+ @ SuppressWarnings ("unchecked" )
169212 final E [] newBuffer = (E []) new Object [newBufferLength ];
170213
171214 producerBuffer = newBuffer ;
172215 final int newMask = (newBufferLength - 2 ) << 1 ;
173216 producerMask = newMask ;
174217
175- final long offsetInOld = modifiedCalcElementOffset (pIndex , oldMask );
176- final long offsetInNew = modifiedCalcElementOffset (pIndex , newMask );
218+ final int offsetInOld = modifiedCalcElementOffset (pIndex , oldMask );
219+ final int offsetInNew = modifiedCalcElementOffset (pIndex , newMask );
177220
178221
179222 soElement (newBuffer , offsetInNew , e );// element in new array
@@ -220,18 +263,19 @@ protected long availableInQueue(long pIndex, long cIndex) {
220263 * compensated for by reducing the element shift. The computation is constant folded, so there's no cost.
221264 */
222265 private static int modifiedCalcElementOffset (long index , long mask ) {
223- return (int ) ((index & mask ) << 1 );
266+ return (int ) ((index & mask ) >> 1 );
224267 }
225268
269+ @ SuppressWarnings ("unchecked" )
226270 public E poll () {
227271 final E [] buffer = consumerBuffer ;
228272 final long index = consumerIndex ;
229273 final long mask = consumerMask ;
230274
231- final long offset = modifiedCalcElementOffset (index , mask );
275+ final int offset = modifiedCalcElementOffset (index , mask );
232276 Object e = lvElement (buffer , offset );// LoadLoad
233277 if (e == null ) {
234- if (index != lvProducerIndex ()) {
278+ if (index != ( lvProducerIndex () & CLOSED_MASK )) {
235279 // poll() == null iff queue is empty, null element is not strong enough indicator, so we must
236280 // check the producer index. If the queue is indeed not empty we spin until element is
237281 // visible.
@@ -253,14 +297,15 @@ public E poll() {
253297 }
254298
255299 private E [] getNextBuffer (final E [] buffer , final long mask ) {
256- final long nextArrayOffset = nextArrayOffset (mask );
300+ final int nextArrayOffset = nextArrayOffset (mask );
301+ @ SuppressWarnings ("unchecked" )
257302 final E [] nextBuffer = (E []) lvElement (buffer , nextArrayOffset );
258303 soElement (buffer , nextArrayOffset , null );
259304 return nextBuffer ;
260305 }
261306
262307 private E newBufferPoll (E [] nextBuffer , final long index ) {
263- final long offsetInNew = newBufferAndOffset (nextBuffer , index );
308+ final int offsetInNew = newBufferAndOffset (nextBuffer , index );
264309 final E n = lvElement (nextBuffer , offsetInNew );// LoadLoad
265310 if (n == null ) {
266311 throw new IllegalStateException ("new buffer must have at least one element" );
@@ -277,6 +322,45 @@ private int newBufferAndOffset(E[] nextBuffer, final long index) {
277322 return offsetInNew ;
278323 }
279324
325+ public final boolean isEmpty () {
326+ // Order matters!
327+ // Loading consumer before producer allows for producer increments after consumer index is read.
328+ // This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is
329+ // nothing we can do to make this an exact method.
330+ return (this .lvConsumerIndex () == (this .lvProducerIndex () & CLOSED_MASK ));
331+ }
332+
333+ public final int size () {
334+ // NOTE: because indices are on even numbers we cannot use the size util.
335+
336+ /*
337+ * It is possible for a thread to be interrupted or reschedule between the read of the producer and
338+ * consumer indices, therefore protection is required to ensure size is within valid range. In the
339+ * event of concurrent polls/offers to this method the size is OVER estimated as we read consumer
340+ * index BEFORE the producer index.
341+ */
342+ long after = lvConsumerIndex ();
343+ long size ;
344+ while (true ) {
345+ final long before = after ;
346+ final long currentProducerIndex = lvProducerIndex ();
347+ after = lvConsumerIndex ();
348+ if (before == after ) {
349+ // Mask out CLOSED bit for size calculation
350+ size = (((currentProducerIndex & CLOSED_MASK ) - after ) >> 1 );
351+ break ;
352+ }
353+ }
354+ // Long overflow is impossible, so size is always positive. Integer overflow is possible for the unbounded
355+ // indexed queues.
356+ if (size > Integer .MAX_VALUE ) {
357+ return Integer .MAX_VALUE ;
358+ }
359+ else {
360+ return (int ) size ;
361+ }
362+ }
363+
280364 /**
281365 * Marks the queue as closed. After this, no more elements can be offered.
282366 * This is done by setting the MSB of the producer index.
@@ -285,16 +369,11 @@ private int newBufferAndOffset(E[] nextBuffer, final long index) {
285369 public void close () {
286370 long pIndex ;
287371 while (true ) {
288- pIndex = (long ) PRODUCER_INDEX .getVolatile (this );
289-
290- // Check if already closed
372+ pIndex = lvProducerIndex ();
291373 if ((pIndex & CLOSED_BIT ) != 0 ) {
292374 return ;
293375 }
294-
295- // Set the CLOSED bit
296- long closedPIndex = pIndex | CLOSED_BIT ;
297- if (PRODUCER_INDEX .compareAndSet (this , pIndex , closedPIndex )) {
376+ if (casProducerIndex (pIndex , pIndex | CLOSED_BIT )) {
298377 return ;
299378 }
300379 }
@@ -306,7 +385,6 @@ public void close() {
306385 * @return true if the queue is closed
307386 */
308387 public boolean isClosed () {
309- long pIndex = (long ) PRODUCER_INDEX .getVolatile (this );
310- return (pIndex & CLOSED_BIT ) != 0 ;
388+ return (lvProducerIndex () & CLOSED_BIT ) != 0 ;
311389 }
312390}
0 commit comments