龙空技术网

Java8 Lambda实现源码解析

阿里开发者 1203

前言:

当前你们对“java的源码”大概比较关怀,我们都想要知道一些“java的源码”的相关文章。那么小编也在网上搜集了一些对于“java的源码””的相关内容,希望同学们能喜欢,朋友们快快来了解一下吧!

Java8的lambda应该大家都比较熟悉了,这篇文章主要从源码层面探讨一下lambda的设计和实现

先看下面的示例代码:

 static class A {        @Getter        private String a;        @Getter        private Integer b;        public A(String a, Integer b) {            this.a = a;            this.b = b;        }    }    public static void main(String[] args) {        List<Integer> ret = Lists.newArrayList(new A("a", 1), new A("b", 2), new A("c", 3)).stream()            .map(A::getB)            .filter(b -> b >= 2)            .collect(Collectors.toList());        System.out.println(ret);    }

上面代码中,其实主要就是几步:

ArrayList.stream.map.filter.collect

一步步来看,ArrayList.stream 实际调用的是Collector.stream方法:

 default Stream<E> stream() {        return StreamSupport.stream(spliterator(), false);    }

spliterator()方法生成的是 IteratorSpliterator 对象,spliterator的意思就是可以split的iterator,这个主要是用于lambda中的parallelStream中的并行操作,上面的例子中由于调用的是stream,所以parallel=false。

StreamSupport.stream最后生成的是一个ReferencePipeline.Head对象:

 public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {        Objects.requireNonNull(spliterator);        return new ReferencePipeline.Head<>(spliterator,                                            StreamOpFlag.fromCharacteristics(spliterator),                                            parallel);    }

Head类是从ReferencePipeline派生的,表示lambda的pipeline中的头节点。

有了这个Head对象之后,在它之上调用.map,实际上就是调用了基类ReferencePipeline.map方法:

   public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {        Objects.requireNonNull(mapper);        return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {            @Override            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {                return new Sink.ChainedReference<P_OUT, R>(sink) {                    @Override                    public void accept(P_OUT u) {                        downstream.accept(mapper.apply(u));                    }                };            }        };    }

返回的是一个StatelessOp,表示一个无状态的算子,这个类也是ReferencePipeline的子类,可以看到它的构造函数,第一个参数this,表示把Head对象作为StatelessOp对象的upstream,也就是它的上游。StatelessOp.opWrapSink方法先不讲,后面会讲到。

接着调用StatelessOp.filter方法,也还是会回到ReferencePipeline.filter方法:

public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {        Objects.requireNonNull(predicate);        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,                                     StreamOpFlag.NOT_SIZED) {            @Override            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {                    @Override                    public void begin(long size) {                        downstream.begin(-1);                    }                    @Override                    public void accept(P_OUT u) {                        if (predicate.test(u))                            downstream.accept(u);                    }                };            }        };    }

可以看到,仍然生成的是一个StatelessOp对象,只是它的upstream变了而已。

最后调用StatelessOp.collect,继续回到ReferencePipeline.collect方法:

  public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {        A container;        if (isParallel()                && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))                && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {            container = collector.supplier().get();            BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();            forEach(u -> accumulator.accept(container, u));        }        else {            container = evaluate(ReduceOps.makeRef(collector));        }        return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)               ? (R) container               : collector.finisher().apply(container);    }

在前面几步,.map, .filter方法其实都只是创建StatelessOp对象,但是到collect就不一样了,了解spark/flink的就知道,collect其实是个action/sink,调用了collect,就会真实地触发这个stream上各个operator的执行。这也就是我们经常听到的lazy execution,所有的操作,只有碰到action的算子才会开始执行。

之前讲到这个stream的parallel=false,所以上面的实际执行逻辑是:

A container = evaluate(ReduceOps.makeRef(collector));return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)               ? (R) container               : collector.finisher().apply(container);    }

在进入evaluate方法之前,先看一下ReduceOps.makeRef(collector),它实际上就是基于Collectors.toList生成的CollectorImpl实例包装了一层,返回了一个 TerminalOp对象(实际是ReduceOp)。

    public static <T, I> TerminalOp<T, I>    makeRef(Collector<? super T, I, ?> collector) {        Supplier<I> supplier = Objects.requireNonNull(collector).supplier();        BiConsumer<I, ? super T> accumulator = collector.accumulator();        BinaryOperator<I> combiner = collector.combiner();        class ReducingSink extends Box<I>                implements AccumulatingSink<T, I, ReducingSink> {            @Override            public void begin(long size) {                state = supplier.get();            }            @Override            public void accept(T t) {                accumulator.accept(state, t);            }            @Override            public void combine(ReducingSink other) {                state = combiner.apply(state, other.state);            }        }        return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {            @Override            public ReducingSink makeSink() {                return new ReducingSink();            }            @Override            public int getOpFlags() {                return collector.characteristics().contains(Collector.Characteristics.UNORDERED)                       ? StreamOpFlag.NOT_ORDERED                       : 0;            }        };    }

上面代码可以看到,基本也就是直接调用了collector的实现,稍微需要注意的是,ReducingSink从Box派生,Box的意思就是盒子,它里面有个state成员,表示一个计算的状态。ReducingSink就是通过这个state,进行combine, accumulate操作(实际就是一个List)。

回到evaluate方法,它实际调用了:

terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));

这里this就是最后阶段的ReferencePipeline,即StatelessOp,这里我们称它为 ReferencePipeline$2,即经过两个算子操作的pipeline。

sourceSpliterator 则会取到sourceStage的spliterator,即最上面Head的spliterator。

ReduceOp.evaluateSequential:

   public <P_IN> R evaluateSequential(PipelineHelper<T> helper,                                           Spliterator<P_IN> spliterator) {            return helper.wrapAndCopyInto(makeSink(), spliterator).get();        }

helper即ReferencePipeline$2,这里makeSink即上面返回的ReducingSink重载的方法。

ReferencePipeline.wrapAndCopyInto,在其父类AbstractPipeline中实现:

        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);        return sink;

wrapSink代码:

    final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {        Objects.requireNonNull(sink);        for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {            sink = p.opWrapSink(p.previousStage.combinedFlags, sink);        }        return (Sink<P_IN>) sink;    }

可以看到,这里就是将pipeline从后至前,分别调用每个pipeline的opWrapSink方法,就是一个责任链的模式。opWrapSink可以看上面map的opWrapSink的filter的opWrapSink实现,map的很简单,直接调用mapper.apply,实际上就是A::getB方法,filter的也很简单,调用的是 predicate.test 方法。

接下来到copyInto方法,到这里才会有真正的执行逻辑:

 final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {        Objects.requireNonNull(wrappedSink);        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {            wrappedSink.begin(spliterator.getExactSizeIfKnown());            spliterator.forEachRemaining(wrappedSink);            wrappedSink.end();        }        else {            copyIntoWithCancel(wrappedSink, spliterator);        }    }

它会走入到这部分的逻辑中:

wrappedSink.begin(spliterator.getExactSizeIfKnown());spliterator.forEachRemaining(wrappedSink);wrappedSink.end();

这里面最重要的是就是中间这行了,由于spliterator持有的Collection引用,是ArrayList,因此它会调用ArrayList.forEachRemaining方法:

public void forEachRemaining(Consumer<? super E> action) {    // ...    if ((i = index) >= 0 && (index = hi) <= a.length) {       for (; i < hi; ++i) {           @SuppressWarnings("unchecked") E e = (E) a[i];           action.accept(e);       }       if (lst.modCount == mc)           return;   }    // ...

这里的action参数,就是上面经过责任链封装的Sink(它也是Consumer的子类)。

而这里调用action.accept,就会通过责任链来一层层调用每个算子的accept,我们从map的accept开始:

@OverrideSink<P_OUT> opWrapSink(int flags, Sink<R> sink) {    return new Sink.ChainedReference<P_OUT, R>(sink) {        @Override        public void accept(P_OUT u) {            downstream.accept(mapper.apply(u));        }    };}

可以看到,它先调用mapper.apply,然后把结果直接传给downstream.accept,也就是调用filter的accept,接着来到ReducingSink.accept,也就是往state中添加一个结果元素,这样forEach执行完之后,结果自然就有了。

看完上面的流程,接下来看一下lambda里面部分类设计,首先来看一下Stream,它的基类是BaseStream,提供以下接口:

public interface BaseStream<T, S extends BaseStream<T, S>>        extends AutoCloseable {    /**     * 返回stream中元素的迭代器        */    Iterator<T> iterator();    /**     * 返回stream中元素的spliterator,用于并行执行     */    Spliterator<T> spliterator();    /**     * 是否并行     */    boolean isParallel();    /**     * 返回串行的stream,即强制parallel=false     */    S sequential();    /**     * 返回并行的stream,即强制parallel=true     */    S parallel();    // ...}

直接继承此接口的,是如IntStream, LongStream,DoubleStream等,这些是在BaseStream基础上,提供了filter, map, mapToObj, distinct等算子的接口,但是这些算子,是限定类型的,如IntStream.filter, 它接受的就是 IntPredicate, 而不是常规的Predicate;map方法也是,接受的是 IntUnaryOperator。

IntStream, LongStream这些都是接口,也就是仅仅用来描述算子的。它们的实现都是基于Pipeline的,基类为 AbstractPipeline,它的几个关键成员变量:

     /**      * 最顶上的pipeline,即Head      */    private final AbstractPipeline sourceStage;    /**     * 直接上游pipeline     */    private final AbstractPipeline previousStage;    /**     * 直接下游pipeline     */    @SuppressWarnings("rawtypes")    private AbstractPipeline nextStage;    /**     * pipeline深度     */    private int depth;        /**     * head的spliterator     */    private Spliterator<?> sourceSpliterator;     // ...

这个基类还提供了pipeline的基础实现,以及对BaseStream和PipelineHelper接口的实现,如evaluate, sourceStageSpliterator, wrapAndCopyInto, wrapSink等。

类似地,从AbstractPipeline派生的子类有:IntPipeline, LongPipeline, DoublePipeline, ReferencePipeline等。前面三种比较容易理解,提供的是基于原始类型的lambda操作(且都实现了对应的XXStream接口),而ReferencePipeline提供的是基于对象的lambda操作。

类层次如下:

注意这些子类,也都是abstract的,每一种pipeline下面,都有Head, StatelessOp, StatefulOp三个子类。分别用于描述pipeline的头节点,无状态中间算子,有状态中间算子。

点击查看原文,获取更多福利!

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

标签: #java的源码