的设计和实现(一)的源码


大家对Java8应该不陌生。 本文主要从源码层面讨论Java8的设计与实现。

看看下面的示例代码:

 static class A {
        @Getter
        private String a;
        @Getter
        private Integer b;
        public A(String a, Integer b) {
            this.a = a;
            this.b = b;
        }
    }
    public static void main(String[] args) {
        List ret = Lists.newArrayList(new A("a", 1), new A("b", 2), new A("c", 3)).stream()
            .map(A::getB)
            .filter(b -> b >= 2)
            .collect(Collectors.toList());
        System.out.println(ret);
    }

在上面的代码中,实际上有几个步骤:

..地图..

一步一步来,真正调用的是方法:

 default Stream stream() {
        return StreamSupport.stream(spliterator(), false);
    }

() 方法生成一个对象,这意味着它可以拆分。 这个主要用于中间的并行操作。 在上面的例子中,因为它被调用了,=false。

.最终结果是一个 .Head 对象:

 public static  Stream stream(Spliterator spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);
        return new ReferencePipeline.Head(spliterator,
                                            StreamOpFlag.fromCharacteristics(spliterator),
                                            parallel);
    }

Head 类派生自并代表 .

有了这个Head对象之后,对其调用.map其实就是调用基类的.map方法:

   public final  Stream map(Function mapper) {
        Objects.requireNonNull(mapper);
        return new StatelessOp

(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { @Override Sink

opWrapSink(int flags, Sink sink) { return new Sink.ChainedReference

(sink) {

的设计和实现(一)的源码

@Override public void accept(P_OUT u) { downstream.accept(mapper.apply(u)); } }; } }; }

返回的是一个,表示无状态的操作符。 这个类也是 的子类。 你可以看到它的构造函数。 第一个参数this表示Head对象作为一个对象,即它的上游。 . 方法先不说,后面再说。

然后调用 . 方法,它仍然会返回到 . 方法:

public final Stream

filter(Predicate predicate) { Objects.requireNonNull(predicate); return new StatelessOp

(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED) { @Override Sink

opWrapSink(int flags, Sink

sink) { return new Sink.ChainedReference

(sink) { @Override public void begin(long size) { downstream.begin(-1); } @Override public void accept(P_OUT u) { if (predicate.test(u)) downstream.accept(u); } }; } }; }

可以看到还是生成了一个对象,只是名字变了。

最后,打电话。 并继续回到 . 方法:

  public final  R collect(Collector collector) {
        A container;
        if (isParallel()
                && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
                && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
            container = collector.supplier().get();
            BiConsumer accumulator = collector.accumulator();
            forEach(u -> accumulator.accept(container, u));
        }
        else {
            container = evaluate(ReduceOps.makeRef(collector));

的设计和实现(一)的源码

} return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH) ? (R) container : collector.finisher().apply(container); }

在前面的步骤中,.map 和. 实际上只是创建对象,但它们是不同的。 懂spark/flink的就知道,它其实就是一个/sink。 调用时,它实际上会触发其中每一个的执行。 这就是我们经常听到的惰性,所有的操作只有遇到才会执行。

前面提到了=false,所以上面的实际执行逻辑是:

A container = evaluate(ReduceOps.makeRef(collector));
return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
               ? (R) container
               : collector.finisher().apply(container);
    }

在进入方法之前,我们先看一下.(),它实际上是根据生成的实例包裹了一层,并返回了一个对象(实际上)。

    public static  TerminalOp
    makeRef(Collector collector) {
        Supplier supplier = Objects.requireNonNull(collector).supplier();
        BiConsumer accumulator = collector.accumulator();
        BinaryOperator combiner = collector.combiner();
        class ReducingSink extends Box
                implements AccumulatingSink {
            @Override
            public void begin(long size) {
                state = supplier.get();
            }
            @Override
            public void accept(T t) {
                accumulator.accept(state, t);
            }
            @Override
            public void combine(ReducingSink other) {
                state = combiner.apply(state, other.state);
            }
        }
        return new ReduceOp(StreamShape.REFERENCE) {
            @Override
            public ReducingSink makeSink() {
                return new ReducingSink();
            }
            @Override
            public int getOpFlags() {
                return collector.characteristics().contains(Collector.Characteristics.UNORDERED)

? StreamOpFlag.NOT_ORDERED : 0; } }; }

从上面的代码可以看出,基本都是直接调用的实现。 需要注意一点,它是从 Box 派生的。 Box是盒子的意思,里面有一个state成员,表示一个计算的状态。 正是通过这个状态来执行操作(实际上是一个List)。

