龙空技术网

Java 8 Stream 教程

IT技术院 77

前言:

目前姐妹们对“javatutorial”大体比较重视,朋友们都需要了解一些“javatutorial”的相关文章。那么小编也在网上收集了一些有关“javatutorial””的相关资讯,希望大家能喜欢,看官们一起来学习一下吧!

本文采用实例驱动的方式,对JAVA8的stream API进行一个深入的介绍。虽然JAVA8中的stream API与JAVA I/O中的InputStream和OutputStream在名字上比较类似,但是其实是另外一个东西,Stream API是JAVA函数式编程中的一个重要组成部分。

本文描述如何使用JAVA8的Stream API。通过本文,你可以了解Stream API的执行顺序,不同的执行顺序会对stream api的执行效率有较大的影响。本文会详细描述Stream API中的reduce,collect,flatMap等操作,结尾部分会深入讲解parallel streams。

如果你对JAVA8中新增的概念:lambda表达式,函数式接口,方法引用不熟悉。可以从:Java 8 Tutorial一文中获取相关的知识。

Streams如何工作?###

stream是一个可以对个序列中的元素执行各种计算操作的一个元素序列。

List<String> myList = Arrays.asList("a1", "a2", "b1", "c2", "c1");myList .stream() .filter(s -> s.startsWith("c")) .map(String::toUpperCase) .sorted() .forEach(System.out::println);// C1// C2

stream包含中间(intermediate operations)和最终(terminal operation)两种形式的操作。中间操作(intermediate operations)的返回值还是一个stream,因此可以通过链式调用将中间操作(intermediate operations)串联起来。最终操作(terminal operation)只能返回void或者一个非stream的结果。在上述例子中:filter, map ,sorted是中间操作,而forEach是一个最终操作。更多关于stream的中可用的操作可以查看java doc。上面例子中的链式调用也被称为操作管道流。

大多stream操作接受某种形式的lambda表达式作为参数,通过方法接口的形式指定操作的具体行为,这些方法接口的行为基本上都是无干扰(non-interfering)和无状态(stateless)。无干扰(non-interfering)的方法的定义是:该方法不修改stream的底层数据源,比如上述例子中:没有lambda表达式添加或者删除myList中的元素。无状态(stateless)方法的定义:操作的执行是独立的,比如上述例子中,没有lambda表达式在执行中依赖可能发生变化的外部变量或状态。

streams分类###

可以从不同的数据源创建stream。java collection包中的Collections,Lists,Sets这些类中新增stream()和parallelStream()方法,通过这些方法可以创建一个顺序stream(sequential streams)或者一个并发的stream(Parallel streams)。并发stream(Parallel streams)更适合在多线程中使用,本文先介绍顺序流(sequential streams)在结尾会描述并发stream(Parallel streams),

Arrays.asList("a1", "a2", "a3") .stream() .findFirst() .ifPresent(System.out::println); // a1

List对象上调用stream()方法可以返回一个常规的对象流。在下面的例子中我们不需要创建一个collection对象也可以使用stream:

Stream.of("a1", "a2", "a3") .findFirst() .ifPresent(System.out::println); // a1

直接使用Stream.of()方法就能从一组对象创建一个stream对象,

除了常规的对象流,JAVA 8中的IntStream,LongStream,DoubleStream这些流能够处理基本数据类型如:int,long,double。比如:IntStream可以使用range()方法能够替换掉传统的for循环

IntStream.range(1, 4) .forEach(System.out::println);// 1// 2// 3

基本类型流(primitive streams)使用方式与常规对象流类型(regular object streams)大部分相同,但是基本类型流(primitive streams)能使用一些特殊的lambda表达式,比如:用IntFunction代替Function,用IntPredicate代替Predicate,同时基本类型流(primitive streams)中可以支持一些聚合方法,如:sum(),average()等。

Arrays.stream(new int[] {1, 2, 3}) .map(n -> 2 * n + 1) .average() .ifPresent(System.out::println); // 5.0

