龙空技术网

Java8已经发布7年了,不会还有人没用过CompletableFuture吧

爪哇驿站 3464

前言:

此刻同学们对“java8java7”大致比较看重,姐妹们都想要分析一些“java8java7”的相关知识。那么小编在网络上汇集了一些对于“java8java7””的相关内容,希望姐妹们能喜欢,朋友们快快来了解一下吧!

前言

CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步回调、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。

Future有哪些缺陷?

多线程的场景下,我们要监听每个线程异步执行的结果,如果用Future去实现,代码如下:

/** * Future异步示例 * * @author liudong * @date 2021/12/9 15:37 */public class FutureDemo {    public static void main(String[] args) throws Exception {        // 1. 创建线程池        ExecutorService executorService = Executors.newCachedThreadPool();        // 2. 提交任务        List<Callable<String>> tasks = new ArrayList<>();        ArrayList<Integer> taskIds = Lists.newArrayList(1, 2, 3);        taskIds.forEach(taskId -> {            Callable<String> task = () -> {                Thread.sleep(1000);                return "任务" + taskId + "执行完成!";            };            tasks.add(task);        });        List<Future<String>> futures = executorService.invokeAll(tasks);        // 3. 获取任务执行结果        for (Future<String> future : futures) {            String result = future.get();            System.out.println(result);        }        executorService.shutdown();    }}

执行结果

任务1执行完毕!任务2执行完毕!任务3执行完毕!

大家有没有思考过这样使用有没有什么问题?笔者认为有如下几个缺陷:

(1) 不能回调会阻塞(2) 批量任务处理彼此依赖会阻塞(3) 不能多个任务级联执行,将结果依次往下传递(4) 得不到最先完成的任务CompletableFuture能解决吗?

针对如上几个问题,看看CompletableFuture是怎么解决的。

使用CompletableFuture提供的supplyAsync和whenCompleteAsync两个方法优化以上代码,如下:

/** * CompletableFuture异步示例 * * @author liudong * @date 2021/12/9 15:37 */public class CompletableFutureDemo {    public static void main(String[] args) throws Exception {        // 1. 创建线程池        ExecutorService executorService = Executors.newCachedThreadPool();        // 2. 提交任务        List<Callable<String>> tasks = new ArrayList<>();        ArrayList<Integer> taskIds = Lists.newArrayList(1, 2, 3);        // 3. 回调任务执行结果        List<CompletableFuture<String>> completableFutures = new ArrayList<>();        CompletableFuture[] cfs = taskIds.stream().map((taskId) -> {            CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {                try {                    Thread.sleep(1000L);                } catch (InterruptedException e) {                }                return "任务" + taskId + "执行完成!";            }, executorService);            // 异步返回执行结果            completableFuture.whenCompleteAsync((result, exception) -> {                System.out.println(result);            });            // 将处理结果传递到子任务            completableFuture.thenAccept((result) -> {                System.out.println("上级任务处理结果:" + result);            });            return completableFuture;        }).toArray(CompletableFuture[]::new);                // 获取最先执行完的任务        CompletableFuture<Object> firstEnd = CompletableFuture.anyOf(cfs);        System.out.println("最先执行完的任务:" + firstEnd.get());        executorService.shutdown();    }}

执行结果:

上级任务处理结果:任务1执行完成!上级任务处理结果:任务3执行完成!最先执行完的任务:任务1执行完成!上级任务处理结果:任务2执行完成!任务3执行完成!任务2执行完成!任务1执行完成!

以上可以看出,执行结果是异步打印,不会阻塞,也不会顺序依赖,能获取上级任务执行结果,并能够获取到最先执行完的任务。

扩展知识点:

(1) 创建异步操作:runAsync:不支持返回值supplyAsync:支持返回值(2) 计算结果完成时的回调方法:whenComplete:执行完当前任务的线程,继续执行 whenComplete 的任务。whenCompleteAsync:执行完当前任务的线程,把whenCompleteAsync 的任务继续提交给线程池来执行。exceptionally:当前任务出现异常时,执行exceptionally中的回调方法。(3) 线程串行化:thenApply:当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化。thenAccept 消费处理结果,接收任务的处理结果,并消费处理,无返回结果。thenRun:跟 thenAccept 方法不一样的是,不关心任务的处理结果。只要上面的任务执行完成,就开始执行 thenAccept 。handle:执行任务完成时,handle可以对结果进行处理。handle 方法和 thenApply 方法处理方式基本一样。不同的是 handle 是在任务完成后再执行,还可以处理异常的任务。thenApply 只可以执行正常的任务,任务出现异常则不执行 thenApply 方法。(4) 合并任务thenCombine:用于合并任务,thenCombine 会把 两个 CompletionStage 的任务都执行完成后,把两个任务的结果一块交给 thenCombine 来处理。thenCompose:thenCompose 方法允许你对两个 CompletionStage 进行流水线操作,第一个操作完成时,将其结果作为参数传递给第二个操作。

标签: #java8java7