龙空技术网

怎样用Java 8优雅的开发业务

switchvov 2917

前言:

如今各位老铁们对“java流式编程与网络程序设计心得体会”大致比较关怀,大家都想要分析一些“java流式编程与网络程序设计心得体会”的相关资讯。那么小编在网上收集了一些有关“java流式编程与网络程序设计心得体会””的相关文章,希望看官们能喜欢,姐妹们快快来了解一下吧!

怎样用Java 8优雅的开发业务

[TOC]

函数式编程

匿名函数

λ演算

流式编程基本原理

在Java中流式编程的基本原理有两点。

构建流数据流转(流水线)规约

IntStream.rangeClosed(1, 100) // 1. 构建流    .mapToObj(String::valueOf)// 2. 数据流转(流水线)    .collect(joining());      // 3. 规约
案例英雄的主位置一共有几类,分别是什么
@Testfun t1() {    // 英雄的主位置一共有几类,分别是什么    // 映射    val roleMains = heroes.map(Hero::getRoleMain)        // 过滤为空的数据        .filter(Objects::nonNull)        // 去重        .distinct()    println(roleMains.size)    println(roleMains)}
@Testpublic void t1() {    // 英雄的主位置一共有几类,分别是什么    List<String> roleMains = heroes.stream()            // 映射            .map(Hero::getRoleMain)            // 过滤为空的数据            .filter(Objects::nonNull)            // 去重            .distinct()            // 收集列表            .collect(toList());    System.out.println(roleMains.size());    System.out.println(roleMains);}
英雄按主次位置分组后,输出每个分组有多少英雄,其中:近战英雄有多少位,远程英雄有多少位
@Testfun t2() {    // 英雄按主次位置分组后,输出每个分组有多少英雄,其中:近战英雄有多少位,远程英雄有多少位    // 主次位置分组的英雄数量    val groupHeroCount = heroes.groupingBy {        Pair.of(it.roleMain, it.roleAssist)    }.eachCount()    // 主次分组后,再按攻击范围分组的英雄数量    val groupThenGroupCount = heroes.groupBy {        Pair.of(it.roleMain, it.roleAssist)    }.map {        val value = it.value.groupingBy(Hero::getAttackRange).eachCount()        Pair.of(it.key, value)    }.associateBy({ it.left }, { it.value })    // 遍历输出    groupThenGroupCount.forEach { (groupKey, groupValue) ->        val groupingCount = groupHeroCount[groupKey]        print("英雄分组key为:$groupKey;英雄数量:$groupingCount;")        groupValue.forEach { (countKey, countValue) ->            print("英雄攻击范围:$countKey;英雄数量:$countValue;")        }        println()    }}
@Testpublic void t2() {    // 英雄按主次位置分组后,输出每个分组有多少英雄,其中:近战英雄有多少位,远程英雄有多少位    // 主次位置分组的英雄数量    Map<Pair<String, String>, Long> groupHeroCount = heroes.stream()            .collect(groupingBy(hero -> Pair.of(hero.getRoleMain(), hero.getRoleAssist()), counting()));    // 主次分组后,再按攻击范围分组的英雄数量    Map<Pair<String, String>, Map<String, Long>> groupThenGroupCount = heroes.stream()            .collect(groupingBy(hero -> Pair.of(hero.getRoleMain(), hero.getRoleAssist()),                    groupingBy(Hero::getAttackRange, counting())));    // 遍历输出    groupThenGroupCount.forEach((groupKey, groupValue) -> {        Long groupingCount = groupHeroCount.get(groupKey);        System.out.print("英雄分组key为:" + groupKey + ";英雄数量:" + groupingCount + ";");        groupValue.forEach((countKey, countValue) -> System.out.print("英雄攻击范围:" + countKey + ";英雄数量:" + countValue + ";"));        System.out.println();    });}
求近战英雄HP初始值的加总
@Testfun t3() {    // 求近战英雄HP初始值的加总    val sum = heroes.filter { "近战" == it.attackRange }        .map(Hero::getHpStart)        .filter(Objects::nonNull)        .reduce(BigDecimal::add)    println("近战英雄HP初始值的加总为:$sum")}
@Testpublic void t3() {    // 求近战英雄HP初始值的加总    BigDecimal sum = heroes.stream()            .filter(hero -> "近战".equals(hero.getAttackRange()))            .map(Hero::getHpStart)            .filter(Objects::nonNull)            .reduce(BigDecimal.ZERO, BigDecimal::add);    System.out.println("近战英雄HP初始值的加总为:" + sum);}
通过最小列表收集器获取最小列表
@Testpublic void t4() {    // 通过最小列表收集器获取最小列表    List<BigDecimal> minAttackGrowth = heroes.stream()            .map(Hero::getAttackGrowth)            .collect(new MinListCollector<>());    System.out.println(minAttackGrowth);    List<Hero> minHero = heroes.stream()            .collect(new MinListCollector<>());    System.out.println(minHero);}
import java.util.*;import java.util.concurrent.atomic.AtomicReference;import java.util.function.BiConsumer;import java.util.function.BinaryOperator;import java.util.function.Function;import java.util.function.Supplier;import java.util.stream.Collector;import java.util.stream.Collectors;import static java.util.stream.Collector.Characteristics.*;/** * 最小列表收集器 * * @author switch * @since 2020/8/18 */public class MinListCollector<T extends Comparable<? super T>> implements Collector<T, List<T>, List<T>> {    /**     * 收集器的特性     *     * @see Characteristics     */    private final static Set<Characteristics> CHARACTERISTICS = Collections.unmodifiableSet(EnumSet.of(IDENTITY_FINISH));    private final static int ZERO = 0;    /**     * 最小值     */    private final AtomicReference<T> min = new AtomicReference<>();    @Override    public Supplier<List<T>> supplier() {        // supplier参数用于生成结果容器,容器类型为A        return ArrayList::new;    }    @Override    public BiConsumer<List<T>, T> accumulator() {        // accumulator用于消费元素,也就是归纳元素,这里的T就是元素,它会将流中的元素一个一个与结果容器A发生操作        return (list, element) -> {            // 获取最小值            T minValue = min.get();            if (Objects.isNull(minValue)) {                // 第一次比较                list.add(element);                min.set(element);            } else if (element.compareTo(minValue) < ZERO) {                // 发现更小的值                list.clear();                list.add(element);                min.compareAndSet(minValue, element);            } else if (element.compareTo(minValue) == ZERO) {                // 与最小值相等                list.add(element);            }        };    }    @Override    public BinaryOperator<List<T>> combiner() {        // combiner用于两个两个合并并行执行的线程的执行结果,将其合并为一个最终结果A        return (left, right) -> {            // 最小值列表合并            List<T> leftList = getMinList(left);            List<T> rightList = getMinList(right);            leftList.addAll(rightList);            return leftList;        };    }    private List<T> getMinList(List<T> list) {        return list.stream()                .filter(element -> element.compareTo(min.get()) == ZERO)                .collect(Collectors.toList());    }    @Override    public Function<List<T>, List<T>> finisher() {        // finisher用于将之前整合完的结果R转换成为A        return Function.identity();    }    @Override    public Set<Characteristics> characteristics() {        // characteristics表示当前Collector的特征值,这是个不可变Set        return CHARACTERISTICS;    }}
优雅的空处理

import org.junit.Test;import java.util.Optional;/** * @author switch * @since 2020/8/18 */public class OptionalTests {    @Test    public void t1() {        // orElse        System.out.println(Optional.ofNullable(null).orElse("张三"));        System.out.println(Optional.ofNullable(null).orElseGet(() -> "李四"));        System.out.println(Optional.ofNullable("王五").orElseThrow(NullPointerException::new));    }    @Test    public void t2() {        // isPresent        Optional<String> name = Optional.ofNullable("张三");        if (name.isPresent()) {            System.out.println(name.get());        }    }    @Test    public void t3() {        // map        Optional<Integer> number = Optional.of("123456").map(Integer::valueOf);        if (number.isPresent()) {            System.out.println(number.get());        }    }    @Test    public void t4() {        // flatMap        Optional<Integer> number = Optional.of("123456").flatMap(s -> Optional.of(Integer.valueOf(s)));        if (number.isPresent()) {            System.out.println(number.get());        }    }    @Test    public void t5() {        // 过滤        String number = "123456";        String filterNumber = Optional.of(number).filter(s -> !s.equals(number)).orElse("654321");        System.out.println(filterNumber);    }}
新的并发工具类CompletableFuture

单机批处理多线程执行模型

该模型适用于百万级量级的任务。超过千万数据,可以考虑分组,多机器并行执行。基本流程:

从数据库获取Id列表拆分成n个子Id列表通过子Id列表获取关联数据(注意:都需要提供批量查询接口)映射到需要处理的Model(提交到CompletableFuture)->处理数据->收集成list)(java 8流式处理)收集的list进行join操作收集list模型

模型原理:Stream+CompletableFuture+lambda

简要解释:

CompletableFuture是java8提供的一个工具类,主要是用于异步处理流程编排的。Stream是java8提供的一个集合流式处理工具类,主要用于数据的流水线处理。lambda在java中是基于内部匿名类实现的,可以大幅减少重复代码。总结:在该模型中Stream用于集合流水线处理、CompletableFuture解决异步编排问题(非阻塞)、lambda简化代码。数据流动

List<List<String>> -> Stream<List<String>> -> Stream<List<Model>> -> Stream<CompletableFuture<List<Model>>> -> Stream<CompletableFuture<List<映射类型>>> -> List<CompletableFuture<Void>>
案例ThreadPoolUtil
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

public final class ThreadPoolUtil { public static ThreadPoolTaskExecutor getDefaultExecutor(Integer poolSize, Integer maxPoolSize, Integer queueCapacity) { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setAllowCoreThreadTimeOut(true); executor.setWaitForTasksToCompleteOnShutdown(true); executor.setCorePoolSize(poolSize); executor.setMaxPoolSize(maxPoolSize); executor.setQueueCapacity(queueCapacity); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return executor; }}

- `ThreadPoolConfig```` javaimport org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;@Configurationpublic class ThreadPoolConfig {    /**     * 计算规则:N(thread) = N(cpu) * U(cpu) * (1 + w/c)     * N(thread):线程池大小     * N(cpu):处理器核数     * U(cpu):期望CPU利用率(该值应该介于0和1之间)     * w/c:是等待时间与计算时间的比率,比如说IO操作即为等待时间,计算处理即为计算时间     */    private static final Integer TASK_POOL_SIZE = 50;    private static final Integer TASK_MAX_POOL_SIZE = 100;    private static final Integer TASK_QUEUE_CAPACITY = 1000;    @Bean("taskExecutor")    public ThreadPoolTaskExecutor taskExecutor() {        return ThreadPoolUtil.getDefaultExecutor(TASK_POOL_SIZE, TASK_MAX_POOL_SIZE, TASK_QUEUE_CAPACITY);    }}

#getFuturesStream

public Stream<CompletableFuture<List<Model>>> getFuturesStream(List<List<String>> idSubLists) {  return idSubLists.stream()      .map(ids ->           CompletableFuture.supplyAsync(() -> modelService.listByIds(ids), taskExecutor)      );}

#standardisation

public void standardisation() {  List<CompletableFuture<Void>> batchFutures = getFuturesStream(idSubLists)          .map(future -> future.thenApply(this::listByNormalize))          .map(future -> future.thenAccept(modelService::batchUpdateData))          .collect(Collectors.toList());  List<Void> results = batchFutures.stream()          .map(CompletableFuture::join)          .collect(Collectors.toList());}
调整线程池的大小

《Java并发编程实战》一书中,Brian Goetz和合著者们为线程池大小的优化提供了不少中肯的建议。这非常重要,如果线程池中线程的数量过多,最终它们会竞争稀缺的处理器和内存资源,浪费大量的时间在上下文切换上。反之,如果线程的数目过少,正如你的应用所面临的情况,处理器的一些核可能就无法充分利用。Brian Goetz建议,线程池大小与处理器的利用率之比可以使用下面的公式进行估算:$$N_{threads} = N_{CPU} * U_{CPU} * (1 + \frac{W}{C})$$

其中:

$N_{CPU}$是处理器的核的数目,可以通过Runtime.getRuntime().availableProcessors()得到$U_{CPU}$是期望的CPU利用率(该值应该介于0和1之间)$\frac{W}{C}$是等待时间与计算时间的比率,比如说IO操作即为等待时间,计算处理即为计算时间并行——使用流还是CompletableFutures?

对集合进行并行计算有两种方式:要么将其转化为并行流,利用map这样的操作开展工作,要么枚举出集合中的每一个元素,创建新的线程,在CompletableFuture内对其进行操作。后者提供了更多的灵活性,可以调整线程池的大小,而这能帮助确保整体的计算不会因为线程都在等待I/O而发生阻塞。

使用这些API的建议如下:

如果进行的是计算密集型的操作,并且没有I/O,那么推荐使用Stream接口,因为实现简单,同时效率也可能是最高的(如果所有的线程都是计算密集型的,那就没有必要创建比处理器核数更多的线程)。反之,如果并行的工作单元还涉及等待I/O的操作(包括网络连接等待),那么使用CompletableFuture灵活性更好,可以依据等待/计算,或者$\frac{W}{C}$的比率设定需要使用的线程数。这种情况不使用并行流的另一个原因是,处理流的流水线中如果发生I/O等待,流的延迟特性很难判断到底什么时候触发了等待。日期和时间API

使用指南:(密码:gtag) 《时区工具类使用指南》

项目地址

GitHub:java8-fluent

参考Java 8 实战学习笔记Java 8 函数式编程学习笔记深入理解Java函数式编程和Streams API

分享并记录所学所见

标签: #java流式编程与网络程序设计心得体会