可以通过常规对象流(regular object stream)的mapToInt(), mapToLong(),mapToDouble(),基本类型对象流(primitive streams)中的mapToObj()等方法完成常规对象流和基本类型流之间的相互转换

IntStream.range(1, 4) .mapToObj(i -> "a" + i) .forEach(System.out::println);

下面这个例子中doubles stream先被映射成int stream,然后又被映射成String类型的对象流:

Stream.of(1.0, 2.0, 3.0) .mapToInt(Double::intValue) .mapToObj(i -> "a" + i) .forEach(System.out::println);// a1// a2// a3

处理顺序###

前面描述了如何创建和使用各种stream,现在开始深入了解stream执行引擎的工作原理。

Laziness(延迟加载)是中间操作(intermediate operations)的一个重要特性。如下面这个例子:中间操作(terminal operation)缺失,当执行这个代码片段的时候,并不会在控制台打印相应的内容,这是因为只有最终操作(terminal operation)存在的时候,中间操作(intermediate operations)才会执行。

Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> { System.out.println("filter: " + s); return true; });

给上面的例子添加最终操作(terminal operation)forEach:

Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> { System.out.println("filter: " + s); return true; }) .forEach(s -> System.out.println("forEach: " + s));

执行结果如下:

filter: d2forEach: d2filter: a2forEach: a2filter: b1forEach: b1filter: b3forEach: b3filter: cforEach: c

执行结果比较让人惊奇,想当然的做法是水平执行此流上的所有元素。但是实际上是每一个元素沿着链垂直移动,第一个字符串"d2"执行完filter和forEach后第二个元素"a2"才开始执行。

这种沿着链垂直移动的行为可以降低每一个元素上进行操作的数量,如我们在下面的例子中所示:

Stream.of("d2", "a2", "b1", "b3", "c") .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .anyMatch(s -> {最终操作 System.out.println("anyMatch: " + s); return s.startsWith("A"); });// map: d2// anyMatch: D2// map: a2// anyMatch: A2

当对给定元素执行判断为真时anyMatch操作会立刻返回true,在上面例子中执行到元素“A2”的时候,元素判断为真anyMatch立刻返回true,由于流是沿着链垂直移动的,因此上面的map操作只会执行两次。

注:stream的执行流程类似shell中管道:ps xxx | grep "sss" | grep "ccc",是按照输入行的形式进行处理。

执行效率与steream执行链顺序的关系###

下面的例子由两个中间操作(intermediate operations)map和filter以及一个最终操作(terminal operation)forEach构成,我们观察这些动作是如何执行的。

Stream.of("d2", "a2", "b1", "b3", "c")

.map(s -> {

System.out.println("map: " + s);

return s.toUpperCase();

})

.filter(s -> {

System.out.println("filter: " + s);

return s.startsWith("A");

})

.forEach(s -> System.out.println("forEach: " + s));

// map: d2

// filter: D2

// map: a2

// filter: A2

// forEach: A2

// map: b1

// filter: B1

// map: b3

// filter: B3

// map: c

// filter: C

你可能已经猜想到:map和filter操作被执行了5次,但是forEach操作只被执行了1次。我们可以通过修改操作的执行顺序(如:将filter操作移到操作链的头部),大幅度降低执行次数

Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> { System.out.println("filter: " + s); return s.startsWith("a"); }) .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .forEach(s -> System.out.println("forEach: " + s));// filter: d2// filter: a2// map: a2// forEach: A2// filter: b1中间操作// filter: b3// filter: c

修改后map只被执行了1次,如果此时数据量比较大则操作管道的执行效率会有较大的提升,在处理复杂方法链的时候需要注意执行顺序对执行效率的影响。

给上面的例子添加sort操作。

Stream.of("d2", "a2", "b1", "b3", "c") .sorted((s1, s2) -> { System.out.printf("sort: %s; %s\n", s1, s2); return s1.compareTo(s2); }) .filter(s -> { System.out.println("filter: " + s); return s.startsWith("a"); }) .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .forEach(s -> System.out.println("forEach: " + s));

执行结果如下:

sort: a2; d2sort: b1; a2sort: b1; d2sort: b1; a2sort: b3; b1sort: b3; d2sort: c; b3sort: c; d2filter: a2map: a2forEach: A2filter: b1filter: b3filter: cfilter: d2

Sorting 是一种特殊的中间操作(intermediate operation),在对集合中元素进行排序过程中需要保存元素的状态,因此Sorting 是一种有状态的操作(stateful operation)。

首先,在整个输入集上执行排序操作(即先对集合进行水平操作),由于输入集合中的元素间存在多种组合,因此上面的例子中sorted操作被执行了8次。

可以通过对执行链重排序的方式,提升stream的执行效率。修改执行链顺序之后由于filter操作的过滤,导致sorted操作的输入集只有一个元素,在大数据量的情况下能够大幅度提高执行效率。

Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> { System.out.println("filter: " + s); return s.startsWith("a"); }) .sorted((s1, s2) -> { System.out.printf("sort: %s; %s\n", s1, s2); return s1.compareTo(s2); }) .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); }) .forEach(s -> System.out.println("forEach: " + s));// filter: d2// filter: a2// filter: b1// filter: b3// filter: c// map: a2// forEach: A2

流复用###

Java 8 streams不能被复用,当你执行完任何一个最终操作(terminal operation)的时候流就被关闭了。

Stream<String> stream = Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> s.startsWith("a"));stream.anyMatch(s -> true); // okstream.noneMatch(s -> true); // exception

在同一个stream中执行完anyMatch后再执行noneMatch就会抛出如下异常:

java.lang.IllegalStateException: stream has already been operated upon or closed at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229) at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459) at com.winterbe.java8.Streams5.test7(Streams5.java:38) at com.winterbe.java8.Streams5.main(Streams5.java:28)

可以通过为每个最终操作(terminal operation)创建一个新的stream链的方式来解决上面的重用问题,Stream api中已经提供了一个stream supplier类来在已经存在的中间操作(intermediate operations )的stream基础上构建一个新的stream。

Supplier<Stream<String>> streamSupplier = () -> Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> s.startsWith("a"));streamSupplier.get().anyMatch(s -> true); // okstreamSupplier.get().noneMatch(s -> true); // ok

streamSupplier的每个get()方法会构造一个新的stream,我们可以在这个stream上执行期望的最终操作(terminal operation)。

高级操作###

Streams支持多种不同的操作(operations),我们已经了解过filter,map等比较重要的操作。你可以通过Stream Javadoc进一步了解更多的操作。现在我们开始深入探讨更复杂的操作:collect flatMap reduce。

假设存在如下的用户列表:

class Person { String name; int age; Person(String name, int age) { this.name = name; this.age = age; } @Override public String toString() { return name; }}List<Person> persons = Arrays.asList( new Person("Max", 18), new Person("Peter", 23), new Person("Pamela", 23), new Person("David", 12));

Collect(收集)###

Collect(收集)是一种是十分有用的最终操作,它可以把stream中的元素转换成另外一种形式,比如;list,set,map。Collect使用Collector作为参数,Collector包含四种不同的操作:supplier(初始构造器), accumulator(累加器), combiner(组合器), finisher(终结者)。这听起来很复杂,但是一个好消息是java 8通过Collectors类内置了各种复杂的收集操作,因此对于大部分常用的操作来说,你不需要自己去实现collector类。

从一个十分常见的用类开始:

List<Person> filtered = persons .stream() .filter(p -> p.name.startsWith("P")) .collect(Collectors.toList());System.out.println(filtered); // [Peter, Pamela]

通过上面的demo可以看出,将stream转换为List十分简单,如果想转换为Set的话,只需使用Collectors.toSet()就可以了。

下面的例子暂时将用户按年龄分组:

Map<Integer, List<Person>> personsByAge = persons .stream() .collect(Collectors.groupingBy(p -> p.age));personsByAge .forEach((age, p) -> System.out.format("age %s: %s\n", age, p));// age 18: [Max]// age 23: [Peter, Pamela]// age 12: [David]