回到方法,它实际调用:

terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));

这里这是最后一个阶段源码,也就是这里我们称它为$2,也就是已经被两个算子操作过了。

将取之,即顶头。

.:

   public 

R evaluateSequential(PipelineHelper helper, Spliterator

spliterator) { return helper.wrapAndCopyInto(makeSink(), spliterator).get(); }

即$2,这里是上面返回的重载方法。

., 在其父类中实现:

        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
        return sink;

代码:

    final 

Sink

wrapSink(Sink sink) { Objects.requireNonNull(sink); for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) { sink = p.opWrapSink(p.previousStage.combinedFlags, sink); } return (Sink

) sink; }

可以看出,这里是从后台到前台分别调用各个方法,是一种责任链模型。 您可以在上面看到地图的实现。 地图非常简单。 它直接调用.apply,其实就是A::getB方法。 这也很简单。 它调用 .test 方法。

在方法旁边,这里将是真正的执行逻辑:

 final 

void copyInto(Sink

wrappedSink, Spliterator

spliterator) { Objects.requireNonNull(wrappedSink); if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) { wrappedSink.begin(spliterator.getExactSizeIfKnown()); spliterator.forEachRemaining(wrappedSink); wrappedSink.end(); } else { copyIntoWithCancel(wrappedSink, spliterator); } }

会走进这部分逻辑:

wrappedSink.begin(spliterator.getExactSizeIfKnown());

的设计和实现(一)的源码

spliterator.forEachRemaining(wrappedSink); wrappedSink.end();

这里最重要的是中间线。 由于持有引用,它将调用 . 方法:

public void forEachRemaining(Consumer action) {
    // ...
    if ((i = index) >= 0 && (index = hi) <= a.length) {
       for (; i < hi; ++i) {
           @SuppressWarnings("unchecked") E e = (E) a[i];
           action.accept(e);
       }
       if (lst.modCount == mc)
           return;
   }
    // ...

这里的参数就是上面责任链封装的Sink(也是 的子类)。

并打电话。 这里会通过责任链层层调用各个算子。 我们从地图开始:

@Override
Sink

opWrapSink(int flags, Sink sink) { return new Sink.ChainedReference

(sink) { @Override public void accept(P_OUT u) { downstream.accept(mapper.apply(u)); } }; }

可以看到,它先调用了.apply,然后直接把传给了.,被调用了,然后来到了.,就是给state增加了一个元素,这样执行完之后,会自然就在那里。

看完上面的流程,我们再来看看内部的分类设计。 首先我们看一下它的基类,它提供了如下接口:

public interface BaseStream<T, S extends BaseStream>
        extends AutoCloseable {
    /**
     * 返回stream中元素的迭代器
        */
    Iterator iterator();
    /**
     * 返回stream中元素的spliterator,用于并行执行
     */
    Spliterator spliterator();
    /**
     * 是否并行
     */
    boolean isParallel();

/** * 返回串行的stream,即强制parallel=false */ S sequential(); /** * 返回并行的stream,即强制parallel=true */ S parallel(); // ... }

直接继承这个接口的有 , 等。这些都是基于提供 , map , 等操作符的接口,但是这些操作符的类型有限,比如 ., 它接受的是 of常规的; 地图方法也被接受。

,这些是接口源码,仅用于描述运算符。 它们的实现都是基于基类,它的几个关键成员变量:

     /**
      * 最顶上的pipeline,即Head
      */
    private final AbstractPipeline sourceStage;
    /**
     * 直接上游pipeline
     */
    private final AbstractPipeline previousStage;
    /**
     * 直接下游pipeline
     */
    @SuppressWarnings("rawtypes")
    private AbstractPipeline nextStage;
    /**
     * pipeline深度
     */
    private int depth;
    
    /**
     * head的spliterator
     */
    private Spliterator sourceSpliterator;
     // ...

该基类还提供了 的基本实现,以及接口的实现,如 , or , 等。

同样,子类派生自: , , 等。前三个比较容易理解。 它们提供基于原始类型的操作(并且都实现了相应的接口),同时它们提供了基于对象的操作。

类层次结构如下:

的设计和实现(一)的源码

请注意,这些子类别也是唯一的。 在它们每个之下,有 Head、 和 三个子类别。 它们分别用于描述头节点、无状态中间算子和有状态中间算子。


© 版权声明
THE END
喜欢就支持一下吧
点赞0赞赏 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容