龙空技术网

带着BAT大厂的面试问题:一文理解JUC中AQS类(一)

Java全栈知识体系 81

前言:

此刻小伙伴们对“java面试pdai”大约比较讲究,同学们都需要学习一些“java面试pdai”的相关资讯。那么小编同时在网摘上汇集了一些有关“java面试pdai””的相关资讯,希望各位老铁们能喜欢,咱们一起来学习一下吧!

AbstractQueuedSynchronizer抽象类是核心,需要重点掌握。它提供了一个基于FIFO队列,可以用于构建锁或者其他相关同步装置的基础框架。@pdai

带着BAT大厂的面试问题去理解

请带着这些问题继续后文,会很大程度上帮助你更好的理解相关知识点。@pdai

什么是AQS? 为什么它是核心?AQS的核心思想是什么? 它是怎么实现的? 底层数据结构等AQS有哪些核心的方法?AQS定义什么样的资源获取方式? AQS定义了两种资源获取方式:独占(只有一个线程能访问执行,又根据是否按队列的顺序分为公平锁和非公平锁,如ReentrantLock) 和共享(多个线程可同时访问执行,如Semaphore、CountDownLatch、 CyclicBarrier )。ReentrantReadWriteLock可以看成是组合式,允许多个线程同时对某一资源进行读。AQS底层使用了什么样的设计模式? 模板AQS的应用示例?AbstractQueuedSynchronizer简介

AQS是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的ReentrantLock,Semaphore,其他的诸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的。当然,我们自己也能利用AQS非常轻松容易地构造出符合我们自己需求的同步器。

AQS 核心思想

AQS核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。

CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。

AQS使用一个int成员变量来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队工作。AQS使用CAS对该同步状态进行原子操作实现对其值的修改。

private volatile int state;//共享变量,使用volatile修饰保证线程可见性

状态信息通过procted类型的getState,setState,compareAndSetState进行操作

//返回同步状态的当前值protected final int getState() {          return state;} // 设置同步状态的值protected final void setState(int newState) {         state = newState;}//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)protected final boolean compareAndSetState(int expect, int update) {        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);}
AQS 对资源的共享方式

AQS定义两种资源共享方式

Exclusive(独占):只有一个线程能执行,如ReentrantLock。又可分为公平锁和非公平锁: 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的Share(共享):多个线程可同时执行,如Semaphore/CountDownLatch。Semaphore、CountDownLatCh、 CyclicBarrier、ReadWriteLock 我们都会在后面讲到。

ReentrantReadWriteLock 可以看成是组合式,因为ReentrantReadWriteLock也就是读写锁允许多个线程同时对某一资源进行读。

不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在上层已经帮我们实现好了。

AQS底层使用了模板方法模式

同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用):

使用者继承AbstractQueuedSynchronizer并重写指定的方法。(这些重写方法很简单,无非是对于共享资源state的获取和释放) 将AQS组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。

这和我们以往通过实现接口的方式有很大区别,AQS使用了模板方法模式,自定义同步器时需要重写下面几个AQS提供的模板方法:

isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。

默认情况下,每个方法都抛出 UnsupportedOperationException。 这些方法的实现必须是内部线程安全的,并且通常应该简短而不是阻塞。AQS类中的其他方法都是final ,所以无法被其他类使用,只有这几个方法可以被其他类使用。

以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

AbstractQueuedSynchronizer数据结构

AbstractQueuedSynchronizer类底层的数据结构是使用CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。其中Sync queue,即同步队列,是双向链表,包括head结点和tail结点,head结点主要用作后续的调度。而Condition queue不是必须的,其是一个单向链表,只有当使用Condition时,才会存在此单向链表。并且可能会有多个Condition queue。

image

AbstractQueuedSynchronizer源码分析类的继承关系

AbstractQueuedSynchronizer继承自AbstractOwnableSynchronizer抽象类,并且实现了Serializable接口,可以进行序列化。

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable

其中AbstractOwnableSynchronizer抽象类的源码如下:

public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {        // 版本序列号    private static final long serialVersionUID = 3737899427754241961L;    // 构造方法    protected AbstractOwnableSynchronizer() { }    // 独占模式下的线程    private transient Thread exclusiveOwnerThread;        // 设置独占线程     protected final void setExclusiveOwnerThread(Thread thread) {        exclusiveOwnerThread = thread;    }        // 获取独占线程     protected final Thread getExclusiveOwnerThread() {        return exclusiveOwnerThread;    }}

AbstractOwnableSynchronizer抽象类中,可以设置独占资源线程和获取独占资源线程。分别为setExclusiveOwnerThread与getExclusiveOwnerThread方法,这两个方法会被子类调用。

AbstractQueuedSynchronizer类有两个内部类,分别为Node类与ConditionObject类。下面分别做介绍。

类的内部类 - Node类

static final class Node {    // 模式,分为共享与独占    // 共享模式    static final Node SHARED = new Node();    // 独占模式    static final Node EXCLUSIVE = null;            // 结点状态    // CANCELLED,值为1,表示当前的线程被取消    // SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark    // CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中    // PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行    // 值为0,表示当前节点在sync队列中,等待着获取锁    static final int CANCELLED =  1;    static final int SIGNAL    = -1;    static final int CONDITION = -2;    static final int PROPAGATE = -3;            // 结点状态    volatile int waitStatus;            // 前驱结点    volatile Node prev;        // 后继结点    volatile Node next;            // 结点所对应的线程    volatile Thread thread;            // 下一个等待者    Node nextWaiter;        // 结点是否在共享模式下等待    final boolean isShared() {        return nextWaiter == SHARED;    }        // 获取前驱结点,若前驱结点为空,抛出异常    final Node predecessor() throws NullPointerException {        // 保存前驱结点        Node p = prev;         if (p == null) // 前驱结点为空,抛出异常            throw new NullPointerException();        else // 前驱结点不为空,返回            return p;    }        // 无参构造方法    Node() {    // Used to establish initial head or SHARED marker    }        // 构造方法        Node(Thread thread, Node mode) {    // Used by addWaiter        this.nextWaiter = mode;        this.thread = thread;    }        // 构造方法    Node(Thread thread, int waitStatus) { // Used by Condition        this.waitStatus = waitStatus;        this.thread = thread;    }}

每个线程被阻塞的线程都会被封装成一个Node结点,放入队列。每个节点包含了一个Thread类型的引用,并且每个节点都存在一个状态,具体状态如下。

CANCELLED,值为1,表示当前的线程被取消。SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,需要进行unpark操作。CONDITION,值为-2,表示当前节点在等待condition,也就是在condition queue中。PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行。值为0,表示当前节点在sync queue中,等待着获取锁。类的内部类 - ConditionObject类

这个类有点长,耐心看下:

// 内部类public class ConditionObject implements Condition, java.io.Serializable {    // 版本号    private static final long serialVersionUID = 1173984872572414699L;    /** First node of condition queue. */    // condition队列的头结点    private transient Node firstWaiter;    /** Last node of condition queue. */    // condition队列的尾结点    private transient Node lastWaiter;    /**        * Creates a new {@code ConditionObject} instance.        */    // 构造方法    public ConditionObject() { }    // Internal methods    /**        * Adds a new waiter to wait queue.        * @return its new wait node        */    // 添加新的waiter到wait队列    private Node addConditionWaiter() {        // 保存尾结点        Node t = lastWaiter;        // If lastWaiter is cancelled, clean out.        if (t != null && t.waitStatus != Node.CONDITION) { // 尾结点不为空,并且尾结点的状态不为CONDITION            // 清除状态为CONDITION的结点            unlinkCancelledWaiters();             // 将最后一个结点重新赋值给t            t = lastWaiter;        }        // 新建一个结点        Node node = new Node(Thread.currentThread(), Node.CONDITION);        if (t == null) // 尾结点为空            // 设置condition队列的头结点            firstWaiter = node;        else // 尾结点不为空            // 设置为节点的nextWaiter域为node结点            t.nextWaiter = node;        // 更新condition队列的尾结点        lastWaiter = node;        return node;    }    /**        * Removes and transfers nodes until hit non-cancelled one or        * null. Split out from signal in part to encourage compilers        * to inline the case of no waiters.        * @param first (non-null) the first node on condition queue        */    private void doSignal(Node first) {        // 循环        do {            if ( (firstWaiter = first.nextWaiter) == null) // 该节点的nextWaiter为空                // 设置尾结点为空                lastWaiter = null;            // 设置first结点的nextWaiter域            first.nextWaiter = null;        } while (!transferForSignal(first) &&                    (first = firstWaiter) != null); // 将结点从condition队列转移到sync队列失败并且condition队列中的头结点不为空,一直循环    }    /**        * Removes and transfers all nodes.        * @param first (non-null) the first node on condition queue        */    private void doSignalAll(Node first) {        // condition队列的头结点尾结点都设置为空        lastWaiter = firstWaiter = null;        // 循环        do {            // 获取first结点的nextWaiter域结点            Node next = first.nextWaiter;            // 设置first结点的nextWaiter域为空            first.nextWaiter = null;            // 将first结点从condition队列转移到sync队列            transferForSignal(first);            // 重新设置first            first = next;        } while (first != null);    }    /**        * Unlinks cancelled waiter nodes from condition queue.        * Called only while holding lock. This is called when        * cancellation occurred during condition wait, and upon        * insertion of a new waiter when lastWaiter is seen to have        * been cancelled. This method is needed to avoid garbage        * retention in the absence of signals. So even though it may        * require a full traversal, it comes into play only when        * timeouts or cancellations occur in the absence of        * signals. It traverses all nodes rather than stopping at a        * particular target to unlink all pointers to garbage nodes        * without requiring many re-traversals during cancellation        * storms.        */    // 从condition队列中清除状态为CANCEL的结点    private void unlinkCancelledWaiters() {        // 保存condition队列头结点        Node t = firstWaiter;        Node trail = null;        while (t != null) { // t不为空            // 下一个结点            Node next = t.nextWaiter;            if (t.waitStatus != Node.CONDITION) { // t结点的状态不为CONDTION状态                // 设置t节点的额nextWaiter域为空                t.nextWaiter = null;                if (trail == null) // trail为空                    // 重新设置condition队列的头结点                    firstWaiter = next;                else // trail不为空                    // 设置trail结点的nextWaiter域为next结点                    trail.nextWaiter = next;                if (next == null) // next结点为空                    // 设置condition队列的尾结点                    lastWaiter = trail;            }            else // t结点的状态为CONDTION状态                // 设置trail结点                trail = t;            // 设置t结点            t = next;        }    }    // public methods    /**        * Moves the longest-waiting thread, if one exists, from the        * wait queue for this condition to the wait queue for the        * owning lock.        *        * @throws IllegalMonitorStateException if {@link #isHeldExclusively}        *         returns {@code false}        */    // 唤醒一个等待线程。如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。    public final void signal() {        if (!isHeldExclusively()) // 不被当前线程独占,抛出异常            throw new IllegalMonitorStateException();        // 保存condition队列头结点        Node first = firstWaiter;        if (first != null) // 头结点不为空            // 唤醒一个等待线程            doSignal(first);    }    /**        * Moves all threads from the wait queue for this condition to        * the wait queue for the owning lock.        *        * @throws IllegalMonitorStateException if {@link #isHeldExclusively}        *         returns {@code false}        */    // 唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。    public final void signalAll() {        if (!isHeldExclusively()) // 不被当前线程独占,抛出异常            throw new IllegalMonitorStateException();        // 保存condition队列头结点        Node first = firstWaiter;        if (first != null) // 头结点不为空            // 唤醒所有等待线程            doSignalAll(first);    }    /**        * Implements uninterruptible condition wait.        * <ol>        * <li> Save lock state returned by {@link #getState}.        * <li> Invoke {@link #release} with saved state as argument,        *      throwing IllegalMonitorStateException if it fails.        * <li> Block until signalled.        * <li> Reacquire by invoking specialized version of        *      {@link #acquire} with saved state as argument.        * </ol>        */    // 等待,当前线程在接到信号之前一直处于等待状态,不响应中断    public final void awaitUninterruptibly() {        // 添加一个结点到等待队列        Node node = addConditionWaiter();        // 获取释放的状态        int savedState = fullyRelease(node);        boolean interrupted = false;        while (!isOnSyncQueue(node)) { //             // 阻塞当前线程            LockSupport.park(this);            if (Thread.interrupted()) // 当前线程被中断                // 设置interrupted状态                interrupted = true;         }        if (acquireQueued(node, savedState) || interrupted) //             selfInterrupt();    }    /*        * For interruptible waits, we need to track whether to throw        * InterruptedException, if interrupted while blocked on        * condition, versus reinterrupt current thread, if        * interrupted while blocked waiting to re-acquire.        */    /** Mode meaning to reinterrupt on exit from wait */    private static final int REINTERRUPT =  1;    /** Mode meaning to throw InterruptedException on exit from wait */    private static final int THROW_IE    = -1;    /**        * Checks for interrupt, returning THROW_IE if interrupted        * before signalled, REINTERRUPT if after signalled, or        * 0 if not interrupted.        */    private int checkInterruptWhileWaiting(Node node) {        return Thread.interrupted() ?            (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :            0;     }    /**        * Throws InterruptedException, reinterrupts current thread, or        * does nothing, depending on mode.        */    private void reportInterruptAfterWait(int interruptMode)        throws InterruptedException {        if (interruptMode == THROW_IE)            throw new InterruptedException();        else if (interruptMode == REINTERRUPT)            selfInterrupt();    }    /**        * Implements interruptible condition wait.        * <ol>        * <li> If current thread is interrupted, throw InterruptedException.        * <li> Save lock state returned by {@link #getState}.        * <li> Invoke {@link #release} with saved state as argument,        *      throwing IllegalMonitorStateException if it fails.        * <li> Block until signalled or interrupted.        * <li> Reacquire by invoking specialized version of        *      {@link #acquire} with saved state as argument.        * <li> If interrupted while blocked in step 4, throw InterruptedException.        * </ol>        */    // // 等待,当前线程在接到信号或被中断之前一直处于等待状态    public final void await() throws InterruptedException {        if (Thread.interrupted()) // 当前线程被中断,抛出异常            throw new InterruptedException();        // 在wait队列上添加一个结点        Node node = addConditionWaiter();        //         int savedState = fullyRelease(node);        int interruptMode = 0;        while (!isOnSyncQueue(node)) {            // 阻塞当前线程            LockSupport.park(this);            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 检查结点等待时的中断类型                break;        }        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)            interruptMode = REINTERRUPT;        if (node.nextWaiter != null) // clean up if cancelled            unlinkCancelledWaiters();        if (interruptMode != 0)            reportInterruptAfterWait(interruptMode);    }    /**        * Implements timed condition wait.        * <ol>        * <li> If current thread is interrupted, throw InterruptedException.        * <li> Save lock state returned by {@link #getState}.        * <li> Invoke {@link #release} with saved state as argument,        *      throwing IllegalMonitorStateException if it fails.        * <li> Block until signalled, interrupted, or timed out.        * <li> Reacquire by invoking specialized version of        *      {@link #acquire} with saved state as argument.        * <li> If interrupted while blocked in step 4, throw InterruptedException.        * </ol>        */    // 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态     public final long awaitNanos(long nanosTimeout)            throws InterruptedException {        if (Thread.interrupted())            throw new InterruptedException();        Node node = addConditionWaiter();        int savedState = fullyRelease(node);        final long deadline = System.nanoTime() + nanosTimeout;        int interruptMode = 0;        while (!isOnSyncQueue(node)) {            if (nanosTimeout <= 0L) {                transferAfterCancelledWait(node);                break;            }            if (nanosTimeout >= spinForTimeoutThreshold)                LockSupport.parkNanos(this, nanosTimeout);            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)                break;            nanosTimeout = deadline - System.nanoTime();        }        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)            interruptMode = REINTERRUPT;        if (node.nextWaiter != null)            unlinkCancelledWaiters();        if (interruptMode != 0)            reportInterruptAfterWait(interruptMode);        return deadline - System.nanoTime();    }    /**        * Implements absolute timed condition wait.        * <ol>        * <li> If current thread is interrupted, throw InterruptedException.        * <li> Save lock state returned by {@link #getState}.        * <li> Invoke {@link #release} with saved state as argument,        *      throwing IllegalMonitorStateException if it fails.        * <li> Block until signalled, interrupted, or timed out.        * <li> Reacquire by invoking specialized version of        *      {@link #acquire} with saved state as argument.        * <li> If interrupted while blocked in step 4, throw InterruptedException.        * <li> If timed out while blocked in step 4, return false, else true.        * </ol>        */    // 等待,当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态    public final boolean awaitUntil(Date deadline)            throws InterruptedException {        long abstime = deadline.getTime();        if (Thread.interrupted())            throw new InterruptedException();        Node node = addConditionWaiter();        int savedState = fullyRelease(node);        boolean timedout = false;        int interruptMode = 0;        while (!isOnSyncQueue(node)) {            if (System.currentTimeMillis() > abstime) {                timedout = transferAfterCancelledWait(node);                break;            }            LockSupport.parkUntil(this, abstime);            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)                break;        }        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)            interruptMode = REINTERRUPT;        if (node.nextWaiter != null)            unlinkCancelledWaiters();        if (interruptMode != 0)            reportInterruptAfterWait(interruptMode);        return !timedout;    }    /**        * Implements timed condition wait.        * <ol>        * <li> If current thread is interrupted, throw InterruptedException.        * <li> Save lock state returned by {@link #getState}.        * <li> Invoke {@link #release} with saved state as argument,        *      throwing IllegalMonitorStateException if it fails.        * <li> Block until signalled, interrupted, or timed out.        * <li> Reacquire by invoking specialized version of        *      {@link #acquire} with saved state as argument.        * <li> If interrupted while blocked in step 4, throw InterruptedException.        * <li> If timed out while blocked in step 4, return false, else true.        * </ol>        */    // 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。此方法在行为上等效于: awaitNanos(unit.toNanos(time)) > 0    public final boolean await(long time, TimeUnit unit)            throws InterruptedException {        long nanosTimeout = unit.toNanos(time);        if (Thread.interrupted())            throw new InterruptedException();        Node node = addConditionWaiter();        int savedState = fullyRelease(node);        final long deadline = System.nanoTime() + nanosTimeout;        boolean timedout = false;        int interruptMode = 0;        while (!isOnSyncQueue(node)) {            if (nanosTimeout <= 0L) {                timedout = transferAfterCancelledWait(node);                break;            }            if (nanosTimeout >= spinForTimeoutThreshold)                LockSupport.parkNanos(this, nanosTimeout);            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)                break;            nanosTimeout = deadline - System.nanoTime();        }        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)            interruptMode = REINTERRUPT;        if (node.nextWaiter != null)            unlinkCancelledWaiters();        if (interruptMode != 0)            reportInterruptAfterWait(interruptMode);        return !timedout;    }    //  support for instrumentation    /**        * Returns true if this condition was created by the given        * synchronization object.        *        * @return {@code true} if owned        */    final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {        return sync == AbstractQueuedSynchronizer.this;    }    /**        * Queries whether any threads are waiting on this condition.        * Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}.        *        * @return {@code true} if there are any waiting threads        * @throws IllegalMonitorStateException if {@link #isHeldExclusively}        *         returns {@code false}        */    //  查询是否有正在等待此条件的任何线程    protected final boolean hasWaiters() {        if (!isHeldExclusively())            throw new IllegalMonitorStateException();        for (Node w = firstWaiter; w != null; w = w.nextWaiter) {            if (w.waitStatus == Node.CONDITION)                return true;        }        return false;    }    /**        * Returns an estimate of the number of threads waiting on        * this condition.        * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}.        *        * @return the estimated number of waiting threads        * @throws IllegalMonitorStateException if {@link #isHeldExclusively}        *         returns {@code false}        */    // 返回正在等待此条件的线程数估计值    protected final int getWaitQueueLength() {        if (!isHeldExclusively())            throw new IllegalMonitorStateException();        int n = 0;        for (Node w = firstWaiter; w != null; w = w.nextWaiter) {            if (w.waitStatus == Node.CONDITION)                ++n;        }        return n;    }    /**        * Returns a collection containing those threads that may be        * waiting on this Condition.        * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}.        *        * @return the collection of threads        * @throws IllegalMonitorStateException if {@link #isHeldExclusively}        *         returns {@code false}        */    // 返回包含那些可能正在等待此条件的线程集合    protected final Collection<Thread> getWaitingThreads() {        if (!isHeldExclusively())            throw new IllegalMonitorStateException();        ArrayList<Thread> list = new ArrayList<Thread>();        for (Node w = firstWaiter; w != null; w = w.nextWaiter) {            if (w.waitStatus == Node.CONDITION) {                Thread t = w.thread;                if (t != null)                    list.add(t);            }        }        return list;    }}

此类实现了Condition接口,Condition接口定义了条件操作规范,具体如下

public interface Condition {    // 等待,当前线程在接到信号或被中断之前一直处于等待状态    void await() throws InterruptedException;        // 等待,当前线程在接到信号之前一直处于等待状态,不响应中断    void awaitUninterruptibly();        //等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态     long awaitNanos(long nanosTimeout) throws InterruptedException;        // 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。此方法在行为上等效于: awaitNanos(unit.toNanos(time)) > 0    boolean await(long time, TimeUnit unit) throws InterruptedException;        // 等待,当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态    boolean awaitUntil(Date deadline) throws InterruptedException;        // 唤醒一个等待线程。如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。    void signal();        // 唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。    void signalAll();}

Condition接口中定义了await、signal方法,用来等待条件、释放条件。之后会详细分析CondtionObject的源码。

标签: #java面试pdai