Collectors类功能繁多,你可以通过Collectors对stream中的元素进行汇聚,比如:计算所有用户的年纪。

Double averageAge = persons .stream() .collect(Collectors.averagingInt(p -> p.age));System.out.println(averageAge); // 19.0

可以通过summarizing collectors能返回一个内置的统计对象,通过这个对象能够获取更加全面的统计信息,比如用户年纪中的最大值,最小值,平均年纪等结果。

IntSummaryStatistics ageSummary = persons .stream() .collect(Collectors.summarizingInt(p -> p.age));System.out.println(ageSummary);// IntSummaryStatistics{count=4, sum=76, min=12, average=19.000000, max=23}

下面的例子展示如何将所有用户连接成一个字符串:

String phrase = persons .stream() .filter(p -> p.age >= 18) .map(p -> p.name) .collect(Collectors.joining(" and ", "In Germany ", " are of legal age."));System.out.println(phrase);// In Germany Max and Peter and Pamela are of legal age.

join collector的三个参数分别表示:连接符,字符串前缀,字符串后缀(可选)。

将一个stream转换为map,我们必须指定map的key和value如何映射。要注意的是key的值必须是唯一性的,否则会抛出IllegalStateException,但是可以通过使用合并函数(可选)绕过这个IllegalStateException异常:

Map<Integer, String> map = persons .stream() .collect(Collectors.toMap( p -> p.age, p -> p.name, (name1, name2) -> name1 + ";" + name2));System.out.println(map);// {18=Max, 23=Peter;Pamela, 12=David}

前文已经介绍了jdk内置的一些很有用的collectors,接下来开始介绍如何构造我们自己所需的collector,我们的目标是将stream中所有用户的用户名变成大写并用"|"符号连接成一个字符串。为了达成这个目标我们通过Collector.of()方法创建了一个新的collector,我们必须给这个collector提供四种功能:supplier, accumulator, combiner,finisher.

