龙空技术网

常用的并发工具类

慢慢编程 941

前言:

现在大家对“java 并发工具”大概比较关注,同学们都需要知道一些“java 并发工具”的相关资讯。那么小编在网络上汇集了一些关于“java 并发工具””的相关文章,希望姐妹们能喜欢,姐妹们一起来了解一下吧!

在 JDK1.5 后,推出了几个并发的工具类,位于 JUC(java.util.concurrent)包下。

CountDownLatch

CountDownLatch 类是使一个线程等待其他线程各自执行完毕后再执行。

类似于现实中某个活动需要等到全部人齐了才可以开始。

实现原理:

基于 AQS 的共享模式。

从ReentrantLock的实现看AQS的原理及应用

这个类是一个同步计数器,主要用于线程间的控制。当 CountDownLatch 的 count 计数 > 0 时,本线程的 await() 会造成阻塞,直到 count 变为 0,开始执行本线程。

package test;import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class Test1 {    public static void main(String[] args) {        final CountDownLatch latch = new CountDownLatch(2);      // 计数器初始化为 2,要等两个线程执行完毕        System.out.println("主线程开始执行");        ExecutorService es1 = Executors.newSingleThreadExecutor();        es1.execute(new Runnable() {            @Override            public void run() {                try {                    Thread.sleep(3000);                    System.out.println("子线程:" + Thread.currentThread().getName() + "执行");                }catch (InterruptedException e){                    e.printStackTrace();                }                latch.countDown();    // 使计数器减一            }        });        ExecutorService es2 = Executors.newSingleThreadExecutor();        es2.execute(new Runnable() {            @Override            public void run() {                try {                    Thread.sleep(3000);                }catch (InterruptedException e){                    e.printStackTrace();                }                System.out.println("子线程:" + Thread.currentThread().getName() + "执行");                latch.countDown();            }        });        System.out.println("等待两个线程执行完毕");        try {            latch.await();       // 主线程挂起,等待两个线程执行完        }catch (InterruptedException e){            e.printStackTrace();        }        System.out.println("两个子线程都执行完毕,继续执行主线程");    }}主线程开始执行                      等待两个线程执行完毕                子线程:pool-2-thread-1执行         子线程:pool-1-thread-1执行      两个子线程都执行完毕,继续执行主线程
CyclicBarrier

与 CountDownLatch 功能一样,不过它可以重复循环,而 CountDownLatch 只能执行一次。

实现原理:

基于 ReentrantLock 和 Condition

//同步操作锁private final ReentrantLock lock = new ReentrantLock();//线程拦截器private final Condition trip = lock.newCondition();//每次拦截的线程数private final int parties;//换代前执行的任务private final Runnable barrierCommand;//表示栅栏的当前代private Generation generation = new Generation();//计数器private int count;//静态内部类Generationprivate static class Generation {  boolean broken = false;
上面贴出了 CyclicBarrier 所有的成员变量,可以看到 CyclicBarrier 内部使通过条件 trip 来对线程进行阻塞。并且其内部维护了两个 int 型变量 parites 和 count,parties 表示每次拦截的线程数,该值在构造时进行赋值。count 是内部计数器,他的初始值和 parties 相同,以后随着每次 await 方法的调用而减一,直到减为零将唤醒主线程。CyclicBarrier 有一个静态内部类 Generation,该类的对象代表栅栏的当前代,就像玩游戏时代表的本局游戏,利用它可以实现循环等待。barrierCommand 表示换代前执行的任务,当 count 减为零时表示本局游戏结束,需要转到下一局。在转到下一局游戏之前,利用它可以实现循环等待。
package test;import java.util.concurrent.CyclicBarrier;public class Test2 {    static class TaskThread extends Thread{        CyclicBarrier barrier;        public TaskThread(CyclicBarrier barrier){            this.barrier = barrier;        }        @Override        public void run(){            try{                Thread.sleep(100);                System.out.println(getName() + "到达栅栏 A");                barrier.await();              // 等待所有线程都执行到这,才执行主线程                System.out.println(getName() + "冲破栅栏 A");   // 主线程完成后继续执行                Thread.sleep(2000);                System.out.println(getName() + "到达栅栏 B");                barrier.await();                System.out.println(getName() + "冲破栅栏 B");            }catch (Exception e){                e.printStackTrace();            }        }    }    public static void main(String[] args) {        int threadNum = 5;        CyclicBarrier barrier = new CyclicBarrier(threadNum, new Runnable() {            @Override            public void run() {                System.out.println(Thread.currentThread().getName() + "完成任务");            }        });        for (int i = 0; i < threadNum; i++) {            new TaskThread(barrier).start();        }    }}
Semaphore

该类用于控制信号量的个数,可以控制同时访问资源的线程个数,并提供了同步机制。例如,实现一个文件允许的并发访问数。

Semaphore 的主要方法:

acquire():从此信号量中获取一个许可,若已超过许可量,则阻塞此请求线程。release():释放一个许可,将其返回给信号量。availablePermits():返回此信号量中当前可用的许可数。hasQueuedThreads():查询是否有线程正在等待获取。

package test;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Semaphore;public class Test {    public static void main(String[] args) {        ExecutorService service = Executors.newCachedThreadPool();        final Semaphore sp = new Semaphore(3);   // 创建 Semaphore 信号量,初始化许可大小为3        for (int i = 0; i < 10; i++) {            try {                Thread.sleep(100);            }catch (InterruptedException e){                e.printStackTrace();            }            Runnable runnable = new Runnable() {                @Override                public void run() {                    try {                        sp.acquire();  // 请求获取许可,如果有可获取许可,则继续往下指向,许可数减一。                    } catch (InterruptedException e){                        e.printStackTrace();                    }                    System.out.println("线程" + Thread.currentThread().getName() +                            "进入,当前已有" + (3 - sp.availablePermits()) + "个并发") ;                    try{                        Thread.sleep((long)(Math.random() * 10000));                    }catch (InterruptedException e){                        e.printStackTrace();                    }                    System.out.println("线程" + Thread.currentThread().getName() + "即将离开");                    sp.release();  // 释放许可证,许可数+1                }            };            service.execute(runnable);        }    }}
Exchanger

这个类用于交换数据,只能用于两个线程。当一个线程运行到 exchange() 方法时会阻塞,另一个线程运行到 exchange() 时,两者交换数据,然后执行后面的程序。

package test;import java.util.concurrent.Exchanger;public class Test3 {    static class Producer extends Thread{   // 生产者线程        private Exchanger<Integer> exchanger;     // 交换标志        private static int data = 0;        Producer(String name, Exchanger<Integer> exchanger){            super("Producer-" + name);            this.exchanger = exchanger;        }        @Override        public void run(){            for (int i = 1; i < 5; i++) {                try {                    Thread.sleep(1000);                    data = i;                    System.out.println(getName() + "交换前:" + data);                    data = exchanger.exchange(data);        // 将此 data 与 消费者的 data 进行交换                    System.out.println(getName() + "交换后:" + data);                }catch (InterruptedException e){                    e.printStackTrace();                }            }        }    }    static class Consumer extends Thread{   // 消费者线程        private Exchanger<Integer> exchanger;   // 交换标志        private static int data = 0;        Consumer(String name, Exchanger<Integer> exchanger){            super("Consumer-" + name);            this.exchanger = exchanger;        }        @Override        public void run(){            while(true){                data = 0;                System.out.println(getName() + "交换前:" + data);                try{                    data = exchanger.exchange(data);   // 将此 data 与生产者的 data 进行交换,因为先执行到这,会阻塞知道生产者执行到交换                }catch (InterruptedException e){                    e.printStackTrace();                }                System.out.println(getName() + "交换后:" + data);            }        }    }    public static void main(String[] args) throws InterruptedException {        Exchanger<Integer> exchanger = new Exchanger<>();        new Producer("", exchanger).start();        new Consumer("", exchanger).start();        Thread.sleep(7000);    }}

标签: #java 并发工具