龙空技术网

springboot多任务并行+线程池处理+等待获取执行结果

IT架构 248

前言:

此时朋友们对“java主线程等待线程池执行完成”大约比较看重,我们都需要剖析一些“java主线程等待线程池执行完成”的相关知识。那么小编同时在网络上网罗了一些对于“java主线程等待线程池执行完成””的相关知识,希望各位老铁们能喜欢,朋友们快快来学习一下吧!

在日常的开发项目过程中,时常会有多线程的使用场景。最近开发的需求中也是如此,只不过这次需要开启多线程去执行,最后要等所有线程结束统一获取结果。所以在此整理一下,我们目前用的是方法二。

image.png

Java 线程池

Java通过Executors提供四种线程池,分别为:newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。优点重用存在的线程,减少对象创建、消亡的开销,性能佳。可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞。提供定时执行、定期执行、单线程、并发数控制等功能。
一、方法一(CountDownLatch)
public class StatsDemo {    final static SimpleDateFormat sdf = new SimpleDateFormat(            "yyyy-MM-dd HH:mm:ss");    final static String startTime = sdf.format(new Date());    /**     * IO密集型任务  = 一般为2*CPU核心数(常出现于线程中:数据库数据交互、文件上传下载、网络数据传输等等)     * CPU密集型任务 = 一般为CPU核心数+1(常出现于线程中:复杂算法)     * 混合型任务  = 视机器配置和复杂度自测而定     */    private static int corePoolSize = Runtime.getRuntime().availableProcessors();    /**     * public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,     *                           TimeUnit unit,BlockingQueue<Runnable> workQueue)     * corePoolSize用于指定核心线程数量     * maximumPoolSize指定最大线程数     * keepAliveTime和TimeUnit指定线程空闲后的最大存活时间     * workQueue则是线程池的缓冲队列,还未执行的线程会在队列中等待     * 监控队列长度,确保队列有界     * 不当的线程池大小会使得处理速度变慢,稳定性下降,并且导致内存泄露。如果配置的线程过少,则队列会持续变大,消耗过多内存。     * 而过多的线程又会 由于频繁的上下文切换导致整个系统的速度变缓——殊途而同归。队列的长度至关重要,它必须得是有界的,这样如果线程池不堪重负了它可以暂时拒绝掉新的请求。     * ExecutorService 默认的实现是一个无界的 LinkedBlockingQueue。     */    private static ThreadPoolExecutor executor  = new ThreadPoolExecutor(corePoolSize, corePoolSize+1, 10l, TimeUnit.SECONDS,            new LinkedBlockingQueue<Runnable>(1000));    public static void main(String[] args) throws InterruptedException {        CountDownLatch latch = new CountDownLatch(5);        //使用execute方法          executor.execute(new Stats("任务A", 1000, latch));          executor.execute(new Stats("任务B", 1000, latch));          executor.execute(new Stats("任务C", 1000, latch));          executor.execute(new Stats("任务D", 1000, latch));          executor.execute(new Stats("任务E", 1000, latch));        latch.await();// 等待所有人任务结束        System.out.println("所有的统计任务执行完成:" + sdf.format(new Date()));    }    static class Stats implements Runnable  {        String statsName;        int runTime;        CountDownLatch latch;        public Stats(String statsName, int runTime, CountDownLatch latch) {            this.statsName = statsName;            this.runTime = runTime;            this.latch = latch;        }        public void run() {            try {                System.out.println(statsName+ " do stats begin at "+ startTime);                //模拟任务执行时间                Thread.sleep(runTime);                System.out.println(statsName + " do stats complete at "+ sdf.format(new Date()));                latch.countDown();//单次任务结束,计数器减一            } catch (InterruptedException e) {                e.printStackTrace();            }        }    }}
二、方法二(Future)

重点是和springboot整合,采用注解bean方式生成ThreadPoolTaskExecutor

在springboot项目中开启异步线程需要满足一下几点

在启动类加入异步线程注解@EnableAsync创建线程池并创建Bean实例1. 自定义线程池

@Configuration//@EnableAsyncpublic class ThreadPoolConfig{    /**     *   默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,     *  当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;     *  当队列满了,就继续创建线程,当线程数量大于等于maxPoolSize后,开始使用拒绝策略拒绝     */    /**     * 获得Java虚拟机可用的处理器个数 + 1     */    private static final int THREADS = Runtime.getRuntime().availableProcessors() + 1;    @Value("${async.executor.thread.core_pool_size:0}")    public static int corePoolSizeConfig;    // 核心线程池大小    public static int corePoolSize = corePoolSizeConfig ==0 ? THREADS : corePoolSizeConfig;    // 最大可创建的线程数    //@Value("${async.executor.thread.max_pool_size}")    private int maxPoolSize = 2 * THREADS;;    // 队列最大长度    //@Value("${async.executor.thread.queue_capacity}")    private int queueCapacity = 1024;    // 线程池维护线程所允许的空闲时间    //@Value("${async.executor.thread.keep_alive_seconds}")    private int keepAliveSeconds = 300;    //线程池名前缀     //@Value("${async.executor.thread.threadNamePrefix}")    private static final String threadNamePrefix = "Async-Service-";    @Bean(name = "threadPoolTaskExecutor")    public ThreadPoolTaskExecutor threadPoolTaskExecutor()    {        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();        executor.setMaxPoolSize(maxPoolSize);        executor.setCorePoolSize(corePoolSize);        executor.setQueueCapacity(queueCapacity);        executor.setKeepAliveSeconds(keepAliveSeconds);        executor.setThreadNamePrefix(threadNamePrefix);        // 线程池对拒绝任务(无线程可用)的处理策略       // CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());      // 初始化        executor.initialize();        return executor;    }}
2. 异步执行方法

启动类添加@EnableAsync注解

@SpringBootApplication@EnableAsync@EnableSchedulingpublic class DemoApplication {    public static void main(String[] args) {        SpringApplication.run(DemoApplication.class, args);    }}

service层方法

@Servicepublic class AsyncInvokeService {    @Async("threadPoolTaskExecutor")    public Future<Boolean> exec1(String name) {        System.out.println("子线程 name -->" + Thread.currentThread().getName());        System.out.println(name);        Thread.sleep(10000);        return new AsyncResult<>(true);    }    @Async("threadPoolTaskExecutor")    public Future<Boolean> exec2(String phone) {        System.out.println("子线程 name -->" + Thread.currentThread().getName());        System.out.println(phone);        Thread.sleep(10000);        return new AsyncResult<>(true);    }
3. 多线程执行返回结果
    @GetMapping("/gettest")    public String b() throws InterruptedException, ExecutionException {        Future<Boolean> future1 = asyncInvokeService.exec1("张三");        Future<Boolean> future2 = asyncInvokeService.exec2("15618881888");        List<Future<Boolean>> futureList = new ArrayList<>();              futureList.add(future1);        futureList.add(future2);        System.out.println(new Date());        //while (!sendMessage3.isDone() || !sendMessage4.isDone()){        //    Thread.sleep(50);        //    result += sendMessage3.get();        //    result += sendMessage4.get();        //}            //查询任务执行的结果                       for (Future<?> future : futureList) {                            while (true) {//CPU高速轮询:每个future都并发轮循,判断完成状态然后获取结果,这一行,是本实现方案的精髓所在。即有10个future在高速轮询,完成一个future的获取结果,就关闭一个轮询                     if (future.isDone() && !future.isCancelled()) { //获取future成功完成状态,如果想要限制每个任务的超时时间,取消本行的状态判断+future.get(1000*1, TimeUnit.MILLISECONDS)+catch超时异常使用即可。                         Boolean result = future.get();//获取结果                        System.out.println("任务i="+i+"获取完成!"+new Date());                        list.add(result);                        break;//当前future获取结果完毕,跳出while                    } else {                        Thread.sleep(1);//每次轮询休息1毫秒(CPU纳秒级),避免CPU高速轮循耗空CPU---》新手别忘记这个                    }        System.out.println(new Date());        return "执行成功!!";    }

标签: #java主线程等待线程池执行完成