Collector<Person, StringJoiner, String> personNameCollector = Collector.of( () -> new StringJoiner(" | "), // supplier (j, p) -> j.add(p.name.toUpperCase()), // accumulator (j1, j2) -> j1.merge(j2), // combiner StringJoiner::toString); // finisherString names = persons .stream() .collect(personNameCollector);System.out.println(names); // MAX | PETER | PAMELA | DAVID

由于JAVA中String是一个不可变对象,因此我们需要一个辅助类(比如StringJoiner)来帮助collect构造我们的字符串。supplier创建了一个包含适当分隔符的StringJoiner对象,accumulator用来将每个用户名转为大写并添加到supplier创建的StringJoiner中,combiner将两个StringJoiners对象连接成一个,最后一步的finisher从StringJoiner中构建出所希望的得到的string对象。

FlatMap###

我们已经了解:通过map方法可以将stream中的一种对象转换成另外一种对象。但是map方法还是有使用场景限制,只能将一种对象映射为另外一种特定的已经存在的对象。是否能够将一个对象映射为多种对象,或者映射成一个根本不存在的对象呢。这就是flatMap方法出现的目的。

FlatMap方法可以将一个stream中的每一个元素对象转换为另一个stream中的另一种元素对象,因此可以将stream中的每个对象改造成零,一个或多个。flatMap操作的返回流包含这些改造后的对象。

为了演示flatMap,定义一个继承关系如下:

class Foo { String name; List<Bar> bars = new ArrayList<>(); Foo(String name) { this.name = name; }}class Bar { String name; Bar(String name) { this.name = name; }}

通过流实例化一队对象:

List<Foo> foos = new ArrayList<>();// create foosIntStream .range(1, 4) .forEach(i -> foos.add(new Foo("Foo" + i)));// create barsfoos.forEach(f -> IntStream .range(1, 4) .forEach(i -> f.bars.add(new Bar("Bar" + i + " <- " + f.name))));

完成上述操作之后我们得到三个foos,每个foos包含三个bars。

FlatMap接收一个返回值为stream的函数做参数,通过传递合适的函数,就可以解析每一个foo下对应的bar对象

foos.stream() .flatMap(f -> f.bars.stream()) .forEach(b -> System.out.println(b.name));// Bar1 <- Foo1// Bar2 <- Foo1// Bar3 <- Foo1// Bar1 <- Foo2// Bar2 <- Foo2// Bar3 <- Foo2// Bar1 <- Foo3// Bar2 <- Foo3// Bar3 <- Foo3

正如所见,我们成功地将三个对象的stream转换成一个包含九个对象的stream

最后,上面的示例代码可以简化为一个单一管道流:

IntStream.range(1, 4) .mapToObj(i -> new Foo("Foo" + i)) .peek(f -> IntStream.range(1, 4) .mapToObj(i -> new Bar("Bar" + i + " <- " f.name)) .forEach(f.bars::add)) .flatMap(f -> f.bars.stream()) .forEach(b -> System.out.println(b.name));

FlatMap也支持JAVA8中新引入的Optional类,Optionals flatMap能返回一个另外的类的optional包装类,可以用来减少对null的检查。

假设有如下这种多层级结构:

class Outer { Nested nested;}class Nested { Inner inner;}class Inner { String foo;}

为了获取内部outer实例的内部foo对象,需要添加一系列空指针判断

Outer outer = new Outer();if (outer != null && outer.nested != null && outer.nested.inner != null) { System.out.println(outer.nested.inner.foo);}

可以采用optionals flatMap 操作获得相同的结果:

Optional.of(new Outer()) .flatMap(o -> Optional.ofNullable(o.nested)) .flatMap(n -> Optional.ofNullable(n.inner)) .flatMap(i -> Optional.ofNullable(i.foo)) .ifPresent(System.out::println);

上面的例子中flatMap的每次调用都会返回一个用Optional对象,如果有返回值则这个Optional对象是这个返回值的包装类,如果返回值不存在则返回null。

Reduce(减少)###

reduce操作可以将stream中所有元素组合起来得到一个元素,JAVA8支持三中不同的reduce方法。

第一种能从stream元素序列中提取一个特定的元素。比如下面的从用户列表中选择年纪最大的用户操作:

persons

.stream()

.reduce((p1, p2) -> p1.age > p2.age ? p1 : p2)

.ifPresent(System.out::println); // Pamela

上面的实例中reduce方法接收一个二元累加计算函数(BinaryOperator accumulator function)作为参数,二元操作(BinaryOperator)实际就是上在两个操作数共享同一类型。示例中函数比较两人年龄,返回的最大年龄的人。

第二种reduce操作接收一个标识值和一个二元操作累加器作为参数,这个reduce方法可以把stream中所有用户的名字和年龄汇总得到一个新用户。

Person result = persons .stream() .reduce(new Person("", 0), (p1, p2) -> { p1.age += p2.age; p1.name += p2.name; return p1; });System.out.format("name=%s; age=%s", result.name, result.age);// name=MaxPeterPamelaDavid; age=76

第三种reduce方法,接收三个参数:一个标示值(identity value),一个二元操作累加器(BiFunction accumulator),一个二元组合方法。由于标识符参数未被严格限制为person类型,因此我们可以用这个reduce方法来获取用户的总年龄。

Integer ageSum = persons .stream() .reduce(0, (sum, p) -> sum += p.age, (sum1, sum2) -> sum1 + sum2);System.out.println(ageSum); // 76

计算的结果是76,通过添加调试输出,我们可以详细地了解执行引擎中发生了什么。

Integer ageSum = persons .stream() .reduce(0, (sum, p) -> { System.out.format("accumulator: sum=%s; person=%s\n", sum, p); return sum += p.age; }, (sum1, sum2) -> { System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2); return sum1 + sum2; });// accumulator: sum=0; person=Max// accumulator: sum=18; person=Peter// accumulator: sum=41; person=Pamela// accumulator: sum=64; person=David

从调试输出中可以看到,累加器做了所有的工作,它首先获取值为0的标示值和第一个用户Max,接下来的三步中持续sum值由于累加不断变大,在最后一步汇总的年纪增长到76。

