前言:
而今兄弟们对“java多线程异步执行”大概比较关心,看官们都需要知道一些“java多线程异步执行”的相关内容。那么小编在网上汇集了一些有关“java多线程异步执行””的相关知识,希望看官们能喜欢,朋友们快快来学习一下吧!在软件开发中,为了加快任务处理速度,一些业务场景我们需要使用多线程异步执行任务的。而有些任务结束后,我们是需要获取任务结果的。那么如何获取执行结果呢?
一、回顾Future
Future接口在Java 5中被引入,作为Java中处理异步任务的一种机制,用于描述一个异步计算的结果。
以下是以一个示例回顾一下Future的使用:
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.*;@Slf4jpublic class FutureExample { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(2); long s = System.currentTimeMillis(); Future<String> future = executor.submit(() -> { Thread.sleep(5000); return "Result"; }); log.info("Task submitted"); String result = future.get(); log.info("Result: {} costTime: {} ms.", result, (System.currentTimeMillis()-s)); executor.shutdown(); }}
当我们运行上面的代码时,会看到下面的输出:
17:59:55.000 [main] INFO com.test.FutureExample - Task submitted17:59:59.999 [main] INFO com.test.FutureExample - Result: Result costTime: 5086 ms.
在这个示例中,我们使用ExecutorService.submit方法提交了一个Callable对象,并返回一个Future对象。然后使用future.get()方法等待任务完成,并从Future对象中获取结果。
虽然 Future 提供了获取异步执行任务处理结果的能力,但是对于结果的获取却很不方便。而且Future无法解决多个异步任务需要相互依赖的场景。比如,如果主线程需要等待子任务执行完之后再继续执行,我们还需要另外借助 CountDownLatch 来实现。例如:
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.*;@Slf4jpublic class FutureExample { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(2); long s = System.currentTimeMillis(); CountDownLatch downLatch = new CountDownLatch(2); Future<String> task1Future = executor.submit(() -> { Thread.sleep(5000); downLatch.countDown(); return "Result 1"; }); Future<String> task2Future = executor.submit(() -> { Thread.sleep(5000); downLatch.countDown(); return "Result 2"; }); log.info("Task submitted"); String result1 = task1Future.get(); String result2 = task2Future.get(); log.info("Result1: {}", result1); log.info("Result2: {}", result2); log.info("costTime: {} ms.", (System.currentTimeMillis()-s)); executor.shutdown(); }}
当我们运行上面的代码时,会看到下面的输出:
18:24:26.183 [main] INFO com.test.FutureExample - Result1: Result 118:24:26.189 [main] INFO com.test.FutureExample - Result2: Result 218:24:26.189 [main] INFO com.test.FutureExample - costTime: 5144 ms.二、异步编程利器 CompletableFuture
在Java 8中,引入了一种更强大的异步编程工具CompletableFuture。它是一种Future的扩展,提供了一种更方便的方式来编写异步代码,并允许您轻松地组合多个异步操作。
通过CompletableFuture实现上面示例
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.*;@Slf4jpublic class FutureExample { public static void main(String[] args) throws ExecutionException, InterruptedException { long s = System.currentTimeMillis(); CompletableFuture<String> task1Future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } return "Result 1"; }); CompletableFuture<String> task2Future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } return "Result 2"; }); log.info("Task submitted"); CompletableFuture<Void> allOf = CompletableFuture.allOf(task1Future, task2Future); log.info("allOf: {}", allOf.get()); log.info("costTime: {} ms.", (System.currentTimeMillis()-s)); }}
当我们运行上面的代码时,会看到下面的输出:
11:13:10.743 [main] INFO com.test.FutureExample - Task submitted11:13:15.732 [main] INFO com.test.FutureExample - allOf: null11:13:15.738 [main] INFO com.test.FutureExample - costTime: 5108 ms.
可以看到,通过CompletableFuture可以很轻松的实现CountDownLatch的功能。
除此之外,它还有更强大的功能,它可以支持链式操作,还可以在执行异步操作时指定回调函数,支持任务间执行结果的传递,在操作完成后自动执行回调。下面我们以不同的例子来详细了解它的强大方便之处!先看一个图,例子里会包含图示中的所有内容。
CompletableFuture 执行异步任务的方法
CompletableFuture 可以通过两种方法来执行异步任务:
supplyAsync: 有返回值。runAsync: 没有返回值。
//使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建执行任务public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)//传入自定义线程池,根据supplier构建执行任务public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)//使用默认内置线程池ForkJoinPool.commonPool(),根据runnable构建执行任务public static CompletableFuture<Void> runAsync(Runnable runnable) //传入自定义线程池,根据runnable构建执行任务public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)CompletableFuture 结果获取的方式
//阻塞等待返回,执行结果抛出的异常为ExecutionExceptionpublic T get()//如果在指定时间内未获取结果将抛出超时异常public T get(long timeout, TimeUnit unit)//立即获取结果不阻塞,如果未完成计算将返回设定的valueIfAbsent值public T getNow(T valueIfAbsent)//执行结果抛出的异常为CompletionExceptionpublic T join()
示例:
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.*;@Slf4jpublic class FutureExample { public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException { long s = System.currentTimeMillis(); //1. getNow CompletableFuture<String> task1Future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } return "Result 1"; }); log.info("================= getNow: {}", task1Future.getNow("Result now")); log.info("================= get: {}", task1Future.get()); //2. get with timeout CompletableFuture<String> task2Future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } return "Result 2"; }); try { log.info("get with timeout: {}", task2Future.get(1, TimeUnit.SECONDS)); } catch (Exception e) { log.error("================= get with timeout", e); } //3. get CompletableFuture<String> getFuture = CompletableFuture.supplyAsync(() -> { throw new RuntimeException("compute error"); }); try { log.info("getFuture: {}", getFuture.get()); } catch (Exception e) { log.error("================= getFuture", e); } //4. join CompletableFuture<String> joinFuture = CompletableFuture.supplyAsync(() -> { throw new RuntimeException("compute error"); }); try { log.info("joinFuture: {}", joinFuture.join()); } catch (Exception e) { log.error("================= joinFuture", e); } log.info("costTime: {} ms.", (System.currentTimeMillis()-s)); }}
当我们运行上面的代码时,会看到下面的输出:
22:07:22.753 [main] INFO com.test.FutureExample - ================= getNow: Result now22:07:27.753 [main] INFO com.test.FutureExample - ================= get: Result 122:07:28.766 [main] ERROR com.test.FutureExample - ================= get with timeoutjava.util.concurrent.TimeoutException: null at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) at com.test.FutureExample.main(FutureExample.java:40)22:07:28.767 [main] ERROR com.test.FutureExample - ================= getFuturejava.util.concurrent.ExecutionException: java.lang.RuntimeException: compute error at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at com.test.FutureExample.main(FutureExample.java:50)Caused by: java.lang.RuntimeException: compute error at com.test.FutureExample.lambda$main$2(FutureExample.java:47) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)22:07:28.768 [main] ERROR com.test.FutureExample - ================= joinFuturejava.util.concurrent.CompletionException: java.lang.RuntimeException: compute error at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592) at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)Caused by: java.lang.RuntimeException: compute error at com.test.FutureExample.lambda$main$3(FutureExample.java:57) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) ... 5 common frames omitted22:07:28.768 [main] INFO com.test.FutureExample - costTime: 6108 ms.
CompletableFuture 异步回调方法
CompletableFuture支持链式操作,可以在执行异步操作时指定回调函数,在操作完成后自动执行回调。以下是一些回调方法:
(1)thenRun/thenRunAsync 没有入参,没有返回
(2)thenAccept/thenAcceptAsync 有入参,没有返回
(3)thenApply/thenApplyAsync 有入参,也有返回
通俗点讲就是做完一个任务后,可以通过以上方法再做第二个任务。方法有没有Async结尾的区别在于:
如果你执行第一个任务的时候,传入了一个自定义线程池:
用没有 Async 结尾的方法去执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池。用有 Async 结尾的方法去执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池。
示例:
import lombok.extern.slf4j.Slf4j;import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.*;@Slf4jpublic class FutureExample { public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(5); executor.setMaxPoolSize(5); executor.setThreadNamePrefix("CustomPool-"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); long s = System.currentTimeMillis(); //1. thenRun/thenRunAsync 没有没有,没有返回值 CompletableFuture thenRunFuture = CompletableFuture.runAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("runAsync =========== Step 1"); }, executor).thenRun(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("thenRun =========== Step 2"); }).thenRunAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("thenRunAsync =========== Step 3"); }); log.info("-------------------------------------------- thenRunFuture: {}", thenRunFuture.get()); //2. thenAccept/thenAcceptAsync 有入参,没有返回 CompletableFuture thenAcceptFuture = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("supplyAsync =========== Step 1"); return "Result 1"; }, executor).thenAccept(result -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("上一步结果:" + result); log.info("thenAccept =========== Step 2"); }); log.info("-------------------------------------------- thenAcceptFuture: {}", thenAcceptFuture.get()); //3. thenApply/thenApplyAsync 有入参,也有返回 CompletableFuture thenApplyFuture = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("supplyAsync =========== Step 1"); return "Result 1"; }, executor).thenApply(result -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("上一步结果:" + result); log.info("thenAccept =========== Step 2"); return "Result 2"; }); log.info("-------------------------------------------- thenApplyFuture: {}", thenApplyFuture.get()); log.info("Task submitted costTime: {} ms.", (System.currentTimeMillis()-s)); }}
当我们运行上面的代码时,会看到下面的输出:
22:53:32.047 [main] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService22:53:33.147 [CustomPool-1] INFO com.test.FutureExample - runAsync =========== Step 122:53:34.151 [CustomPool-1] INFO com.test.FutureExample - thenRun =========== Step 222:53:35.155 [ForkJoinPool.commonPool-worker-1] INFO com.test.FutureExample - thenRunAsync =========== Step 322:53:35.156 [main] INFO com.test.FutureExample - -------------------------------------------- thenRunFuture: null22:53:36.162 [CustomPool-2] INFO com.test.FutureExample - supplyAsync =========== Step 122:53:37.165 [CustomPool-2] INFO com.test.FutureExample - 上一步结果:Result 122:53:37.165 [CustomPool-2] INFO com.test.FutureExample - thenAccept =========== Step 222:53:37.166 [main] INFO com.test.FutureExample - -------------------------------------------- thenAcceptFuture: null22:53:38.169 [CustomPool-3] INFO com.test.FutureExample - supplyAsync =========== Step 122:53:39.172 [CustomPool-3] INFO com.test.FutureExample - 上一步结果:Result 122:53:39.172 [CustomPool-3] INFO com.test.FutureExample - thenAccept =========== Step 222:53:39.172 [main] INFO com.test.FutureExample - -------------------------------------------- thenApplyFuture: Result 222:53:39.172 [main] INFO com.test.FutureExample - Task submitted costTime: 7109 ms.
从上面结果可以看到:
thenRunAsync用的是ForkJoinPool.commonPool。其它没调Async的都是用CustomPool。另外:
thenRun 是没有入参,没有返回的,即第一个任务执行完成后,不会将第一个任务的执行结果,作为入参,传递到 thenRun 回调方法中,回调方法也是没有返回值的。
thenAccept 是有入参,没有返回,即第一个任务执行完成后,会将第一个任务的执行结果,作为入参,传递到 thenAccept 回调方法中,但是回调方法是没有返回值的。
thenApply 是有入参,也有返回。即第一个任务执行完成后,会将第一个任务的执行结果,作为入参,传递到 thenApply 回调方法中,回调方法也是有返回值的。
(4)whenComplete
CompletableFuture 的任务不论是正常完成还是出现异常它都会调用 whenComplete 这个回调函数。whenComplete 有两个入参 result, throwable。
正常完成时:whenComplete 入参result为上一个任务返回结果,throwable为null。出现异常时:whenComplete 入参result为null,throwable为上一个任务的异常。
示例:
import lombok.extern.slf4j.Slf4j;import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.*;@Slf4jpublic class FutureExample { public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(5); executor.setMaxPoolSize(5); executor.setThreadNamePrefix("CustomPool-"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); long s = System.currentTimeMillis(); //1. 正常完成 CompletableFuture completableFuture1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("supplyAsync =========== Step 1"); return "Result 1"; }, executor).whenComplete((result, throwable) -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("上一步结果:" + result); log.info("throwable:" + throwable); log.info("whenComplete =========== Step 2"); }); log.info("-------------------------------------------- completableFuture1: {}", completableFuture1.get()); //2. 出现异常 CompletableFuture completableFuture2 = CompletableFuture.supplyAsync(() -> { log.info("supplyAsync =========== Step 1"); throw new RuntimeException("compute error"); }, executor).whenComplete((result, throwable) -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("上一步结果:" + result); log.info("throwable:" + throwable); log.info("whenComplete =========== Step 2"); }); log.info("-------------------------------------------- completableFuture2: {}", completableFuture2.get()); log.info("Task submitted costTime: {} ms.", (System.currentTimeMillis()-s)); }}
当我们运行上面的代码时,会看到下面的输出:
23:34:56.631 [main] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService23:34:57.718 [CustomPool-1] INFO com.test.FutureExample - supplyAsync =========== Step 123:34:58.722 [CustomPool-1] INFO com.test.FutureExample - 上一步结果:Result 123:34:58.722 [CustomPool-1] INFO com.test.FutureExample - throwable:null23:34:58.722 [CustomPool-1] INFO com.test.FutureExample - whenComplete =========== Step 223:34:58.723 [main] INFO com.test.FutureExample - -------------------------------------------- completableFuture1: Result 123:34:58.727 [CustomPool-2] INFO com.test.FutureExample - supplyAsync =========== Step 1Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: compute error at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at com.test.FutureExample.main(FutureExample.java:62)Caused by: java.lang.RuntimeException: compute error at com.test.FutureExample.lambda$main$2(FutureExample.java:51) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)23:34:59.731 [CustomPool-2] INFO com.test.FutureExample - 上一步结果:null23:34:59.731 [CustomPool-2] INFO com.test.FutureExample - throwable:java.util.concurrent.CompletionException: java.lang.RuntimeException: compute error23:34:59.731 [CustomPool-2] INFO com.test.FutureExample - whenComplete =========== Step 2
(5)exceptionally
CompletableFuture的任务出现异常时才会调用 exceptionally 这个回调函数。exceptionally 只有一个入参 throwable,但会一个返回值。当出现异常时,exceptionally中会捕获该异常,并给出返回值。
示例:
@Slf4jpublic class FutureExample { public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(5); executor.setMaxPoolSize(5); executor.setThreadNamePrefix("CustomPool-"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); long s = System.currentTimeMillis(); //1. 正常完成 CompletableFuture completableFuture1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("supplyAsync =========== Step 1"); return "Result 1"; }, executor).whenComplete((result, throwable) -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("上一步结果:" + result); log.info("throwable:" + throwable); log.info("whenComplete =========== Step 2"); }).exceptionally(throwable -> { log.info("throwable:" + throwable); log.info("exceptionally =========== Step 3"); return "result 3"; }); log.info("-------------------------------------------- completableFuture1: {}", completableFuture1.get()); //2. 出现异常 CompletableFuture completableFuture2 = CompletableFuture.supplyAsync(() -> { log.info("supplyAsync =========== Step 1"); throw new RuntimeException("compute error"); }, executor).whenComplete((result, throwable) -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("上一步结果:" + result); log.info("throwable:" + throwable); log.info("whenComplete =========== Step 2"); }).exceptionally(throwable -> { log.info("throwable:" + throwable); log.info("exceptionally =========== Step 3"); return "result 3"; }); log.info("-------------------------------------------- completableFuture2: {}", completableFuture2.get()); log.info("Task submitted costTime: {} ms.", (System.currentTimeMillis()-s)); }}
当我们运行上面的代码时,会看到下面的输出:
23:40:47.474 [main] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService23:40:48.563 [CustomPool-1] INFO com.test.FutureExample - supplyAsync =========== Step 123:40:49.565 [CustomPool-1] INFO com.test.FutureExample - 上一步结果:Result 123:40:49.565 [CustomPool-1] INFO com.test.FutureExample - throwable:null23:40:49.565 [CustomPool-1] INFO com.test.FutureExample - whenComplete =========== Step 223:40:49.565 [main] INFO com.test.FutureExample - -------------------------------------------- completableFuture1: Result 123:40:49.568 [CustomPool-2] INFO com.test.FutureExample - supplyAsync =========== Step 123:40:50.571 [CustomPool-2] INFO com.test.FutureExample - 上一步结果:null23:40:50.571 [CustomPool-2] INFO com.test.FutureExample - throwable:java.util.concurrent.CompletionException: java.lang.RuntimeException: compute error23:40:50.571 [CustomPool-2] INFO com.test.FutureExample - whenComplete =========== Step 223:40:50.572 [CustomPool-2] INFO com.test.FutureExample - throwable:java.util.concurrent.CompletionException: java.lang.RuntimeException: compute error23:40:50.572 [CustomPool-2] INFO com.test.FutureExample - exceptionally =========== Step 323:40:50.572 [main] INFO com.test.FutureExample - -------------------------------------------- completableFuture2: result 323:40:50.572 [main] INFO com.test.FutureExample - Task submitted costTime: 3091 ms.
多任务条件组合
AND组合(等待任务都完成)
(1)runAfterEither 没有入参,没有返回
(2)acceptEither 有入参,没有返回
(3)applyToEither 有入参,也有返回
(4)allOf 只要有一个任务完成就继续
示例:
import lombok.extern.slf4j.Slf4j;import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.*;@Slf4jpublic class FutureExample { public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(5); executor.setMaxPoolSize(5); executor.setThreadNamePrefix("CustomPool-"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); long s = System.currentTimeMillis(); //1. task1 CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("supplyAsync =========== Step 1"); return 1; }, executor); //2. task2 CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("supplyAsync =========== Step 2"); return 2; }, executor); //3. task3 CompletableFuture<Integer> task3 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("supplyAsync =========== Step 3"); return 3; }, executor); log.info(""); //4. runAfterBoth AND组合(task1、task2) CompletableFuture runAfterBoth = task1.runAfterBoth(task2, () -> { log.info("runAfterBoth =========== Step Both"); }); log.info("-------------------------------------------- AND组合 runAfterBoth Result: {}", runAfterBoth.get()); //5. thenAcceptBoth AND组合(task1、task2) CompletableFuture thenAcceptBoth = task1.thenAcceptBoth(task2, (t1, t2) -> { log.info("task1 Result:{}", t1); log.info("task2 Result:{}", t2); log.info("thenCombine =========== Step Both"); }); log.info("-------------------------------------------- AND组合 thenCombine Result: {}", thenAcceptBoth.get()); //6. thenCombine AND组合(task1、task2) CompletableFuture<Integer> thenCombine = task1.thenCombine(task2, (t1, t2) -> { log.info("task1 Result:{}", t1); log.info("task2 Result:{}", t2); log.info("thenCombine =========== Step Both"); return t1 + t2; }); log.info("-------------------------------------------- AND组合 thenCombine Result: {}", thenCombine.get()); //7. allOf AND组合(task1、task2、task3) CompletableFuture<Void> allOf = CompletableFuture.allOf(task1, task2, task3); log.info("-------------------------------------------- AND组合 allOf Result: {}", allOf.get()); log.info("Task submitted costTime: {} ms.", (System.currentTimeMillis()-s)); }}
当我们运行上面的代码时,会看到下面的输出:
00:42:14.095 [main] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService00:42:14.185 [main] INFO com.test.FutureExample - 00:42:15.188 [CustomPool-2] INFO com.test.FutureExample - supplyAsync =========== Step 200:42:15.188 [CustomPool-1] INFO com.test.FutureExample - supplyAsync =========== Step 100:42:15.188 [CustomPool-1] INFO com.test.FutureExample - runAfterBoth =========== Step Both00:42:15.189 [main] INFO com.test.FutureExample - -------------------------------------------- AND组合 runAfterBoth Result: null00:42:15.192 [main] INFO com.test.FutureExample - task1 Result:100:42:15.192 [main] INFO com.test.FutureExample - task2 Result:200:42:15.192 [main] INFO com.test.FutureExample - thenCombine =========== Step Both00:42:15.193 [main] INFO com.test.FutureExample - -------------------------------------------- AND组合 thenCombine Result: null00:42:15.193 [main] INFO com.test.FutureExample - task1 Result:100:42:15.193 [main] INFO com.test.FutureExample - task2 Result:200:42:15.193 [main] INFO com.test.FutureExample - thenCombine =========== Step Both00:42:15.193 [main] INFO com.test.FutureExample - -------------------------------------------- AND组合 thenCombine Result: 300:42:17.189 [CustomPool-3] INFO com.test.FutureExample - supplyAsync =========== Step 300:42:17.190 [main] INFO com.test.FutureExample - -------------------------------------------- AND组合 allOf Result: null00:42:17.191 [main] INFO com.test.FutureExample - Task submitted costTime: 3090 ms.
OR组合(只要有一个任务完成)
(1)runAfterEither 没有入参,没有返回
(2)acceptEither 有入参,没有返回
(3)applyToEither 有入参,也有返回
(4)anyOf 只要有一个任务完成就继续
示例:
import lombok.extern.slf4j.Slf4j;import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.*;@Slf4jpublic class FutureExample { public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(5); executor.setMaxPoolSize(5); executor.setThreadNamePrefix("CustomPool-"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); long s = System.currentTimeMillis(); //1. task1 CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("supplyAsync =========== Step 1"); return 1; }, executor); //2. task2 CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("supplyAsync =========== Step 2"); return 2; }, executor); //3. task3 CompletableFuture<Integer> task3 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("supplyAsync =========== Step 3"); return 3; }, executor); log.info(""); //4. runAfterEither OR组合(task1、task2) CompletableFuture runAfterEither = task1.runAfterEither(task2, () -> { log.info("runAfterEither =========== Step Either"); }); log.info("-------------------------------------------- OR组合 runAfterEither Result: {}", runAfterEither.get()); //5. acceptEither OR组合(task1、task2) CompletableFuture acceptEither = task1.acceptEither(task2, (t) -> { log.info("first completed Result:{}", t); log.info("applyToEither =========== Step Either"); }); log.info("-------------------------------------------- OR组合 applyToEither Result: {}", acceptEither.get()); //6. applyToEither OR组合(task1、task2) CompletableFuture<Integer> applyToEither = task1.applyToEither(task2, (t) -> { log.info("first completed Result:{}", t); log.info("applyToEither =========== Step Either"); return t; }); log.info("-------------------------------------------- OR组合 applyToEither Result: {}", applyToEither.get()); //7. anyOf OR组合(task1、task2、task3) CompletableFuture<Object> anyOf = CompletableFuture.anyOf(task1, task2, task3); //只要有一个有任务完成 log.info("-------------------------------------------- OR组合 anyOf Result: {}", anyOf.get()); log.info("Task submitted costTime: {} ms.", (System.currentTimeMillis()-s)); }}
当我们运行上面的代码时,会看到下面的输出:
00:49:56.887 [main] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService00:49:56.971 [main] INFO com.test.FutureExample - 00:49:57.971 [CustomPool-1] INFO com.test.FutureExample - supplyAsync =========== Step 100:49:57.972 [CustomPool-2] INFO com.test.FutureExample - supplyAsync =========== Step 200:49:57.972 [CustomPool-1] INFO com.test.FutureExample - runAfterEither =========== Step Either00:49:57.973 [main] INFO com.test.FutureExample - -------------------------------------------- OR组合 runAfterEither Result: null00:49:57.979 [main] INFO com.test.FutureExample - first completed Result:100:49:57.979 [main] INFO com.test.FutureExample - applyToEither =========== Step Either00:49:57.979 [main] INFO com.test.FutureExample - -------------------------------------------- OR组合 applyToEither Result: null00:49:57.980 [main] INFO com.test.FutureExample - first completed Result:100:49:57.980 [main] INFO com.test.FutureExample - applyToEither =========== Step Either00:49:57.980 [main] INFO com.test.FutureExample - -------------------------------------------- OR组合 applyToEither Result: 100:49:57.980 [main] INFO com.test.FutureExample - -------------------------------------------- OR组合 anyOf Result: 100:49:57.980 [main] INFO com.test.FutureExample - Task submitted costTime: 1086 ms.00:49:59.976 [CustomPool-3] INFO com.test.FutureExample - supplyAsync =========== Step 3
四、CompletableFuture使用注意点
CompletableFuture 使我们的异步编程更加便利的、代码更加优雅。但由于CompletableFuture中使用了默认的ForkJoin线程池,处理的线程个数是电脑CPU核数-1。在大量请求过来的时候,处理逻辑复杂的话,响应会很慢。所以建议使用自定义线程池,优化线程池配置参数,并使用合适的线程池拒绝策略。另外,CompletableFuture的get()方法是阻塞的,如果使用它来获取异步调用的返回值,最好添加超时时间。
标签: #java多线程异步执行