龙空技术网

java并发编程之深入学习Concurrent包(三,Condition接口)

搞笑暖暖 58

前言:

此时咱们对“htmlcondition”可能比较讲究,朋友们都需要分析一些“htmlcondition”的相关资讯。那么小编同时在网络上汇集了一些关于“htmlcondition””的相关知识,希望姐妹们能喜欢,咱们一起来了解一下吧!

引言:

Condition在Concurrent包中,主要用于替代以前对象Object上的wait()、notify()等方法实现线程间的协作。

相比wait()、notify(),Condition根据和Lock的结合,可以实现更复杂和精细的线程协同和等待。

Condition包含了接口和在AbstractQueuedSynchronizer类中的ConditionObject类的实现,今天一起学习下这两块的使用。

Condition接口的使用:

Condition的使用,需要结合Lock锁一起使用,通过锁获取Condition的实例,进行线程的等待和唤醒。

闲话不说,请看下接口的源码:

public interface Condition {

void await() throws InterruptedException;

void awaitUninterruptibly();

long awaitNanos(long nanosTimeout) throws InterruptedException;

boolean await(long time, TimeUnit unit) throws InterruptedException;

boolean awaitUntil(Date deadline) throws InterruptedException;

void signal();

void signalAll();

}

如上所示,主要方法如下:await(),awaitUninterruptibly(),awaitNanos(long nanosTimeout),await(long time, TimeUnit unit)和awaitUntil(Date deadline),signal() 和signalAll()方法。

await()方法:让当前线程进入等待,直到线程接收到唤醒信号或被中断

awaitUninterruptibly()方法:让当前线程一直等待,直到被中断

awaitNanos(long nanosTimeout)方法:当前线程等待一定时间(nanosTimeout)后自动唤醒,或者被唤醒或者被中断

await(long time, TimeUnit unit)方法:当前线程等待一定时间(指定时间和时间单位)后自动唤醒,或者被唤醒或者被中断

awaitUntil(Date deadline) 方法:当前线程等待到某个时间点(deadline)后自动唤醒,或者被唤醒或者被中断

signal() 方法:唤醒在condition处等待的线程。

signalAll()方法:唤醒所有等待线程。

Condition到底如何得好,口说无凭,看以下这个例子。

来源于javadoc,地址:

class BoundedBuffer {

final Lock lock = new ReentrantLock();

final Condition notFull = lock.newCondition();

final Condition notEmpty = lock.newCondition();

final Object[] items = new Object[100];

int putptr, takeptr, count;

public void put(Object x) throws InterruptedException {

lock.lock();

try {

while (count == items.length)

notFull.await();

items[putptr] = x;

if (++putptr == items.length) putptr = 0;

++count;

notEmpty.signal();

} finally {

lock.unlock();

}

}

public Object take() throws InterruptedException {

lock.lock();

try {

while (count == 0)

notEmpty.await();

Object x = items[takeptr];

if (++takeptr == items.length) takeptr = 0;

--count;

notFull.signal();

return x;

} finally {

lock.unlock();

}

}

}

如上面的例子所示,对一个数组实现读写操作,实现了两个Condition,notEmpty负责当数组为空时,让来读数组的线程等待,直到数组有写入,唤醒等待读的线程。notFull负责当数组写满时,让写数组的线程等待,直到数组有被读出。

下面我们看AQS中Condition的内部实现类,这是一个被实现的Condition接口的类。

AbstractQueuedSynchronizer.ConditionObject类

首先,AbstractQueuedSynchronizer是一个虚拟类,同步器,通俗的说,就是实现让等待的线程进行first-in-first-out (FIFO) 排队的类。

首先,这个类中包含了一个Node内部类,

先来看看这个内部类的源码:

static final class Node {

/** Marker to indicate a node is waiting in shared mode */

static final Node SHARED = new Node();

/** Marker to indicate a node is waiting in exclusive mode */

static final Node EXCLUSIVE = null;

/** waitStatus value to indicate thread has cancelled */

static final int CANCELLED = 1;

/** waitStatus value to indicate successor's thread needs unparking */

static final int SIGNAL = -1;

/** waitStatus value to indicate thread is waiting on condition */

static final int CONDITION = -2;

/**

* waitStatus value to indicate the next acquireShared should

* unconditionally propagate

*/

static final int PROPAGATE = -3;

volatile int waitStatus;

volatile Node prev;

volatile Node next;

volatile Thread thread;

Node nextWaiter;

/**

* Returns true if node is waiting in shared mode.

*/

final boolean isShared() {

return nextWaiter == SHARED;

}

final Node predecessor() throws NullPointerException {

Node p = prev;

if (p == null)

throw new NullPointerException();

else

return p;

}

Node() { // Used to establish initial head or SHARED marker

}

Node(Thread thread, Node mode) { // Used by addWaiter

this.nextWaiter = mode;

this.thread = thread;

}

Node(Thread thread, int waitStatus) { // Used by Condition

this.waitStatus = waitStatus;

this.thread = thread;

}

}

/**

* Head of the wait queue, lazily initialized. Except for

* initialization, it is modified only via method setHead. Note:

* If head exists, its waitStatus is guaranteed not to be

* CANCELLED.

*/

private transient volatile Node head;

/**

* Tail of the wait queue, lazily initialized. Modified only via

* method enq to add new wait node.

*/