注意,上面的调试输出中combiner没有执行,通过parallel执行上面相同stream。

Integer ageSum = persons .parallelStream() .reduce(0, (sum, p) -> { System.out.format("accumulator: sum=%s; person=%s\n", sum, p); return sum += p.age; }, (sum1, sum2) -> { System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2); return sum1 + sum2; });// accumulator: sum=0; person=Pamela// accumulator: sum=0; person=David// accumulator: sum=0; person=Max// accumulator: sum=0; person=Peter// combiner: sum1=18; sum2=23// combiner: sum1=23; sum2=12// combiner: sum1=41; sum2=35

通过并行的方式执行上面的stream操作,得到的是另外一种完全不相同的执行动作。在并行stream中combiner方法会被调用。这是由于累加器是被并行调用的,因此组合器需要对分开的累加操作进行求和。

下一章会详细描述并行stream。

Parallel Streams(并行流)###

为了提高大量输入时的执行效率,stream可以采用并行的放行执行。并行流(Parallel Streams)通过ForkJoinPool.commonPool() 方法获取一个可用的ForkJoinPool。这个ForkJoinPool使用5个线程(实际上是由底层可用的物理cpu核数决定的)。

ForkJoinPool commonPool = ForkJoinPool.commonPool();System.out.println(commonPool.getParallelism()); // 3

On my machine the common pool is initialized with a parallelism of 3 per default. This value can be decreased or increased by setting the following JVM parameter:

在我的机器上公共池初始化为每个默认3并行,这个值可以通过调整jvm参数来修改:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=5

Collections中包含parallelStream()方法,通过这个方法能够为Collections中的元素创建并行流。另外也可以调用stream的parallel()方法将一个顺序流转变为一个并行流的拷贝。

为了了解并行流的执行动作,下面的例子会打印当前线程的执行信息。

Arrays.asList("a1", "a2", "b1", "c2", "c1") .parallelStream() .filter(s -> { System.out.format("filter: %s [%s]\n", s, Thread.currentThread().getName()); return true; }) .map(s -> { System.out.format("map: %s [%s]\n", s, Thread.currentThread().getName()); return s.toUpperCase(); }) .forEach(s -> System.out.format("forEach: %s [%s]\n", s, Thread.currentThread().getName()));

执行的结果如下:

filter: b1 [main]filter: a2 [ForkJoinPool.commonPool-worker-1]map: a2 [ForkJoinPool.commonPool-worker-1]filter: c2 [ForkJoinPool.commonPool-worker-3]map: c2 [ForkJoinPool.commonPool-worker-3]filter: c1 [ForkJoinPool.commonPool-worker-2]map: c1 [ForkJoinPool.commonPool-worker-2]forEach: C2 [ForkJoinPool.commonPool-worker-3]forEach: A2 [ForkJoinPool.commonPool-worker-1]map: b1 [main]forEach: B1 [main]filter: a1 [ForkJoinPool.commonPool-worker-3]map: a1 [ForkJoinPool.commonPool-worker-3]forEach: A1 [ForkJoinPool.commonPool-worker-3]forEach: C1 [ForkJoinPool.commonPool-worker-2]

通过分析调试输出,我们可以更好地了解哪一个线程执行了哪些stream操作。从上面的输出中我们可以看到parallel stream使用了ForkJoinPool提供的所有可用的线程来执行流的各种操作。由于不能确定哪个线程会执行并行流的哪个操作,因此反复执行上面的代码,打印的结果会不同。

扩充上面的例子,添加sort操作

Arrays.asList("a1", "a2", "b1", "c2", "c1") .parallelStream() .filter(s -> { System.out.format("filter: %s [%s]\n", s, Thread.currentThread().getName()); return true; }) .map(s -> { System.out.format("map: %s [%s]\n", s, Thread.currentThread().getName()); return s.toUpperCase(); }) .sorted((s1, s2) -> { System.out.format("sort: %s <> %s [%s]\n", s1, s2, Thread.currentThread().getName()); return s1.compareTo(s2); }) .forEach(s -> System.out.format("forEach: %s [%s]\n", s, Thread.currentThread().getName()));

