Skip to content

Commit 946edb8

Browse files
committed
Refactor MpscUnboundedStream for clarity and performance improvements
1 parent e9f5a67 commit 946edb8

1 file changed

Lines changed: 58 additions & 52 deletions

File tree

core/src/main/java/io/netty/loom/MpscUnboundedStream.java

Lines changed: 58 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ public class MpscUnboundedStream<E> implements AutoCloseable {
4343

4444
// Sentinel object used to mark a jump to the next buffer
4545
private static final Object JUMP = new Object();
46+
private static final Object BUFFER_CONSUMED = new Object();
47+
private static final int CONTINUE_TO_P_INDEX_CAS = 0;
48+
private static final int RETRY = 1;
49+
private static final int QUEUE_FULL = 2;
50+
private static final int QUEUE_RESIZE = 3;
4651

4752
// MSB is used to store the CLOSED flag
4853
private static final long CLOSED_BIT = 1L << 63;
@@ -111,12 +116,12 @@ private boolean casProducerLimit(long expect, long newValue) {
111116
return PRODUCER_LIMIT.compareAndSet(this, expect, newValue);
112117
}
113118

114-
private static <E> void soElement(E[] buffer, int offset, E e) {
119+
private static <E> void soRefElement(E[] buffer, int offset, E e) {
115120
ARRAY.setRelease(buffer, offset, e);
116121
}
117122

118123
@SuppressWarnings("unchecked")
119-
private static <E> E lvElement(E[] buffer, int offset) {
124+
private static <E> E lvRefElement(E[] buffer, int offset) {
120125
return (E) ARRAY.getAcquire(buffer, offset);
121126
}
122127

@@ -157,13 +162,13 @@ public boolean offer(E e) {
157162
if (producerLimit <= pIndex) {
158163
int result = offerSlowPath(mask, pIndex, producerLimit);
159164
switch (result) {
160-
case 0:
165+
case CONTINUE_TO_P_INDEX_CAS:
161166
break;
162-
case 1:
167+
case RETRY:
163168
continue;
164-
case 2:
169+
case QUEUE_FULL:
165170
return false;
166-
case 3:
171+
case QUEUE_RESIZE:
167172
resize(mask, buffer, pIndex, e);
168173
return true;
169174
}
@@ -174,36 +179,37 @@ public boolean offer(E e) {
174179
}
175180
}
176181
// INDEX visible before ELEMENT, consistent with consumer expectation
177-
final int offset = modifiedCalcElementOffset(pIndex, mask);
178-
soElement(buffer, offset, e);
182+
final int offset = modifiedCalcCircularRefElementOffset(pIndex, mask);
183+
soRefElement(buffer, offset, e);
179184
return true;
180185
}
181186

182187
/**
183188
* We do not inline resize into this method because we do not resize on fill.
184189
*/
185190
private int offerSlowPath(long mask, long pIndex, long producerLimit) {
186-
int result;
187191
final long cIndex = lvConsumerIndex();
188192
long bufferCapacity = getCurrentBufferCapacity(mask);
189-
result = 0;// 0 - goto pIndex CAS
190193
if (cIndex + bufferCapacity > pIndex) {
191194
if (!casProducerLimit(producerLimit, cIndex + bufferCapacity)) {
192-
result = 1;// retry from top
195+
// retry from top
196+
return RETRY;
193197
}
198+
// continue to pIndex CAS
199+
return CONTINUE_TO_P_INDEX_CAS;
194200
}
195201
// full and cannot grow
196202
else if (availableInQueue(pIndex, cIndex) <= 0) {
197-
result = 2;// -> return false;
203+
// offer should return false;
204+
return QUEUE_FULL;
198205
}
199206
// grab index for resize -> set lower bit
200207
else if (casProducerIndex(pIndex, pIndex + 1)) {
201-
result = 3;// -> resize
208+
// trigger a resize
209+
return QUEUE_RESIZE;
202210
}
203-
else {
204-
result = 1;// failed resize attempt, retry from top
205-
}
206-
return result;
211+
// failed resize attempt, retry from top
212+
return RETRY;
207213
}
208214

209215
private void resize(long oldMask, E[] oldBuffer, long pIndex, final E e) {
@@ -215,12 +221,12 @@ private void resize(long oldMask, E[] oldBuffer, long pIndex, final E e) {
215221
final int newMask = (newBufferLength - 2) << 1;
216222
producerMask = newMask;
217223

218-
final int offsetInOld = modifiedCalcElementOffset(pIndex, oldMask);
219-
final int offsetInNew = modifiedCalcElementOffset(pIndex, newMask);
224+
final int offsetInOld = modifiedCalcCircularRefElementOffset(pIndex, oldMask);
225+
final int offsetInNew = modifiedCalcCircularRefElementOffset(pIndex, newMask);
220226

221227

222-
soElement(newBuffer, offsetInNew, e);// element in new array
223-
soElement(oldBuffer, nextArrayOffset(oldMask), newBuffer);// buffer linked
228+
soRefElement(newBuffer, offsetInNew, e);// element in new array
229+
soRefElement(oldBuffer, nextArrayOffset(oldMask), newBuffer);// buffer linked
224230

225231
// ASSERT code
226232
final long cIndex = lvConsumerIndex();
@@ -239,11 +245,11 @@ private void resize(long oldMask, E[] oldBuffer, long pIndex, final E e) {
239245
// INDEX visible before ELEMENT, consistent with consumer expectation
240246

241247
// make resize visible to consumer
242-
soElement(oldBuffer, offsetInOld, JUMP);
248+
soRefElement(oldBuffer, offsetInOld, JUMP);
243249
}
244250

245251
private int nextArrayOffset(final long mask) {
246-
return modifiedCalcElementOffset(mask + 2, Long.MAX_VALUE);
252+
return modifiedCalcCircularRefElementOffset(mask + 2, Long.MAX_VALUE);
247253
}
248254

249255
protected int getNextBufferSize(E[] buffer) {
@@ -262,7 +268,7 @@ protected long availableInQueue(long pIndex, long cIndex) {
262268
* This method assumes index is actually (index << 1) because lower bit is used for resize. This is
263269
* compensated for by reducing the element shift. The computation is constant folded, so there's no cost.
264270
*/
265-
private static int modifiedCalcElementOffset(long index, long mask) {
271+
private static int modifiedCalcCircularRefElementOffset(long index, long mask) {
266272
return (int) ((index & mask) >> 1);
267273
}
268274

@@ -272,62 +278,60 @@ public E poll() {
272278
final long index = consumerIndex;
273279
final long mask = consumerMask;
274280

275-
final int offset = modifiedCalcElementOffset(index, mask);
276-
Object e = lvElement(buffer, offset);// LoadLoad
281+
final int offset = modifiedCalcCircularRefElementOffset(index, mask);
282+
Object e = lvRefElement(buffer, offset);// LoadLoad
277283
if (e == null) {
278-
if (index != (lvProducerIndex() & CLOSED_MASK)) {
279-
// poll() == null iff queue is empty, null element is not strong enough indicator, so we must
280-
// check the producer index. If the queue is indeed not empty we spin until element is
281-
// visible.
282-
do {
283-
e = lvElement(buffer, offset);
284-
} while (e == null);
285-
}
286-
else {
284+
long pIndex = lvProducerIndex() & CLOSED_MASK;
285+
pIndex += (pIndex & RESIZE_BIT);
286+
// isEmpty?
287+
if (index == pIndex) {
287288
return null;
288289
}
290+
// poll() == null iff queue is empty, null element is not strong enough indicator, so we must
291+
// spin until element is visible.
292+
do {
293+
e = lvRefElement(buffer, offset);
294+
} while (e == null);
289295
}
290296
if (e == JUMP) {
291-
final E[] nextBuffer = getNextBuffer(buffer, mask);
297+
final E[] nextBuffer = nextBuffer(buffer, mask);
292298
return newBufferPoll(nextBuffer, index);
293299
}
294-
soElement(buffer, offset, null);
300+
soRefElement(buffer, offset, null);
295301
soConsumerIndex(index + 2);
296302
return (E) e;
297303
}
298304

299-
private E[] getNextBuffer(final E[] buffer, final long mask) {
305+
private E[] nextBuffer(final E[] buffer, final long mask) {
300306
final int nextArrayOffset = nextArrayOffset(mask);
301307
@SuppressWarnings("unchecked")
302-
final E[] nextBuffer = (E[]) lvElement(buffer, nextArrayOffset);
303-
soElement(buffer, nextArrayOffset, null);
308+
final E[] nextBuffer = (E[]) lvRefElement(buffer, nextArrayOffset);
309+
consumerBuffer = nextBuffer;
310+
consumerMask = (nextBuffer.length - 2) << 1;
311+
soRefElement(buffer, nextArrayOffset, BUFFER_CONSUMED);
304312
return nextBuffer;
305313
}
306314

307315
private E newBufferPoll(E[] nextBuffer, final long index) {
308-
final int offsetInNew = newBufferAndOffset(nextBuffer, index);
309-
final E n = lvElement(nextBuffer, offsetInNew);// LoadLoad
316+
final int offset = modifiedCalcCircularRefElementOffset(index, consumerMask);
317+
final E n = lvRefElement(nextBuffer, offset);// LoadLoad
310318
if (n == null) {
311319
throw new IllegalStateException("new buffer must have at least one element");
312320
}
313-
soElement(nextBuffer, offsetInNew, null);// StoreStore
321+
soRefElement(nextBuffer, offset, null);// StoreStore
314322
soConsumerIndex(index + 2);
315323
return n;
316324
}
317325

318-
private int newBufferAndOffset(E[] nextBuffer, final long index) {
319-
consumerBuffer = nextBuffer;
320-
consumerMask = (nextBuffer.length - 2) << 1;
321-
final int offsetInNew = modifiedCalcElementOffset(index, consumerMask);
322-
return offsetInNew;
323-
}
324-
325326
public final boolean isEmpty() {
326327
// Order matters!
327328
// Loading consumer before producer allows for producer increments after consumer index is read.
328329
// This ensures this method is conservative in it's estimate. Note that as this is an MPSC q there is
329330
// nothing we can do to make this an exact method.
330-
return (this.lvConsumerIndex() == (this.lvProducerIndex() & CLOSED_MASK));
331+
long cIndex = lvConsumerIndex();
332+
long pIndex = lvProducerIndex() & CLOSED_MASK;
333+
pIndex += (pIndex & RESIZE_BIT);
334+
return cIndex == pIndex;
331335
}
332336

333337
public final int size() {
@@ -347,7 +351,9 @@ public final int size() {
347351
after = lvConsumerIndex();
348352
if (before == after) {
349353
// Mask out CLOSED bit for size calculation
350-
size = (((currentProducerIndex & CLOSED_MASK) - after) >> 1);
354+
long pIndex = currentProducerIndex & CLOSED_MASK;
355+
pIndex += (pIndex & RESIZE_BIT);
356+
size = (pIndex - after) >> 1;
351357
break;
352358
}
353359
}

0 commit comments

Comments
 (0)