private transient volatile Node tail;

/**

* The synchronization state.

*/

private volatile int state;

....

....

....

这个结构一看就很熟悉,对的,这是一个双向链表的节点类,节点内容就是一个Thread的对象,再加上一头一尾,这就是这个队列的基本元素。

接线来,我们再来看我们关心的ConditionObject类,

闲话不说先放源码:

public class ConditionObject implements Condition, java.io.Serializable {

private static final long serialVersionUID = 1173984872572414699L;

/** First node of condition queue. */

private transient Node firstWaiter;

/** Last node of condition queue. */

private transient Node lastWaiter;

public ConditionObject() { }

private Node addConditionWaiter() {

Node t = lastWaiter;

// If lastWaiter is cancelled, clean out.

if (t != null && t.waitStatus != Node.CONDITION) {

unlinkCancelledWaiters();

t = lastWaiter;

}

Node node = new Node(Thread.currentThread(), Node.CONDITION);

if (t == null)

firstWaiter = node;

else

t.nextWaiter = node;

lastWaiter = node;

return node;

}

private void doSignal(Node first) {

do {

if ( (firstWaiter = first.nextWaiter) == null)

lastWaiter = null;

first.nextWaiter = null;

} while (!transferForSignal(first) &&

(first = firstWaiter) != null);

}

private void doSignalAll(Node first) {

lastWaiter = firstWaiter = null;

do {

Node next = first.nextWaiter;

first.nextWaiter = null;

transferForSignal(first);

first = next;

} while (first != null);

}

public final void signal() {

if (!isHeldExclusively())

throw new IllegalMonitorStateException();

Node first = firstWaiter;

if (first != null)

doSignal(first);

}

public final void signalAll() {

if (!isHeldExclusively())

throw new IllegalMonitorStateException();

Node first = firstWaiter;

if (first != null)

doSignalAll(first);

}

public final void awaitUninterruptibly() {

Node node = addConditionWaiter();

int savedState = fullyRelease(node);

boolean interrupted = false;

while (!isOnSyncQueue(node)) {

LockSupport.park(this);

if (Thread.interrupted())

interrupted = true;

}

if (acquireQueued(node, savedState) || interrupted)

selfInterrupt();

}

private static final int THROW_IE = -1;

private int checkInterruptWhileWaiting(Node node) {

return Thread.interrupted() ?

(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :

0;

}

private void reportInterruptAfterWait(int interruptMode)

throws InterruptedException {

if (interruptMode == THROW_IE)

throw new InterruptedException();

else if (interruptMode == REINTERRUPT)

selfInterrupt();

}

public final void await() throws InterruptedException {

if (Thread.interrupted())

throw new InterruptedException();

Node node = addConditionWaiter();

int savedState = fullyRelease(node);

int interruptMode = 0;

while (!isOnSyncQueue(node)) {

LockSupport.park(this);

if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)

break;

}

if (acquireQueued(node, savedState) && interruptMode != THROW_IE)

interruptMode = REINTERRUPT;

if (node.nextWaiter != null) // clean up if cancelled

unlinkCancelledWaiters();

if (interruptMode != 0)

reportInterruptAfterWait(interruptMode);

}

final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {

return sync == AbstractQueuedSynchronizer.this;

}

protected final boolean hasWaiters() {

if (!isHeldExclusively())

throw new IllegalMonitorStateException();

for (Node w = firstWaiter; w != null; w = w.nextWaiter) {

if (w.waitStatus == Node.CONDITION)

return true;

}

return false;

}

protected final int getWaitQueueLength() {

if (!isHeldExclusively())

throw new IllegalMonitorStateException();

int n = 0;

for (Node w = firstWaiter; w != null; w = w.nextWaiter) {

if (w.waitStatus == Node.CONDITION)

++n;

}

return n;

}

protected final Collection<Thread> getWaitingThreads() {

if (!isHeldExclusively())

throw new IllegalMonitorStateException();

ArrayList<Thread> list = new ArrayList<Thread>();

for (Node w = firstWaiter; w != null; w = w.nextWaiter) {

if (w.waitStatus == Node.CONDITION) {

Thread t = w.thread;

if (t != null)

list.add(t);

}

}

return list;

}

}

我们先看这里的await()是如何让一个线程等待的:

步骤1,调用 addConditionWaiter(),并在方法中调用Node node = new Node(Thread.currentThread(), Node.CONDITION),使当前线程被包装为一个Node,并加入到等待队列中,至于链表头尾之类的处理,不再赘述。

步骤2,释放占有的锁,并保存占有锁的状态

步骤3,如果线程没有在同步队列(即没被唤醒),则线程阻塞LockSupport.park,这个线程一直被阻塞在while中,直到被它被唤醒。

条件队列的结构如下所示:

Conditin等待队列.jpg


而唤醒的过程则如下所示:

private void doSignal(Node first) {

do {

if ( (firstWaiter = first.nextWaiter) == null)

lastWaiter = null;

first.nextWaiter = null;

} while (!transferForSignal(first) &&

(first = firstWaiter) != null);

}

可以看出,从第一个线程开始进行唤醒。

以上就是Condition的主要内容,还有很多关于同步器的细节,后续章节继续讲解。

感兴趣的小伙伴,请关注下本人公众号:暖爸的java家园


标签: #htmlcondition