执行结果如下:

filter: c2 [ForkJoinPool.commonPool-worker-3]

filter: c1 [ForkJoinPool.commonPool-worker-2]

map: c1 [ForkJoinPool.commonPool-worker-2]

filter: a2 [ForkJoinPool.commonPool-worker-1]

map: a2 [ForkJoinPool.commonPool-worker-1]

filter: b1 [main]

map: b1 [main]

filter: a1 [ForkJoinPool.commonPool-worker-2]

map: a1 [ForkJoinPool.commonPool-worker-2]

map: c2 [ForkJoinPool.commonPool-worker-3]

sort: A2 <> A1 [main]

sort: B1 <> A2 [main]

sort: C2 <> B1 [main]

sort: C1 <> C2 [main]

sort: C1 <> B1 [main]

sort: C1 <> C2 [main]

forEach: A1 [ForkJoinPool.commonPool-worker-1]

forEach: C2 [ForkJoinPool.commonPool-worker-3]

forEach: B1 [main]

forEach: A2 [ForkJoinPool.commonPool-worker-2]

forEach: C1 [ForkJoinPool.commonPool-worker-1]

这个执行结果看起来比较奇怪,看起来sort操作只是在main线程中顺序执行的。实际上,parallel stream中的sort操作使用了JAVA 8的一个新方法:Arrays.parallelSort()。JAVA doc中是这样描述Arrays.parallelSort()的:待排序数组的长度决定了排序操作是顺序执行还是并行执行。java doc 描述如下:

If the length of the specified array is less than the minimum granularity, then it is sorted using the appropriate Arrays.sort method.

回到上一章的例子,我们已经了解combiner方法只能在parallel streams中调用,让我们来看下那些线程被实际调用:

List<Person> persons = Arrays.asList( new Person("Max", 18), new Person("Peter", 23), new Person("Pamela", 23), new Person("David", 12));persons .parallelStream() .reduce(0, (sum, p) -> { System.out.format("accumulator: sum=%s; person=%s [%s]\n", sum, p, Thread.currentThread().getName()); return sum += p.age; }, (sum1, sum2) -> { System.out.format("combiner: sum1=%s; sum2=%s [%s]\n", sum1, sum2, Thread.currentThread().getName()); return sum1 + sum2; });

执行结果如下:

accumulator: sum=0; person=Pamela; [main]accumulator: sum=0; person=Max; [ForkJoinPool.commonPool-worker-3]accumulator: sum=0; person=David; [ForkJoinPool.commonPool-worker-2]accumulator: sum=0; person=Peter; [ForkJoinPool.commonPool-worker-1]combiner: sum1=18; sum2=23; [ForkJoinPool.commonPool-worker-1]combiner: sum1=23; sum2=12; [ForkJoinPool.commonPool-worker-2]combiner: sum1=41; sum2=35; [ForkJoinPool.commonPool-worker-2]

从控制台输出可以看到accumulator和combiner操作都被可用的线程并行执行了。

总结起来:在大数据量输入的时候,parallel streams可以带来比较大的性能提升。但是应该记住,一些并行操作,比如:reduce,collect需要额外的计算(组合操作),但是在顺序流中,这些组合操作是不需要的。

另外,我们知道所有的parallel stream操作共享一个jvm范围内的ForkJoinPool,所以你应该注意避免在parallel stream上执行慢阻塞流操作,因为这些操作可能导致你应用中依赖parallel streams操作的其他部分也会响应变慢。

结尾###

如果你想更多了解JAVA 8 的stream,你可以阅读stream的JAVA doc,如果你想更深入了解stream的底层机制,你可以阅读Martin Fowlers的文章Collection Pipelines。

如果你对js也感兴趣,你可以查看Stream.js(一个用js实现的java 8 stream api),你也可以查看我写的java8教程。

标签: #javatutorial