MpscAtomicIntegerArrayQueue Class — netty Architecture
Architecture documentation for the MpscAtomicIntegerArrayQueue class in MpscIntQueue.java from the netty codebase.
Entity Profile
Dependency Diagram
graph TD c00a7e8d_e19a_6d4a_6887_5d0634281bed["MpscAtomicIntegerArrayQueue"] 25c4444a_94e9_dd15_817d_4b74be90a1c5["MpscIntQueue.java"] c00a7e8d_e19a_6d4a_6887_5d0634281bed -->|defined in| 25c4444a_94e9_dd15_817d_4b74be90a1c5 f4575ddb_f917_95b9_7ae3_68f182f78f51["MpscAtomicIntegerArrayQueue()"] c00a7e8d_e19a_6d4a_6887_5d0634281bed -->|method| f4575ddb_f917_95b9_7ae3_68f182f78f51 fb7f0fff_a870_3ec4_c353_7341bc7fdd2e["offer()"] c00a7e8d_e19a_6d4a_6887_5d0634281bed -->|method| fb7f0fff_a870_3ec4_c353_7341bc7fdd2e 6c53d641_1124_ccce_e265_4ede0841b9c5["poll()"] c00a7e8d_e19a_6d4a_6887_5d0634281bed -->|method| 6c53d641_1124_ccce_e265_4ede0841b9c5 a6577b96_ac1f_02a7_4a84_eb562e174042["drain()"] c00a7e8d_e19a_6d4a_6887_5d0634281bed -->|method| a6577b96_ac1f_02a7_4a84_eb562e174042 c1dd1383_c80c_804d_7edd_027316763c11["fill()"] c00a7e8d_e19a_6d4a_6887_5d0634281bed -->|method| c1dd1383_c80c_804d_7edd_027316763c11 deeed477_2440_0cfd_cf69_74f73bf32f5b["isEmpty()"] c00a7e8d_e19a_6d4a_6887_5d0634281bed -->|method| deeed477_2440_0cfd_cf69_74f73bf32f5b 16657c3a_e2e7_fd92_b734_85f139e981e7["size()"] c00a7e8d_e19a_6d4a_6887_5d0634281bed -->|method| 16657c3a_e2e7_fd92_b734_85f139e981e7
Relationship Graph
Source Code
common/src/main/java/io/netty/util/concurrent/MpscIntQueue.java lines 96–271
final class MpscAtomicIntegerArrayQueue extends AtomicIntegerArray implements MpscIntQueue {
private static final long serialVersionUID = 8740338425124821455L;
private static final AtomicLongFieldUpdater<MpscAtomicIntegerArrayQueue> PRODUCER_INDEX =
AtomicLongFieldUpdater.newUpdater(MpscAtomicIntegerArrayQueue.class, "producerIndex");
private static final AtomicLongFieldUpdater<MpscAtomicIntegerArrayQueue> PRODUCER_LIMIT =
AtomicLongFieldUpdater.newUpdater(MpscAtomicIntegerArrayQueue.class, "producerLimit");
private static final AtomicLongFieldUpdater<MpscAtomicIntegerArrayQueue> CONSUMER_INDEX =
AtomicLongFieldUpdater.newUpdater(MpscAtomicIntegerArrayQueue.class, "consumerIndex");
private final int mask;
private final int emptyValue;
private volatile long producerIndex;
private volatile long producerLimit;
private volatile long consumerIndex;
public MpscAtomicIntegerArrayQueue(int capacity, int emptyValue) {
super(MathUtil.safeFindNextPositivePowerOfTwo(capacity));
if (emptyValue != 0) {
this.emptyValue = emptyValue;
int end = length() - 1;
for (int i = 0; i < end; i++) {
lazySet(i, emptyValue);
}
getAndSet(end, emptyValue); // 'getAndSet' acts as a full barrier, giving us initialization safety.
} else {
this.emptyValue = 0;
}
mask = length() - 1;
}
@Override
public boolean offer(int value) {
if (value == emptyValue) {
throw new IllegalArgumentException("Cannot offer the \"empty\" value: " + emptyValue);
}
// use a cached view on consumer index (potentially updated in loop)
final int mask = this.mask;
long producerLimit = this.producerLimit;
long pIndex;
do {
pIndex = producerIndex;
if (pIndex >= producerLimit) {
final long cIndex = consumerIndex;
producerLimit = cIndex + mask + 1;
if (pIndex >= producerLimit) {
// FULL :(
return false;
} else {
// update producer limit to the next index that we must recheck the consumer index
// this is racy, but the race is benign
PRODUCER_LIMIT.lazySet(this, producerLimit);
}
}
} while (!PRODUCER_INDEX.compareAndSet(this, pIndex, pIndex + 1));
/*
* NOTE: the new producer index value is made visible BEFORE the element in the array. If we relied on
* the index visibility to poll() we would need to handle the case where the element is not visible.
*/
// Won CAS, move on to storing
final int offset = (int) (pIndex & mask);
lazySet(offset, value);
// AWESOME :)
return true;
}
@Override
public int poll() {
final long cIndex = consumerIndex;
final int offset = (int) (cIndex & mask);
// If we can't see the next available element we can't poll
int value = get(offset);
if (emptyValue == value) {
/*
* NOTE: Queue may not actually be empty in the case of a producer (P1) being interrupted after
* winning the CAS on offer but before storing the element in the queue. Other producers may go on
* to fill up the queue after this element.
*/
if (cIndex != producerIndex) {
do {
value = get(offset);
} while (emptyValue == value);
} else {
Source
Frequently Asked Questions
What is the MpscAtomicIntegerArrayQueue class?
MpscAtomicIntegerArrayQueue is a class in the netty codebase, defined in common/src/main/java/io/netty/util/concurrent/MpscIntQueue.java.
Where is MpscAtomicIntegerArrayQueue defined?
MpscAtomicIntegerArrayQueue is defined in common/src/main/java/io/netty/util/concurrent/MpscIntQueue.java at line 96.
Analyze Your Own Codebase
Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.
Try Supermodel Free