//插入元素,队列满后会抛出异常boolean add(E e);//移除元素,队列为空时会抛出异常E remove();//插入元素,成功反会trueboolean offer(E e);//移除元素E poll();//插入元素,队列满后会阻塞void put(E e) throws InterruptedException;//移除元素,队列空后会阻塞E take() throws InterruptedException;//限时插入boolean offer(E e, long timeout, TimeUnit unit)//限时移除E poll(long timeout, TimeUnit unit);//获取所有元素到Collection中int drainTo(Collection<? super E> c);
//通过数组来存储队列中的元素final Object[] items;//初始化一个固定的数组大小,默认使用非公平锁来控制并发public ArrayBlockingQueue(int capacity) { this(capacity, false);}//初始化固定的items数组大小,初始化notEmpty以及notFull两个Condition来控制生产消费public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair);//通过ReentrantLock来控制并发 notEmpty = lock.newCondition(); notFull = lock.newCondition();}
//插入元素到队列中public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); //获取独占锁 try { while (count == items.length) //如果队列已满则通过await阻塞put方法 notFull.await(); enqueue(e); //插入元素 } finally { lock.unlock(); }}private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) //插入元素后将putIndex+1,当队列使用完后重置为0 putIndex = 0; count++; notEmpty.signal(); //队列添加元素后唤醒因notEmpty等待的消费线程}//移除队列中的元素public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); //获取独占锁 try { while (count == 0) //如果队列已空则通过await阻塞take方法 notEmpty.await(); return dequeue(); //移除元素 } finally { lock.unlock(); }}private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) //移除元素后将takeIndex+1,当队列使用完后重置为0 takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); //队列消费元素后唤醒因notFull等待的消费线程 return x;}
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); //获取定时时长 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0) //指定时长过后,线程仍然未被唤醒则返回false return false; nanos = notFull.awaitNanos(nanos); //指定时长内阻塞线程 } enqueue(e); return true; } finally { lock.unlock(); }}
public int drainTo(Collection<? super E> c, int maxElements) { checkNotNull(c); if (c == this) throw new IllegalArgumentException(); if (maxElements <= 0) return 0; final Object[] items = this.items; final ReentrantLock lock = this.lock; //仅获取一次锁 lock.lock(); try { int n = Math.min(maxElements, count); //获取队列中所有元素 int take = takeIndex; int i = 0; try { while (i < n) { @SuppressWarnings("unchecked") E x = (E) items[take]; c.add(x); //循环插入元素 items[take] = null; if (++take == items.length) take = 0; i++; } return n; } finally { // Restore invariants even if c.add() threw if (i > 0) { count -= i; takeIndex = take; if (itrs != null) { if (count == 0) itrs.queueIsEmpty(); else if (i > take) itrs.takeIndexWrapped(); } for (; i > 0 && lock.hasWaiters(notFull); i--) notFull.signal(); //唤醒等待的生产者线程 } } } finally { lock.unlock(); }}
public void put(E e) throws InterruptedException { int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; //因为使用了双锁,需要使用AtomicInteger计算元素总量,避免并发计算不准确 final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); //队列已满,阻塞生产线程 } enqueue(node); //插入元素到队列尾部 c = count.getAndIncrement(); //count + 1 if (c + 1 < capacity) //如果+1后队列还未满,通过其他生产线程继续生产 notFull.signal(); } finally { putLock.unlock(); } if (c == 0) //只有当之前是空时,消费队列才会阻塞,否则是不需要通知的 signalNotEmpty(); }private void enqueue(Node<E> node) { //将新元素添加到链表末尾,然后将last指向尾部元素 last = last.next = node;}public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); //队列为空,阻塞消费线程 } x = dequeue(); //消费一个元素 c = count.getAndDecrement(); //count - 1 if (c > 1) // 通知其他等待的消费线程继续消费 notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) //只有当之前是满的,生产队列才会阻塞,否则是不需要通知的 signalNotFull(); return x;}//消费队列头部的下一个元素,同时将新头部置空private E dequeue() { Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x;}
public void put(E e) { offer(e);}public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] array; //无界队列,队列长度不够时会扩容 while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { //通过comparator来实现优先级排序 Comparator<? super E> cmp = comparator; if (cmp == null) siftUpComparable(n, e, array); else siftUpUsingComparator(n, e, array, cmp); size = n + 1; notEmpty.signal(); //和ArrayBlockingQueue一样,每次添加元素后通知消费线程 } finally { lock.unlock(); } return true;}public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { while ( (result = dequeue()) == null) notEmpty.await(); //队列为空,阻塞消费线程 } finally { lock.unlock(); } return result;}DelayQueue
private final transient ReentrantLock lock = new ReentrantLock();private final PriorityQueue<E> q = new PriorityQueue<E>();//优先级队列public void put(E e) { offer(e);}public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); //插入元素到优先级队列 if (q.peek() == e) { //如果插入的元素在队列头部 leader = null; available.signal(); //通知消费线程 } return true; } finally { lock.unlock(); }}public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); //获取头部元素 if (first == null) available.await(); //空队列阻塞 else { long delay = first.getDelay(NANOSECONDS); //检查元素是否延迟到期 if (delay <= 0) return q.poll(); //到期则弹出元素 first = null; // don't retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); //阻塞未到期的时间 } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); }}SynchronousQueue
//非公平情况下调用内部类TransferStack的transfer方法putpublic void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); if (transferer.transfer(e, false, 0) == null) { Thread.interrupted(); throw new InterruptedException(); }}//非公平情况下调用内部类TransferStack的transfer方法takepublic E take() throws InterruptedException { E e = transferer.transfer(null, false, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException();}//具体的put以及take方法,只有E的区别,通过E来区别REQUEST还是DATA模式E transfer(E e, boolean timed, long nanos) { SNode s = null; // constructed/reused as needed int mode = (e == null) ? REQUEST : DATA; for (;;) { SNode h = head; //栈无元素或者元素和插入的元素模式相匹配,也就是说都是插入元素 if (h == null || h.mode == mode) { //有时间限制并且超时 if (timed && nanos <= 0) { if (h != null && h.isCancelled()) casHead(h, h.next); // 重新设置头节点 else return null; } //未超时cas操作尝试设置头节点 else if (casHead(h, s = snode(s, e, h, mode))) { //自旋一段时间后未消费元素则挂起put线程 SNode m = awaitFulfill(s, timed, nanos); if (m == s) { // wait was cancelled clean(s); return null; } if ((h = head) != null && h.next == s) casHead(h, s.next); // help s's fulfiller return (E) ((mode == REQUEST) ? m.item : s.item); } } //栈不为空并且和头节点模式不匹配,存在元素则消费元素并重新设置head节点 else if (!isFulfilling(h.mode)) { // try to fulfill if (h.isCancelled()) // already cancelled casHead(h, h.next); // pop and retry else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { for (;;) { // loop until matched or waiters disappear SNode m = s.next; // m is s's match if (m == null) { // all waiters are gone casHead(s, null); // pop fulfill node s = null; // use new node next time break; // restart main loop } SNode mn = m.next; if (m.tryMatch(s)) { casHead(s, mn); // pop both s and m return (E) ((mode == REQUEST) ? m.item : s.item); } else // lost match s.casNext(m, mn); // help unlink } } } //节点正在匹配阶段 else { // help a fulfiller SNode m = h.next; // m is h's match if (m == null) // waiter is gone casHead(h, null); // pop fulfilling node else { SNode mn = m.next; if (m.tryMatch(h)) // help match casHead(h, mn); // pop both h and m else // lost match h.casNext(m, mn); // help unlink } } }}//先自旋后挂起的核心方法SNode awaitFulfill(SNode s, boolean timed, long nanos) { final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); //计算自旋的次数 int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { if (w.isInterrupted()) s.tryCancel(); SNode m = s.match; //匹配成功过返回节点 if (m != null) return m; //超时控制 if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { s.tryCancel(); continue; } } //自旋检查,是否进行下一次自旋 if (spins > 0) spins = shouldSpin(s) ? (spins-1) : 0; else if (s.waiter == null) s.waiter = w; // establish waiter so can park next iter else if (!timed) LockSupport.park(this); //在这里挂起线程 else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); }}
LinkedTransferQueue相比于以上的队列还提供了一些额外的功能,它实现了TransferQueue接口,有两个关键方法transfer(E e)和tryTransfer(E e)方法,transfer在没有消费时会阻塞,tryTransfer在没有消费时不会插入到队列中,也不会等待,直接返回false。
private static final int NOW = 0; // for untimed poll, tryTransferprivate static final int ASYNC = 1; // for offer, put, addprivate static final int SYNC = 2; // for transfer, takeprivate static final int TIMED = 3; // for timed poll, tryTransfer//通过SYNC状态来实现生产阻塞public void transfer(E e) throws InterruptedException { if (xfer(e, true, SYNC, 0) != null) { Thread.interrupted(); // failure possible only due to interrupt throw new InterruptedException(); }}//通过NOW状态跳过添加元素以及阻塞public boolean tryTransfer(E e) { return xfer(e, true, NOW, 0) == null;}//通过ASYNC状态跳过阻塞public void put(E e) { xfer(e, true, ASYNC, 0);}//通过SYNC状态来实现消费阻塞public E take() throws InterruptedException { E e = xfer(null, false, SYNC, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException();}//生产消费调用同一个方法,通过e是否为空,haveData,how等参数来区分具体逻辑private E xfer(E e, boolean haveData, int how, long nanos) { if (haveData && (e == null)) throw new NullPointerException(); Node s = null; // the node to append, if needed retry: for (;;) { // restart on append race //找出第一个可用节点 for (Node h = head, p = h; p != null;) { // find & match first node boolean isData = p.isData; Object item = p.item; //队列为空时直接跳过 if (item != p && (item != null) == isData) { // unmatched //节点类型相同,跳过 if (isData == haveData) // can't match break; if (p.casItem(item, e)) { // match for (Node q = p; q != h;) { Node n = q.next; // update by 2 unless singleton if (head == h && casHead(h, n == null ? q : n)) { h.forgetNext(); break; } // advance and retry if ((h = head) == null || (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 } LockSupport.unpark(p.waiter); return LinkedTransferQueue.<E>cast(item); } } Node n = p.next; p = (p != n) ? n : (h = head); // Use head if p offlist } //插入节点或移除节点具体逻辑 //tryTransfer方法会直接跳过并返回结果 if (how != NOW) { // No matches available if (s == null) s = new Node(e, haveData); Node pred = tryAppend(s, haveData); //加入节点 if (pred == null) continue retry; // lost race vs opposite mode if (how != ASYNC) //自旋以及阻塞消费线程逻辑,和SynchronousQueue类似,先尝试自选,失败后挂起线程 //transfer方法在没有消费线程时也会阻塞在这里 return awaitMatch(s, pred, e, (how == TIMED), nanos); } return e; // not waiting }}LinkedBlockingDeque
