龙空技术网

CompletableFuture, a Powerful Tool for Asynchronous Programming: Passing Execution Results Between Threads

IvanLan 952

Foreword:

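In software development, some business scenarios call for running tasks asynchronously on multiple threads to speed up processing. For some of those tasks we also need the result once the task has finished. So how do we obtain it?

1. A Review of Future

The Future interface was introduced in Java 5 as a mechanism for handling asynchronous tasks; it represents the result of an asynchronous computation.

The following example reviews how Future is used: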

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;

@Slf4j
public class FutureExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        long s = System.currentTimeMillis();
        // Submit a Callable; the returned Future stands for its pending result
        Future<String> future = executor.submit(() -> {
            Thread.sleep(5000);
            return "Result";
        });
        log.info("Task submitted");
        // Block until the task has finished
        String result = future.get();
        log.info("Result: {} costTime: {} ms.", result, (System.currentTimeMillis()-s));
        executor.shutdown();
    }
}

Running the code above produces the following output:

17:59:55.000 [main] INFO com.test.FutureExample - Task submitted
17:59:59.999 [main] INFO com.test.FutureExample - Result: Result costTime: 5086 ms.

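In this example we submit a Callable through ExecutorService.submit, which returns a Future. We then call future.get() to wait for the task to complete and read the result from the Future.

Although Future makes it possible to obtain the result of an asynchronous task, retrieving that result is inconvenient: you either block in get() or poll isDone(). Future also cannot express dependencies between several asynchronous tasks. For example, if the main thread has to wait for all subtasks to finish before it continues, we need an additional tool such as CountDownLatch: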

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;

@Slf4j
public class FutureExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        long s = System.currentTimeMillis();
        // One count per subtask
        CountDownLatch downLatch = new CountDownLatch(2);
        Future<String> task1Future = executor.submit(() -> {
            Thread.sleep(5000);
            downLatch.countDown();
            return "Result 1";
        });
        Future<String> task2Future = executor.submit(() -> {
            Thread.sleep(5000);
            downLatch.countDown();
            return "Result 2";
        });
        log.info("Task submitted");
        // Block the main thread until both subtasks have counted down
        downLatch.await();
        String result1 = task1Future.get();
        String result2 = task2Future.get();
        log.info("Result1: {}", result1);
        log.info("Result2: {}", result2);
        log.info("costTime: {} ms.", (System.currentTimeMillis()-s));
        executor.shutdown();
    }
}

Running the code above produces the following output:

18:24:26.183 [main] INFO com.test.FutureExample - Result1: Result 1
18:24:26.189 [main] INFO com.test.FutureExample - Result2: Result 2
18:24:26.189 [main] INFO com.test.FutureExample - costTime: 5144 ms.

2. CompletableFuture, a Powerful Tool for Asynchronous Programming

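Java 8 introduced CompletableFuture, a more powerful tool for asynchronous programming. It extends Future (and also implements CompletionStage), offers a more convenient way to write asynchronous code, and makes it easy to compose multiple asynchronous operations.

The example above, reimplemented with CompletableFuture: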

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;

@Slf4j
public class FutureExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        long s = System.currentTimeMillis();
        CompletableFuture<String> task1Future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result 1";
        });
        CompletableFuture<String> task2Future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result 2";
        });
        log.info("Task submitted");
        // allOf completes once every given future has completed; its own result is always null
        CompletableFuture<Void> allOf = CompletableFuture.allOf(task1Future, task2Future);
        log.info("allOf: {}", allOf.get());
        log.info("costTime: {} ms.", (System.currentTimeMillis()-s));
    }
}

Running the code above produces the following output:

11:13:10.743 [main] INFO com.test.FutureExample - Task submitted
11:13:15.732 [main] INFO com.test.FutureExample - allOf: null
11:13:15.738 [main] INFO com.test.FutureExample - costTime: 5108 ms.

As you can see, CompletableFuture makes it easy to achieve what CountDownLatch did above.
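Because allOf returns a CompletableFuture<Void>, the individual results still have to be read from the original futures. The following is a minimal sketch of one common way to do that; the class name AllOfCollectSketch and the helper allAsList are hypothetical names introduced only for illustration, not part of the JDK.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfCollectSketch {

    // Hypothetical helper: completes when every future is done and yields their values in order.
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        // This stage runs only after all futures have completed, so join() does not wait here.
        return all.thenApply(ignored ->
                futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> "Result 1");
        CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> "Result 2");
        List<String> results = allAsList(Arrays.asList(task1, task2)).join();
        System.out.println(results);
    }
}
```

If any of the futures fails, the combined future fails as well, so the caller of join() should be prepared for a CompletionException.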

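Beyond that, it offers more powerful features: it supports chained calls, lets you register callbacks when starting an asynchronous operation, passes results from one task to the next, and runs the callbacks automatically once the operation completes. The examples below show how powerful and convenient it is. Let's look at a figure first; the examples cover everything shown in it.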

[Figure: CompletableFuture usage]

Methods for running asynchronous tasks with CompletableFuture

CompletableFuture can run an asynchronous task in two ways:

supplyAsync: returns a value.
runAsync: returns no value.

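// Builds the task from the supplier and runs it on the default built-in pool, ForkJoinPool.commonPool()
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)

// Builds the task from the supplier and runs it on the custom pool passed in
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

// Builds the task from the runnable and runs it on the default built-in pool, ForkJoinPool.commonPool()
public static CompletableFuture<Void> runAsync(Runnable runnable)

// Builds the task from the runnable and runs it on the custom pool passed in
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

Ways to obtain a CompletableFuture result

// Blocks until the result is available; a failure in the task is reported as an ExecutionException
public T get()

// Throws a TimeoutException if the result is not available within the given time
public T get(long timeout, TimeUnit unit)

// Returns immediately without blocking; if the computation has not completed, returns valueIfAbsent
public T getNow(T valueIfAbsent)

// Blocks like get(), but a failure in the task is reported as an unchecked CompletionException
public T join()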

Example:

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;

@Slf4j
public class FutureExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        long s = System.currentTimeMillis();
        // 1. getNow: the task is still running, so the fallback value is returned
        CompletableFuture<String> task1Future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result 1";
        });
        log.info("================= getNow: {}", task1Future.getNow("Result now"));
        log.info("================= get: {}", task1Future.get());
        // 2. get with timeout: the task needs 5 s, the timeout is 1 s
        CompletableFuture<String> task2Future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Result 2";
        });
        try {
            log.info("get with timeout: {}", task2Future.get(1, TimeUnit.SECONDS));
        } catch (Exception e) {
            log.error("================= get with timeout", e);
        }
        // 3. get: a failed task surfaces as ExecutionException
        CompletableFuture<String> getFuture = CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("compute error");
        });
        try {
            log.info("getFuture: {}", getFuture.get());
        } catch (Exception e) {
            log.error("================= getFuture", e);
        }
        // 4. join: a failed task surfaces as CompletionException
        CompletableFuture<String> joinFuture = CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("compute error");
        });
        try {
            log.info("joinFuture: {}", joinFuture.join());
        } catch (Exception e) {
            log.error("================= joinFuture", e);
        }
        log.info("costTime: {} ms.", (System.currentTimeMillis()-s));
    }
}

Running the code above produces the following output:

22:07:22.753 [main] INFO com.test.FutureExample - ================= getNow: Result now
22:07:27.753 [main] INFO com.test.FutureExample - ================= get: Result 1
22:07:28.766 [main] ERROR com.test.FutureExample - ================= get with timeout
java.util.concurrent.TimeoutException: null
    at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
    at com.test.FutureExample.main(FutureExample.java:40)
22:07:28.767 [main] ERROR com.test.FutureExample - ================= getFuture
java.util.concurrent.ExecutionException: java.lang.RuntimeException: compute error
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    at com.test.FutureExample.main(FutureExample.java:50)
Caused by: java.lang.RuntimeException: compute error
    at com.test.FutureExample.lambda$main$2(FutureExample.java:47)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
22:07:28.768 [main] ERROR com.test.FutureExample - ================= joinFuture
java.util.concurrent.CompletionException: java.lang.RuntimeException: compute error
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
    at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.RuntimeException: compute error
    at com.test.FutureExample.lambda$main$3(FutureExample.java:57)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    ... 5 common frames omitted
22:07:28.768 [main] INFO com.test.FutureExample - costTime: 6108 ms.

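Asynchronous callback methods of CompletableFuture

CompletableFuture supports chained calls: you can register a callback when starting an asynchronous operation, and it runs automatically once the operation completes. Some of the callback methods are:

(1) thenRun/thenRunAsync: no argument, no return value

(2) thenAccept/thenAcceptAsync: takes an argument, no return value

(3) thenApply/thenApplyAsync: takes an argument and returns a value

Put simply, once one task is done you can use these methods to start a second one. The difference between the variants with and without the Async suffix is as follows.

Suppose you passed a custom thread pool when starting the first task:

With a method that does not end in Async, the second task gets no pool of its own: it normally runs on the thread that completed the first task (here, a thread from your custom pool), or on the calling thread if the first task had already finished when the callback was registered.
With a method that ends in Async and no executor argument, the first task uses the pool you passed in, while the second task uses the ForkJoin pool (ForkJoinPool.commonPool()). The Async variants also have an overload that accepts an Executor, in case the second task should run on your own pool.

Example: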

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.*;

@Slf4j
public class FutureExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(5);
        executor.setThreadNamePrefix("CustomPool-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        long s = System.currentTimeMillis();
        // 1. thenRun/thenRunAsync: no argument, no return value
        CompletableFuture<Void> thenRunFuture = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("runAsync =========== Step 1");
        }, executor).thenRun(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("thenRun =========== Step 2");
        }).thenRunAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("thenRunAsync =========== Step 3");
        });
        log.info("-------------------------------------------- thenRunFuture: {}", thenRunFuture.get());
        // 2. thenAccept/thenAcceptAsync: takes an argument, no return value
        CompletableFuture<Void> thenAcceptFuture = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("supplyAsync =========== Step 1");
            return "Result 1";
        }, executor).thenAccept(result -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // "上一步结果" means "result of the previous step"
            log.info("上一步结果:" + result);
            log.info("thenAccept =========== Step 2");
        });
        log.info("-------------------------------------------- thenAcceptFuture: {}", thenAcceptFuture.get());
        // 3. thenApply/thenApplyAsync: takes an argument and returns a value
        CompletableFuture<String> thenApplyFuture = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("supplyAsync =========== Step 1");
            return "Result 1";
        }, executor).thenApply(result -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("上一步结果:" + result);
            log.info("thenAccept =========== Step 2");
            return "Result 2";
        });
        log.info("-------------------------------------------- thenApplyFuture: {}", thenApplyFuture.get());
        log.info("Task submitted costTime: {} ms.", (System.currentTimeMillis()-s));
        // Release the pool threads so the JVM can exit
        executor.shutdown();
    }
}

Running the code above produces the following output:

22:53:32.047 [main] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService
22:53:33.147 [CustomPool-1] INFO com.test.FutureExample - runAsync =========== Step 1
22:53:34.151 [CustomPool-1] INFO com.test.FutureExample - thenRun =========== Step 2
22:53:35.155 [ForkJoinPool.commonPool-worker-1] INFO com.test.FutureExample - thenRunAsync =========== Step 3
22:53:35.156 [main] INFO com.test.FutureExample - -------------------------------------------- thenRunFuture: null
22:53:36.162 [CustomPool-2] INFO com.test.FutureExample - supplyAsync =========== Step 1
22:53:37.165 [CustomPool-2] INFO com.test.FutureExample - 上一步结果:Result 1
22:53:37.165 [CustomPool-2] INFO com.test.FutureExample - thenAccept =========== Step 2
22:53:37.166 [main] INFO com.test.FutureExample - -------------------------------------------- thenAcceptFuture: null
22:53:38.169 [CustomPool-3] INFO com.test.FutureExample - supplyAsync =========== Step 1
22:53:39.172 [CustomPool-3] INFO com.test.FutureExample - 上一步结果:Result 1
22:53:39.172 [CustomPool-3] INFO com.test.FutureExample - thenAccept =========== Step 2
22:53:39.172 [main] INFO com.test.FutureExample - -------------------------------------------- thenApplyFuture: Result 2
22:53:39.172 [main] INFO com.test.FutureExample - Task submitted costTime: 7109 ms.

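The output shows that:

thenRunAsync ran on ForkJoinPool.commonPool, while every callback registered without the Async suffix ran on a CustomPool thread. In addition:

thenRun takes no argument and returns nothing: when the first task finishes, its result is not passed to the thenRun callback, and the callback has no return value.

thenAccept takes an argument but returns nothing: when the first task finishes, its result is passed to the thenAccept callback as the argument, but the callback has no return value.

thenApply takes an argument and returns a value: when the first task finishes, its result is passed to the thenApply callback as the argument, and the callback's return value becomes the result of the new stage.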
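To keep an Async callback off ForkJoinPool.commonPool, the two-argument overloads such as thenApplyAsync(fn, executor) can be used. The sketch below assumes a plain JDK fixed pool instead of the Spring executor used above; the class name AsyncExecutorSketch and the method secondStageThread are hypothetical and exist only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncExecutorSketch {

    // Returns the name of the thread that ran the second stage.
    static String secondStageThread(ExecutorService pool) {
        return CompletableFuture
                .supplyAsync(() -> "Result 1", pool)
                // Passing the executor keeps this stage off ForkJoinPool.commonPool().
                .thenApplyAsync(result -> Thread.currentThread().getName(), pool)
                .join();
    }

    public static void main(String[] args) {
        // Hypothetical pool whose threads carry a recognisable name prefix.
        ExecutorService pool = Executors.newFixedThreadPool(2, runnable -> {
            Thread thread = new Thread(runnable);
            thread.setName("CustomPool-" + thread.getId());
            return thread;
        });
        try {
            System.out.println(secondStageThread(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

The printed thread name should start with the pool's prefix rather than ForkJoinPool.commonPool-worker, which is the difference from the thenRunAsync step in the output above.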

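(4) whenComplete

A CompletableFuture calls the whenComplete callback whether its task completes normally or with an exception. whenComplete takes two arguments, result and throwable.

On normal completion: result is the value returned by the previous task, and throwable is null.
On failure: result is null, and throwable is the exception from the previous task.

Example: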

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.*;

@Slf4j
public class FutureExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(5);
        executor.setThreadNamePrefix("CustomPool-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        long s = System.currentTimeMillis();
        // 1. Normal completion
        CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("supplyAsync =========== Step 1");
            return "Result 1";
        }, executor).whenComplete((result, throwable) -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // "上一步结果" means "result of the previous step"
            log.info("上一步结果:" + result);
            log.info("throwable:" + throwable);
            log.info("whenComplete =========== Step 2");
        });
        log.info("-------------------------------------------- completableFuture1: {}", completableFuture1.get());
        // 2. Completion with an exception
        CompletableFuture<String> completableFuture2 = CompletableFuture.<String>supplyAsync(() -> {
            log.info("supplyAsync =========== Step 1");
            throw new RuntimeException("compute error");
        }, executor).whenComplete((result, throwable) -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("上一步结果:" + result);
            log.info("throwable:" + throwable);
            log.info("whenComplete =========== Step 2");
        });
        try {
            // whenComplete does not handle the exception, so get() rethrows it here
            log.info("-------------------------------------------- completableFuture2: {}", completableFuture2.get());
            log.info("Task submitted costTime: {} ms.", (System.currentTimeMillis()-s));
        } finally {
            // Release the pool threads so the JVM can exit
            executor.shutdown();
        }
    }
}

Running the code above produces the following output:

23:34:56.631 [main] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService
23:34:57.718 [CustomPool-1] INFO com.test.FutureExample - supplyAsync =========== Step 1
23:34:58.722 [CustomPool-1] INFO com.test.FutureExample - 上一步结果:Result 1
23:34:58.722 [CustomPool-1] INFO com.test.FutureExample - throwable:null
23:34:58.722 [CustomPool-1] INFO com.test.FutureExample - whenComplete =========== Step 2
23:34:58.723 [main] INFO com.test.FutureExample - -------------------------------------------- completableFuture1: Result 1
23:34:58.727 [CustomPool-2] INFO com.test.FutureExample - supplyAsync =========== Step 1
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: compute error
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    at com.test.FutureExample.main(FutureExample.java:62)
Caused by: java.lang.RuntimeException: compute error
    at com.test.FutureExample.lambda$main$2(FutureExample.java:51)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
23:34:59.731 [CustomPool-2] INFO com.test.FutureExample - 上一步结果:null
23:34:59.731 [CustomPool-2] INFO com.test.FutureExample - throwable:java.util.concurrent.CompletionException: java.lang.RuntimeException: compute error
23:34:59.731 [CustomPool-2] INFO com.test.FutureExample - whenComplete =========== Step 2
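The stack trace in this output shows that whenComplete only observes the failure: the exception still reaches whoever calls get() or join() on the returned future. A compact sketch of that behaviour follows; the class name WhenCompleteSketch and the method outcome are hypothetical, and join() is used instead of get() only to avoid checked exceptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

class WhenCompleteSketch {

    // Describes what the callback saw and what join() did afterwards.
    static String outcome() {
        AtomicBoolean sawFailure = new AtomicBoolean(false);
        CompletableFuture<String> future = CompletableFuture
                .<String>supplyAsync(() -> {
                    throw new RuntimeException("compute error");
                })
                // The callback is notified of the failure but does not replace it.
                .whenComplete((result, throwable) -> sawFailure.set(throwable != null));
        try {
            return future.join();
        } catch (CompletionException e) {
            // join() wraps the original exception; getCause() gives it back.
            return "callback saw failure: " + sawFailure.get()
                    + ", join rethrew: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(outcome());
    }
}
```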


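(5) exceptionally

A CompletableFuture calls the exceptionally callback only when its task fails with an exception. exceptionally takes a single argument, throwable, and returns a value. When an exception occurs, exceptionally catches it and supplies the value to use instead.

Example: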

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.*;

@Slf4j
public class FutureExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(5);
        executor.setThreadNamePrefix("CustomPool-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        long s = System.currentTimeMillis();
        // 1. Normal completion: exceptionally is skipped
        CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("supplyAsync =========== Step 1");
            return "Result 1";
        }, executor).whenComplete((result, throwable) -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // "上一步结果" means "result of the previous step"
            log.info("上一步结果:" + result);
            log.info("throwable:" + throwable);
            log.info("whenComplete =========== Step 2");
        }).exceptionally(throwable -> {
            log.info("throwable:" + throwable);
            log.info("exceptionally =========== Step 3");
            return "result 3";
        });
        log.info("-------------------------------------------- completableFuture1: {}", completableFuture1.get());
        // 2. Completion with an exception: exceptionally supplies the replacement value
        CompletableFuture<String> completableFuture2 = CompletableFuture.<String>supplyAsync(() -> {
            log.info("supplyAsync =========== Step 1");
            throw new RuntimeException("compute error");
        }, executor).whenComplete((result, throwable) -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("上一步结果:" + result);
            log.info("throwable:" + throwable);
            log.info("whenComplete =========== Step 2");
        }).exceptionally(throwable -> {
            log.info("throwable:" + throwable);
            log.info("exceptionally =========== Step 3");
            return "result 3";
        });
        log.info("-------------------------------------------- completableFuture2: {}", completableFuture2.get());
        log.info("Task submitted costTime: {} ms.", (System.currentTimeMillis()-s));
        // Release the pool threads so the JVM can exit
        executor.shutdown();
    }
}

Running the code above produces the following output:

23:40:47.474 [main] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService
23:40:48.563 [CustomPool-1] INFO com.test.FutureExample - supplyAsync =========== Step 1
23:40:49.565 [CustomPool-1] INFO com.test.FutureExample - 上一步结果:Result 1
23:40:49.565 [CustomPool-1] INFO com.test.FutureExample - throwable:null
23:40:49.565 [CustomPool-1] INFO com.test.FutureExample - whenComplete =========== Step 2
23:40:49.565 [main] INFO com.test.FutureExample - -------------------------------------------- completableFuture1: Result 1
23:40:49.568 [CustomPool-2] INFO com.test.FutureExample - supplyAsync =========== Step 1
23:40:50.571 [CustomPool-2] INFO com.test.FutureExample - 上一步结果:null
23:40:50.571 [CustomPool-2] INFO com.test.FutureExample - throwable:java.util.concurrent.CompletionException: java.lang.RuntimeException: compute error
23:40:50.571 [CustomPool-2] INFO com.test.FutureExample - whenComplete =========== Step 2
23:40:50.572 [CustomPool-2] INFO com.test.FutureExample - throwable:java.util.concurrent.CompletionException: java.lang.RuntimeException: compute error
23:40:50.572 [CustomPool-2] INFO com.test.FutureExample - exceptionally =========== Step 3
23:40:50.572 [main] INFO com.test.FutureExample - -------------------------------------------- completableFuture2: result 3
23:40:50.572 [main] INFO com.test.FutureExample - Task submitted costTime: 3091 ms.
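Note that the throwable logged above is a CompletionException wrapping the original RuntimeException. If the fallback logic needs the original exception, it may help to unwrap it first. The sketch below is one possible way to do so; the class name ExceptionallyUnwrapSketch and the helpers unwrap and recover are hypothetical, and IllegalStateException is chosen arbitrarily for the demonstration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ExceptionallyUnwrapSketch {

    // Returns the wrapped cause if the throwable is a CompletionException, else the throwable itself.
    static Throwable unwrap(Throwable throwable) {
        if (throwable instanceof CompletionException && throwable.getCause() != null) {
            return throwable.getCause();
        }
        return throwable;
    }

    // Fails on purpose, then recovers with a fallback that names the original exception type.
    static String recover() {
        return CompletableFuture
                .<String>supplyAsync(() -> {
                    throw new IllegalStateException("compute error");
                })
                .exceptionally(throwable ->
                        "fallback after " + unwrap(throwable).getClass().getSimpleName())
                .join();
    }

    public static void main(String[] args) {
        System.out.println(recover());
    }
}
```

Checking for the wrapper instead of always calling getCause() keeps the helper safe for futures that were failed directly, for example through completeExceptionally, where the callback may receive the original exception unwrapped.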


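Combining multiple tasks

AND combination (wait until all tasks have completed)

(1) runAfterBoth: no argument, no return value

(2) thenAcceptBoth: takes arguments, no return value

(3) thenCombine: takes arguments and returns a value

(4) allOf: continues only after all of the given tasks have completed

Example: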

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.*;

@Slf4j
public class FutureExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(5);
        executor.setThreadNamePrefix("CustomPool-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        long s = System.currentTimeMillis();
        // 1. task1
        CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("supplyAsync =========== Step 1");
            return 1;
        }, executor);
        // 2. task2
        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("supplyAsync =========== Step 2");
            return 2;
        }, executor);
        // 3. task3
        CompletableFuture<Integer> task3 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("supplyAsync =========== Step 3");
            return 3;
        }, executor);
        log.info("");
        // 4. runAfterBoth: AND combination (task1, task2); "AND组合" in the log text means "AND combination"
        CompletableFuture<Void> runAfterBoth = task1.runAfterBoth(task2, () -> {
            log.info("runAfterBoth =========== Step Both");
        });
        log.info("-------------------------------------------- AND组合 runAfterBoth Result: {}", runAfterBoth.get());
        // 5. thenAcceptBoth: AND combination (task1, task2)
        CompletableFuture<Void> thenAcceptBoth = task1.thenAcceptBoth(task2, (t1, t2) -> {
            log.info("task1 Result:{}", t1);
            log.info("task2 Result:{}", t2);
            log.info("thenCombine =========== Step Both");
        });
        log.info("-------------------------------------------- AND组合 thenCombine Result: {}", thenAcceptBoth.get());
        // 6. thenCombine: AND combination (task1, task2)
        CompletableFuture<Integer> thenCombine = task1.thenCombine(task2, (t1, t2) -> {
            log.info("task1 Result:{}", t1);
            log.info("task2 Result:{}", t2);
            log.info("thenCombine =========== Step Both");
            return t1 + t2;
        });
        log.info("-------------------------------------------- AND组合 thenCombine Result: {}", thenCombine.get());
        // 7. allOf: AND combination (task1, task2, task3)
        CompletableFuture<Void> allOf = CompletableFuture.allOf(task1, task2, task3);
        log.info("-------------------------------------------- AND组合 allOf Result: {}", allOf.get());
        log.info("Task submitted costTime: {} ms.", (System.currentTimeMillis()-s));
        // Release the pool threads so the JVM can exit
        executor.shutdown();
    }
}

Running the code above produces the following output:

00:42:14.095 [main] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService
00:42:14.185 [main] INFO com.test.FutureExample - 
00:42:15.188 [CustomPool-2] INFO com.test.FutureExample - supplyAsync =========== Step 2
00:42:15.188 [CustomPool-1] INFO com.test.FutureExample - supplyAsync =========== Step 1
00:42:15.188 [CustomPool-1] INFO com.test.FutureExample - runAfterBoth =========== Step Both
00:42:15.189 [main] INFO com.test.FutureExample - -------------------------------------------- AND组合 runAfterBoth Result: null
00:42:15.192 [main] INFO com.test.FutureExample - task1 Result:1
00:42:15.192 [main] INFO com.test.FutureExample - task2 Result:2
00:42:15.192 [main] INFO com.test.FutureExample - thenCombine =========== Step Both
00:42:15.193 [main] INFO com.test.FutureExample - -------------------------------------------- AND组合 thenCombine Result: null
00:42:15.193 [main] INFO com.test.FutureExample - task1 Result:1
00:42:15.193 [main] INFO com.test.FutureExample - task2 Result:2
00:42:15.193 [main] INFO com.test.FutureExample - thenCombine =========== Step Both
00:42:15.193 [main] INFO com.test.FutureExample - -------------------------------------------- AND组合 thenCombine Result: 3
00:42:17.189 [CustomPool-3] INFO com.test.FutureExample - supplyAsync =========== Step 3
00:42:17.190 [main] INFO com.test.FutureExample - -------------------------------------------- AND组合 allOf Result: null
00:42:17.191 [main] INFO com.test.FutureExample - Task submitted costTime: 3090 ms.


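OR combination (as soon as any one task has completed)

(1) runAfterEither: no argument, no return value

(2) acceptEither: takes an argument, no return value

(3) applyToEither: takes an argument and returns a value

(4) anyOf: continues as soon as any one of the given tasks completes

Example: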

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.*;

@Slf4j
public class FutureExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(5);
        executor.setThreadNamePrefix("CustomPool-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        long s = System.currentTimeMillis();
        // 1. task1
        CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("supplyAsync =========== Step 1");
            return 1;
        }, executor);
        // 2. task2
        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("supplyAsync =========== Step 2");
            return 2;
        }, executor);
        // 3. task3
        CompletableFuture<Integer> task3 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("supplyAsync =========== Step 3");
            return 3;
        }, executor);
        log.info("");
        // 4. runAfterEither: OR combination (task1, task2); "OR组合" in the log text means "OR combination"
        CompletableFuture<Void> runAfterEither = task1.runAfterEither(task2, () -> {
            log.info("runAfterEither =========== Step Either");
        });
        log.info("-------------------------------------------- OR组合 runAfterEither Result: {}", runAfterEither.get());
        // 5. acceptEither: OR combination (task1, task2)
        CompletableFuture<Void> acceptEither = task1.acceptEither(task2, (t) -> {
            log.info("first completed Result:{}", t);
            log.info("applyToEither =========== Step Either");
        });
        log.info("-------------------------------------------- OR组合 applyToEither Result: {}", acceptEither.get());
        // 6. applyToEither: OR combination (task1, task2)
        CompletableFuture<Integer> applyToEither = task1.applyToEither(task2, (t) -> {
            log.info("first completed Result:{}", t);
            log.info("applyToEither =========== Step Either");
            return t;
        });
        log.info("-------------------------------------------- OR组合 applyToEither Result: {}", applyToEither.get());
        // 7. anyOf: OR combination (task1, task2, task3)
        CompletableFuture<Object> anyOf = CompletableFuture.anyOf(task1, task2, task3);
        // Completes as soon as any one of the tasks has completed
        log.info("-------------------------------------------- OR组合 anyOf Result: {}", anyOf.get());
        log.info("Task submitted costTime: {} ms.", (System.currentTimeMillis()-s));
        // Stop accepting new tasks; task3 still finishes, then the JVM can exit
        executor.shutdown();
    }
}

Running the code above produces the following output:

00:49:56.887 [main] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService
00:49:56.971 [main] INFO com.test.FutureExample - 
00:49:57.971 [CustomPool-1] INFO com.test.FutureExample - supplyAsync =========== Step 1
00:49:57.972 [CustomPool-2] INFO com.test.FutureExample - supplyAsync =========== Step 2
00:49:57.972 [CustomPool-1] INFO com.test.FutureExample - runAfterEither =========== Step Either
00:49:57.973 [main] INFO com.test.FutureExample - -------------------------------------------- OR组合 runAfterEither Result: null
00:49:57.979 [main] INFO com.test.FutureExample - first completed Result:1
00:49:57.979 [main] INFO com.test.FutureExample - applyToEither =========== Step Either
00:49:57.979 [main] INFO com.test.FutureExample - -------------------------------------------- OR组合 applyToEither Result: null
00:49:57.980 [main] INFO com.test.FutureExample - first completed Result:1
00:49:57.980 [main] INFO com.test.FutureExample - applyToEither =========== Step Either
00:49:57.980 [main] INFO com.test.FutureExample - -------------------------------------------- OR组合 applyToEither Result: 1
00:49:57.980 [main] INFO com.test.FutureExample - -------------------------------------------- OR组合 anyOf Result: 1
00:49:57.980 [main] INFO com.test.FutureExample - Task submitted costTime: 1086 ms.
00:49:59.976 [CustomPool-3] INFO com.test.FutureExample - supplyAsync =========== Step 3


3. Points to Note When Using CompletableFuture

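CompletableFuture makes asynchronous programming more convenient and the code more elegant. By default, however, CompletableFuture runs its tasks on the shared ForkJoinPool.commonPool(), whose default parallelism is the number of CPU cores minus 1. When a large number of requests arrive and the processing logic is complex, responses become slow. It is therefore advisable to use a custom thread pool, tune its configuration parameters, and choose a suitable rejection policy. In addition, CompletableFuture's get() method blocks, so if you use it to obtain the result of an asynchronous call, it is best to pass a timeout.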
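Both recommendations can be combined as in the sketch below. The pool size, queue capacity, timeout values and the names TimeoutSketch and fetchWithTimeout are illustrative assumptions, not tuned recommendations; real values depend on the workload.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutSketch {

    // Runs a simulated task on the given pool and waits at most timeoutMillis for its result.
    static String fetchWithTimeout(ExecutorService pool, long taskMillis, long timeoutMillis) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(taskMillis); // stands in for real work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "Result";
        }, pool);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "fallback"; // the task was too slow; the caller is not blocked any longer
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "fallback";
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        // Bounded queue plus CallerRunsPolicy: overload slows the submitter down instead of dropping tasks.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4, 4, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100),
                new ThreadPoolExecutor.CallerRunsPolicy());
        try {
            System.out.println(fetchWithTimeout(pool, 100, 1000));
            System.out.println(fetchWithTimeout(pool, 2000, 200));
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Note that a timed get() only stops the caller from waiting; the task itself keeps running on the pool unless it is cancelled or the pool is shut down.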
