大家对Java8应该不陌生。 本文主要从源码层面讨论Java8的设计与实现。
看看下面的示例代码:
static class A {
@Getter
private String a;
@Getter
private Integer b;
public A(String a, Integer b) {
this.a = a;
this.b = b;
}
}
public static void main(String[] args) {
List ret = Lists.newArrayList(new A("a", 1), new A("b", 2), new A("c", 3)).stream()
.map(A::getB)
.filter(b -> b >= 2)
.collect(Collectors.toList());
System.out.println(ret);
}
在上面的代码中,实际上有几个步骤:
..地图..
一步一步来,真正调用的是方法:
default Stream stream() {
return StreamSupport.stream(spliterator(), false);
}
() 方法生成一个对象,这意味着它可以拆分。 这个主要用于中间的并行操作。 在上面的例子中,因为它被调用了,=false。
.最终结果是一个 .Head 对象:
public static Stream stream(Spliterator spliterator, boolean parallel) {
Objects.requireNonNull(spliterator);
return new ReferencePipeline.Head(spliterator,
StreamOpFlag.fromCharacteristics(spliterator),
parallel);
}
Head 类派生自并代表 .
有了这个Head对象之后,对其调用.map其实就是调用基类的.map方法:
public final Stream map(Function mapper) {
Objects.requireNonNull(mapper);
return new StatelessOp(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override
Sink
opWrapSink(int flags, Sink sink) {
return new Sink.ChainedReference
(sink) {
@Override
public void accept(P_OUT u) {
downstream.accept(mapper.apply(u));
}
};
}
};
}
返回的是一个,表示无状态的操作符。 这个类也是 的子类。 你可以看到它的构造函数。 第一个参数this表示Head对象作为一个对象,即它的上游。 . 方法先不说,后面再说。
然后调用 . 方法,它仍然会返回到 . 方法:
public final Stream filter(Predicate predicate) {
Objects.requireNonNull(predicate);
return new StatelessOp
(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
@Override
Sink
opWrapSink(int flags, Sink
sink) {
return new Sink.ChainedReference
(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
if (predicate.test(u))
downstream.accept(u);
}
};
}
};
}
可以看到还是生成了一个对象,只是名字变了。
最后,打电话。 并继续回到 . 方法:
public final R collect(Collector collector) {
A container;
if (isParallel()
&& (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
&& (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
container = collector.supplier().get();
BiConsumer accumulator = collector.accumulator();
forEach(u -> accumulator.accept(container, u));
}
else {
container = evaluate(ReduceOps.makeRef(collector));
}
return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
? (R) container
: collector.finisher().apply(container);
}
在前面的步骤中,.map 和. 实际上只是创建对象,但它们是不同的。 懂spark/flink的就知道,它其实就是一个/sink。 调用时,它实际上会触发其中每一个的执行。 这就是我们经常听到的惰性,所有的操作只有遇到才会执行。
前面提到了=false,所以上面的实际执行逻辑是:
A container = evaluate(ReduceOps.makeRef(collector));
return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
? (R) container
: collector.finisher().apply(container);
}
在进入方法之前,我们先看一下.(),它实际上是根据生成的实例包裹了一层,并返回了一个对象(实际上)。
public static TerminalOp
makeRef(Collector collector) {
Supplier supplier = Objects.requireNonNull(collector).supplier();
BiConsumer accumulator = collector.accumulator();
BinaryOperator combiner = collector.combiner();
class ReducingSink extends Box
implements AccumulatingSink {
@Override
public void begin(long size) {
state = supplier.get();
}
@Override
public void accept(T t) {
accumulator.accept(state, t);
}
@Override
public void combine(ReducingSink other) {
state = combiner.apply(state, other.state);
}
}
return new ReduceOp(StreamShape.REFERENCE) {
@Override
public ReducingSink makeSink() {
return new ReducingSink();
}
@Override
public int getOpFlags() {
return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
? StreamOpFlag.NOT_ORDERED
: 0;
}
};
}
从上面的代码可以看出,基本都是直接调用的实现。 需要注意一点,它是从 Box 派生的。 Box是盒子的意思,里面有一个state成员,表示一个计算的状态。 正是通过这个状态来执行操作(实际上是一个List)。
回到方法,它实际调用:
terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
这里这是最后一个阶段源码,也就是这里我们称它为$2,也就是已经被两个算子操作过了。
将取之,即顶头。
.:
public R evaluateSequential(PipelineHelper helper,
Spliterator
spliterator) {
return helper.wrapAndCopyInto(makeSink(), spliterator).get();
}
即$2,这里是上面返回的重载方法。
., 在其父类中实现:
copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
return sink;
代码:
final Sink
wrapSink(Sink sink) {
Objects.requireNonNull(sink);
for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
return (Sink
) sink;
}
可以看出,这里是从后台到前台分别调用各个方法,是一种责任链模型。 您可以在上面看到地图的实现。 地图非常简单。 它直接调用.apply,其实就是A::getB方法。 这也很简单。 它调用 .test 方法。
在方法旁边,这里将是真正的执行逻辑:
final void copyInto(Sink
wrappedSink, Spliterator
spliterator) {
Objects.requireNonNull(wrappedSink);
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
wrappedSink.begin(spliterator.getExactSizeIfKnown());
spliterator.forEachRemaining(wrappedSink);
wrappedSink.end();
}
else {
copyIntoWithCancel(wrappedSink, spliterator);
}
}
会走进这部分逻辑:
wrappedSink.begin(spliterator.getExactSizeIfKnown());
spliterator.forEachRemaining(wrappedSink);
wrappedSink.end();
这里最重要的是中间线。 由于持有引用,它将调用 . 方法:
public void forEachRemaining(Consumer action) {
// ...
if ((i = index) >= 0 && (index = hi) <= a.length) {
for (; i < hi; ++i) {
@SuppressWarnings("unchecked") E e = (E) a[i];
action.accept(e);
}
if (lst.modCount == mc)
return;
}
// ...
这里的参数就是上面责任链封装的Sink(也是 的子类)。
并打电话。 这里会通过责任链层层调用各个算子。 我们从地图开始:
@Override
Sink opWrapSink(int flags, Sink sink) {
return new Sink.ChainedReference
(sink) {
@Override
public void accept(P_OUT u) {
downstream.accept(mapper.apply(u));
}
};
}
可以看到,它先调用了.apply,然后直接把传给了.,被调用了,然后来到了.,就是给state增加了一个元素,这样执行完之后,会自然就在那里。
看完上面的流程,我们再来看看内部的分类设计。 首先我们看一下它的基类,它提供了如下接口:
public interface BaseStream<T, S extends BaseStream>
extends AutoCloseable {
/**
* 返回stream中元素的迭代器
*/
Iterator iterator();
/**
* 返回stream中元素的spliterator,用于并行执行
*/
Spliterator spliterator();
/**
* 是否并行
*/
boolean isParallel();
/**
* 返回串行的stream,即强制parallel=false
*/
S sequential();
/**
* 返回并行的stream,即强制parallel=true
*/
S parallel();
// ...
}
直接继承这个接口的有 , 等。这些都是基于提供 , map , 等操作符的接口,但是这些操作符的类型有限,比如 ., 它接受的是 of常规的; 地图方法也被接受。
,这些是接口源码,仅用于描述运算符。 它们的实现都是基于基类,它的几个关键成员变量:
/**
* 最顶上的pipeline,即Head
*/
private final AbstractPipeline sourceStage;
/**
* 直接上游pipeline
*/
private final AbstractPipeline previousStage;
/**
* 直接下游pipeline
*/
@SuppressWarnings("rawtypes")
private AbstractPipeline nextStage;
/**
* pipeline深度
*/
private int depth;
/**
* head的spliterator
*/
private Spliterator sourceSpliterator;
// ...
该基类还提供了 的基本实现,以及接口的实现,如 , or , 等。
同样,子类派生自: , , 等。前三个比较容易理解。 它们提供基于原始类型的操作(并且都实现了相应的接口),同时它们提供了基于对象的操作。
类层次结构如下:
请注意,这些子类别也是唯一的。 在它们每个之下,有 Head、 和 三个子类别。 它们分别用于描述头节点、无状态中间算子和有状态中间算子。
暂无评论内容