@@ -51,7 +51,8 @@ public class MpscUnboundedStream<E> implements AutoCloseable {
5151
5252 // MSB is used to store the CLOSED flag
5353 private static final long CLOSED_BIT = 1L << 63 ;
54- private static final long CLOSED_MASK = ~CLOSED_BIT ;
54+ private static final long NOT_CLOSED_MASK = ~CLOSED_BIT ;
55+ private static final long IGNORE_CLOSED_AND_RESIZE_MASK = NOT_CLOSED_MASK & ~1L ;
5556 // LSB is used during resize (bit 0)
5657 private static final long RESIZE_BIT = 1L ;
5758
@@ -281,7 +282,7 @@ public E poll() {
281282 final int offset = modifiedCalcCircularRefElementOffset (index , mask );
282283 Object e = lvRefElement (buffer , offset );// LoadLoad
283284 if (e == null ) {
284- long pIndex = lvProducerIndex () & CLOSED_MASK ;
285+ long pIndex = lvProducerIndex () & NOT_CLOSED_MASK ;
285286 pIndex += (pIndex & RESIZE_BIT );
286287 // isEmpty?
287288 if (index == pIndex ) {
@@ -329,8 +330,7 @@ public final boolean isEmpty() {
329330 // This ensures this method is conservative in it's estimate. Note that as this is an MPSC q there is
330331 // nothing we can do to make this an exact method.
331332 long cIndex = lvConsumerIndex ();
332- long pIndex = lvProducerIndex () & CLOSED_MASK ;
333- pIndex += (pIndex & RESIZE_BIT );
333+ long pIndex = lvProducerIndex () & IGNORE_CLOSED_AND_RESIZE_MASK ;
334334 return cIndex == pIndex ;
335335 }
336336
@@ -350,9 +350,7 @@ public final int size() {
350350 final long currentProducerIndex = lvProducerIndex ();
351351 after = lvConsumerIndex ();
352352 if (before == after ) {
353- // Mask out CLOSED bit for size calculation
354- long pIndex = currentProducerIndex & CLOSED_MASK ;
355- pIndex += (pIndex & RESIZE_BIT );
353+ long pIndex = currentProducerIndex & IGNORE_CLOSED_AND_RESIZE_MASK ;
356354 size = (pIndex - after ) >> 1 ;
357355 break ;
358356 }
0 commit comments