MpscAtomicIntegerArrayQueue Class — netty Architecture

Architecture documentation for the MpscAtomicIntegerArrayQueue class in MpscIntQueue.java from the netty codebase.

Dependency Diagram

graph TD
  c00a7e8d_e19a_6d4a_6887_5d0634281bed["MpscAtomicIntegerArrayQueue"]
  25c4444a_94e9_dd15_817d_4b74be90a1c5["MpscIntQueue.java"]
  c00a7e8d_e19a_6d4a_6887_5d0634281bed -->|defined in| 25c4444a_94e9_dd15_817d_4b74be90a1c5
  f4575ddb_f917_95b9_7ae3_68f182f78f51["MpscAtomicIntegerArrayQueue()"]
  c00a7e8d_e19a_6d4a_6887_5d0634281bed -->|method| f4575ddb_f917_95b9_7ae3_68f182f78f51
  fb7f0fff_a870_3ec4_c353_7341bc7fdd2e["offer()"]
  c00a7e8d_e19a_6d4a_6887_5d0634281bed -->|method| fb7f0fff_a870_3ec4_c353_7341bc7fdd2e
  6c53d641_1124_ccce_e265_4ede0841b9c5["poll()"]
  c00a7e8d_e19a_6d4a_6887_5d0634281bed -->|method| 6c53d641_1124_ccce_e265_4ede0841b9c5
  a6577b96_ac1f_02a7_4a84_eb562e174042["drain()"]
  c00a7e8d_e19a_6d4a_6887_5d0634281bed -->|method| a6577b96_ac1f_02a7_4a84_eb562e174042
  c1dd1383_c80c_804d_7edd_027316763c11["fill()"]
  c00a7e8d_e19a_6d4a_6887_5d0634281bed -->|method| c1dd1383_c80c_804d_7edd_027316763c11
  deeed477_2440_0cfd_cf69_74f73bf32f5b["isEmpty()"]
  c00a7e8d_e19a_6d4a_6887_5d0634281bed -->|method| deeed477_2440_0cfd_cf69_74f73bf32f5b
  16657c3a_e2e7_fd92_b734_85f139e981e7["size()"]
  c00a7e8d_e19a_6d4a_6887_5d0634281bed -->|method| 16657c3a_e2e7_fd92_b734_85f139e981e7

Source Code

common/src/main/java/io/netty/util/concurrent/MpscIntQueue.java lines 96–271

    final class MpscAtomicIntegerArrayQueue extends AtomicIntegerArray implements MpscIntQueue {
        private static final long serialVersionUID = 8740338425124821455L;
        private static final AtomicLongFieldUpdater<MpscAtomicIntegerArrayQueue> PRODUCER_INDEX =
                AtomicLongFieldUpdater.newUpdater(MpscAtomicIntegerArrayQueue.class, "producerIndex");
        private static final AtomicLongFieldUpdater<MpscAtomicIntegerArrayQueue> PRODUCER_LIMIT =
                AtomicLongFieldUpdater.newUpdater(MpscAtomicIntegerArrayQueue.class, "producerLimit");
        private static final AtomicLongFieldUpdater<MpscAtomicIntegerArrayQueue> CONSUMER_INDEX =
                AtomicLongFieldUpdater.newUpdater(MpscAtomicIntegerArrayQueue.class, "consumerIndex");
        private final int mask;
        private final int emptyValue;
        private volatile long producerIndex;
        private volatile long producerLimit;
        private volatile long consumerIndex;

        public MpscAtomicIntegerArrayQueue(int capacity, int emptyValue) {
            super(MathUtil.safeFindNextPositivePowerOfTwo(capacity));
            if (emptyValue != 0) {
                this.emptyValue = emptyValue;
                int end = length() - 1;
                for (int i = 0; i < end; i++) {
                    lazySet(i, emptyValue);
                }
                getAndSet(end, emptyValue); // 'getAndSet' acts as a full barrier, giving us initialization safety.
            } else {
                this.emptyValue = 0;
            }
            mask = length() - 1;
        }

        @Override
        public boolean offer(int value) {
            if (value == emptyValue) {
                throw new IllegalArgumentException("Cannot offer the \"empty\" value: " + emptyValue);
            }
            // use a cached view on consumer index (potentially updated in loop)
            final int mask = this.mask;
            long producerLimit = this.producerLimit;
            long pIndex;
            do {
                pIndex = producerIndex;
                if (pIndex >= producerLimit) {
                    final long cIndex = consumerIndex;
                    producerLimit = cIndex + mask + 1;
                    if (pIndex >= producerLimit) {
                        // FULL :(
                        return false;
                    } else {
                        // update producer limit to the next index that we must recheck the consumer index
                        // this is racy, but the race is benign
                        PRODUCER_LIMIT.lazySet(this, producerLimit);
                    }
                }
            } while (!PRODUCER_INDEX.compareAndSet(this, pIndex, pIndex + 1));
            /*
             * NOTE: the new producer index value is made visible BEFORE the element in the array. If we relied on
             * the index visibility to poll() we would need to handle the case where the element is not visible.
             */
            // Won CAS, move on to storing
            final int offset = (int) (pIndex & mask);
            lazySet(offset, value);
            // AWESOME :)
            return true;
        }

        @Override
        public int poll() {
            final long cIndex = consumerIndex;
            final int offset = (int) (cIndex & mask);
            // If we can't see the next available element we can't poll
            int value = get(offset);
            if (emptyValue == value) {
                /*
                 * NOTE: Queue may not actually be empty in the case of a producer (P1) being interrupted after
                 * winning the CAS on offer but before storing the element in the queue. Other producers may go on
                 * to fill up the queue after this element.
                 */
                if (cIndex != producerIndex) {
                    do {
                        value = get(offset);
                    } while (emptyValue == value);
                } else {
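The excerpt above stops partway through poll(), so the following is only a minimal, self-contained sketch of the same general idea, not netty's implementation. It shows a bounded multi-producer ring of ints that uses a power-of-two mask for indexing, a sentinel "empty" value to mark free slots, and a cached producer limit so that producers rarely read the consumer index. The class name SentinelIntRingSketch is hypothetical, it uses AtomicLong fields rather than field updaters, and the tail of poll() (clearing the slot and advancing the consumer index) is an assumption about how such a queue is usually completed.

```java
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration class; it is not part of netty.
final class SentinelIntRingSketch {
    private final AtomicIntegerArray slots;
    private final int mask;
    private final int emptyValue;
    private final AtomicLong producerIndex = new AtomicLong();
    private final AtomicLong consumerIndex = new AtomicLong();
    private volatile long producerLimit; // cached bound; always <= consumerIndex + capacity

    SentinelIntRingSketch(int capacity, int emptyValue) {
        if (capacity < 1 || Integer.bitCount(capacity) != 1) {
            throw new IllegalArgumentException("capacity must be a power of two: " + capacity);
        }
        this.slots = new AtomicIntegerArray(capacity);
        this.mask = capacity - 1;
        this.emptyValue = emptyValue;
        for (int i = 0; i < capacity; i++) {
            slots.set(i, emptyValue); // every slot starts out marked as empty
        }
    }

    // Safe to call from many threads.
    boolean offer(int value) {
        if (value == emptyValue) {
            throw new IllegalArgumentException("Cannot offer the \"empty\" value: " + emptyValue);
        }
        long limit = producerLimit;
        long p;
        do {
            p = producerIndex.get();
            if (p >= limit) {
                // The cached limit is exhausted: recompute it from the consumer index.
                limit = consumerIndex.get() + mask + 1;
                if (p >= limit) {
                    return false; // full
                }
                producerLimit = limit; // racy but conservative, so the race is harmless
            }
        } while (!producerIndex.compareAndSet(p, p + 1));
        // The slot is claimed; the element becomes visible only after this store.
        slots.set((int) (p & mask), value);
        return true;
    }

    // Must be called from a single consumer thread. Returns emptyValue when empty.
    int poll() {
        final long c = consumerIndex.get();
        final int offset = (int) (c & mask);
        int value = slots.get(offset);
        if (value == emptyValue) {
            if (c == producerIndex.get()) {
                return emptyValue; // no producer has claimed this slot
            }
            // A producer won the CAS but has not stored yet: wait for the element.
            do {
                value = slots.get(offset);
            } while (value == emptyValue);
        }
        slots.set(offset, emptyValue); // assumption: free the slot before advancing
        consumerIndex.set(c + 1);
        return value;
    }

    public static void main(String[] args) throws InterruptedException {
        SentinelIntRingSketch q = new SentinelIntRingSketch(8, -1);
        Runnable producer = () -> {
            for (int i = 1; i <= 1000; i++) {
                while (!q.offer(i)) {
                    Thread.yield(); // queue full: let the consumer catch up
                }
            }
        };
        Thread p1 = new Thread(producer);
        Thread p2 = new Thread(producer);
        p1.start();
        p2.start();
        long sum = 0;
        for (int received = 0; received < 2000; ) {
            int v = q.poll();
            if (v != -1) {
                sum += v;
                received++;
            }
        }
        p1.join();
        p2.join();
        System.out.println(sum); // two producers each offer 1..1000
    }
}
```

The sketch uses fully volatile reads and writes throughout to keep the ordering argument simple. The class in the excerpt instead uses lazySet for the element store and the producer-limit update, which is a weaker and cheaper ordering.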

Frequently Asked Questions

What is the MpscAtomicIntegerArrayQueue class?
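MpscAtomicIntegerArrayQueue is a final class in the netty codebase, defined in common/src/main/java/io/netty/util/concurrent/MpscIntQueue.java. It extends AtomicIntegerArray and implements MpscIntQueue: a bounded multi-producer, single-consumer queue of int values whose capacity is rounded up to a power of two and whose free slots are marked with a caller-supplied "empty" value.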
Where is MpscAtomicIntegerArrayQueue defined?
MpscAtomicIntegerArrayQueue is defined in common/src/main/java/io/netty/util/concurrent/MpscIntQueue.java at line 96.
