用 Java 实现 Stream 高效混排与 Spliterator
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了用 Java 实现 Stream 高效混排与 Spliterator,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含13614字,纯文字阅读大概需要20分钟。
内容图文
对 Stream 执行排序操作只要调用排序 API 就好了,要实现相反的效果(混排)却并不简单。
本文介绍了如何使用 Java Stream `Collectors` 工厂方法与自定义 `Spliterator` 对 Stream 进行 Shuffle(混排),支持 Eager 与 Lazy 两种模式。
1. Eager Shuffle Collector
Heinz [在这篇文章][1]中给出了一种解决方案:将整个 Stream 转换为 list,对 list 执行 `Collections#shuffle`,再转为 Stream。像下面这样封装成一个复合操作:
[1]:https://www.javaspecialists.eu/archive/Issue258.html
```java
public static <T> Collector<T, ?, Stream<T>> toEagerShuffledStream() {
? ?return Collectors.collectingAndThen(
? ? ?toList(),
? ? ?list -> {
? ? ? ? ?Collections.shuffle(list);
? ? ? ? ?return list.stream();
? ? ?});
}
```
这种方法适用于对 Steam 中所有元素进行混排。由于会提前对集合中所有元素进行 Shuffle,如果只处理其中一部分则效果不佳,极端情况比如 Stream 只包含1个元素。
让我们来看看一个简单基准测试的运行结果:
```java
@State(Scope.Benchmark)
public class RandomSpliteratorBenchmark {
? ?private List<String> source;
? ?@Param({"1", "10", "100", "1000", "10000", "10000"})
? ?public int limit;
? ?@Param({"100000"})
? ?public int size;
? ?@Setup(Level.Iteration)
? ?public void setUp() {
? ? ? ?source = IntStream.range(0, size)
? ? ? ? ?.boxed()
? ? ? ? ?.map(Object::toString)
? ? ? ? ?.collect(Collectors.toList());
? ?}
? ?@Benchmark
? ?public List<String> eager() {
? ? ? ?return source.stream()
? ? ? ? ?.collect(toEagerShuffledStream())
? ? ? ? ?.limit(limit)
? ? ? ? ?.collect(Collectors.toList());
? ?}
```
```shell
? ? ? ? ? ?(limit) ? Mode ?Cnt ? ? Score ? ? Error ?Units
eager ? ? ? ? ? ? 1 ?thrpt ? ?5 ? 467.796 ± ? 9.074 ?ops/s
eager ? ? ? ? ? ?10 ?thrpt ? ?5 ? 467.694 ± ?17.166 ?ops/s
eager ? ? ? ? ? 100 ?thrpt ? ?5 ? 459.765 ± ? 8.048 ?ops/s
eager ? ? ? ? ?1000 ?thrpt ? ?5 ? 467.934 ± ?43.095 ?ops/s
eager ? ? ? ? 10000 ?thrpt ? ?5 ? 449.471 ± ? 5.549 ?ops/s
eager ? ? ? ?100000 ?thrpt ? ?5 ? 331.111 ± ? 5.626 ?ops/s
```
从上面的数据可以看出,尽管运行结果 Stream 中元素不断增加,运行效果还是相当不错。因此,对整个集合提前混排太浪费了,尤其是元素较少的时候得分很差。
让我们看看来有什么好办法。
2. Lazy Shuffle Collector
为了节省 CPU 资源,与其对集合中所有元素预处理,不如根据需要只处理其中一部分。
为了达到这个效果,需要自定义一个 Spliterator 对所有对元素随机遍历,然后通过 `StreamSupport.stream` 构造创建一个 Stream 对象:
```java
public class RandomSpliterator<T> implements Spliterator<T> {
? ?// ...
? ?public static <T> Collector<T, ?, Stream<T>> toLazyShuffledStream() {
? ? ? ?return Collectors.collectingAndThen(
? ? ? ? ?toList(),
? ? ? ? ?list -> StreamSupport.stream(
? ? ? ? ? ?new ShuffledSpliterator<>(list), false));
? ?}
}
```
3. 实现细节
即使只取出一个随机元素,也不能避免计算整个 Steam 中的元素(这意味着不支持无限序列)。因此,可以用 `List<T>` 初始化 `RandomSpliterator<T>`。“注意,这里有一个陷阱”。
如果给定 `List` 不支持在常量时间内完成随机访问,这种方案要比 Eager 方案慢得多。为了避免这种情况,可以在实例化 `Spliterator` 的时候进行简单检查:
```java
private RandomSpliterator(
?List<T> source, Supplier<? extends Random> random) {
? ?if (source.isEmpty()) { ... } // throw
? ?this.source = source instanceof RandomAccess
? ? ?? source
? ? ?: new ArrayList<>(source);
? ?this.random = random.get();
}
```
相比随机访问时间复杂度不是 O(1) 的实现,创建 `ArrayList` 的成本可以忽略不计。
现在重写最重要的 `tryAdvance()` 方法。实现很简单,每次迭代都从 `source` 集合中随机挑选并删除一个元素。
不必担心 `source` 发生改变。这里不发布 `RandomSpliterator`,只返回基于它的一个 `Collector`:
```java
@Override
public boolean tryAdvance(Consumer<? super T> action) {
? ?int remaining = source.size();
? ?if (remaining > 0 ) {
? ? ? ?action.accept(source.remove(random.nextInt(remaining)));
? ? ? ?return true;
? ?} else {
? ? ? ?return false;
? ?}
}
```
除此之外,还需要实现其它3个方法:
```java
@Override
public Spliterator<T> trySplit() {
? ?return null; // 表示 split 可不行
}
@Override
public long estimateSize() {
? ?return source.size();
}
@Override
public int characteristics() {
? ?return SIZED;
}
```
现在检查一下是否有效果:
```java
IntStream.range(0, 10).boxed()
?.collect(toLazyShuffledStream())
?.forEach(System.out::println);
```
结果如下:
```shell
3
4
8
1
7
6
5
0
2
9
```
4. 性能考虑
在这个实现中,我们把大小为 N 的数组换成 M 查找或删除:
N:集合大小
M:挑选元素的数量
从 `ArrayList` 中查找或删除单个元素通常比交换开销大,因此方案的可扩展性不够好。但是对于 M 值较小的时候性能会好很多。
现在对比 Eager 方案(都包含100000个对象):
```shell
? ? ? ? ? ?(limit) ? Mode ?Cnt ? ? Score ? ? Error ?Units
eager ? ? ? ? ? ? 1 ?thrpt ? ?5 ? 467.796 ± ? 9.074 ?ops/s
eager ? ? ? ? ? ?10 ?thrpt ? ?5 ? 467.694 ± ?17.166 ?ops/s
eager ? ? ? ? ? 100 ?thrpt ? ?5 ? 459.765 ± ? 8.048 ?ops/s
eager ? ? ? ? ?1000 ?thrpt ? ?5 ? 467.934 ± ?43.095 ?ops/s
eager ? ? ? ? 10000 ?thrpt ? ?5 ? 449.471 ± ? 5.549 ?ops/s
eager ? ? ? ?100000 ?thrpt ? ?5 ? 331.111 ± ? 5.626 ?ops/s
lazy ? ? ? ? ? ? ?1 ?thrpt ? ?5 ?1530.763 ± ?72.096 ?ops/s
lazy ? ? ? ? ? ? 10 ?thrpt ? ?5 ?1462.305 ± ?23.860 ?ops/s
lazy ? ? ? ? ? ?100 ?thrpt ? ?5 ? 823.212 ± 119.771 ?ops/s
lazy ? ? ? ? ? 1000 ?thrpt ? ?5 ? 166.786 ± ?16.306 ?ops/s
lazy ? ? ? ? ?10000 ?thrpt ? ?5 ? ?19.475 ± ? 4.052 ?ops/s
lazy ? ? ? ? 100000 ?thrpt ? ?5 ? ? 4.097 ± ? 0.416 ?ops/s
```
(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/chart.png)
可以明显看到,如果数据流元素较少,新方案的性能优于前者。但随着“处理数量/集合大小”增加,吞吐量急剧下降。
这是因为从 `ArrayList` 中移除元素会带来额外开销,每次移除都会调用 `System#arraycopy` 对内部数组执行移位操作,开销较大。
对于较大的集合(1000000个元素)可以看到类似的模式:
```shell
? ? ?(limit) ? ?(size) ? Mode ?Cnt ?Score ? Err ?Units
eager ? ? ? 1 ?10000000 ?thrpt ? ?5 ?0.915 ? ? ? ?ops/s
eager ? ? ?10 ?10000000 ?thrpt ? ?5 ?0.783 ? ? ? ?ops/s
eager ? ? 100 ?10000000 ?thrpt ? ?5 ?0.965 ? ? ? ?ops/s
eager ? ?1000 ?10000000 ?thrpt ? ?5 ?0.936 ? ? ? ?ops/s
eager ? 10000 ?10000000 ?thrpt ? ?5 ?0.860 ? ? ? ?ops/s
lazy ? ? ? ?1 ?10000000 ?thrpt ? ?5 ?4.338 ? ? ? ?ops/s
lazy ? ? ? 10 ?10000000 ?thrpt ? ?5 ?3.149 ? ? ? ?ops/s
lazy ? ? ?100 ?10000000 ?thrpt ? ?5 ?2.060 ? ? ? ?ops/s
lazy ? ? 1000 ?10000000 ?thrpt ? ?5 ?0.370 ? ? ? ?ops/s
lazy ? ?10000 ?10000000 ?thrpt ? ?5 ?0.05 ? ? ? ? ops/s
```
(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/chart-2.png)
在更小集合(128个元素)上的表现:
```shell
? ? ? (limit) ? ?(size) ? Mode ?Cnt ? ? ? Score ? Error ?Units
eager ? ? ? ?2 ? ? 128 ? ?thrpt ? ?5 ?246439.459 ? ? ? ? ?ops/s
eager ? ? ? ?4 ? ? 128 ? ?thrpt ? ?5 ?333866.936 ? ? ? ? ?ops/s
eager ? ? ? ?8 ? ? 128 ? ?thrpt ? ?5 ?340296.188 ? ? ? ? ?ops/s
eager ? ? ? 16 ? ? 128 ? ?thrpt ? ?5 ?345533.673 ? ? ? ? ?ops/s
eager ? ? ? 32 ? ? 128 ? ?thrpt ? ?5 ?231725.156 ? ? ? ? ?ops/s
eager ? ? ? 64 ? ? 128 ? ?thrpt ? ?5 ?314324.265 ? ? ? ? ?ops/s
eager ? ? ?128 ? ? 128 ? ?thrpt ? ?5 ?270451.992 ? ? ? ? ?ops/s
lazy ? ? ? ? 2 ? ? 128 ? ?thrpt ? ?5 ?765989.718 ? ? ? ? ?ops/s
lazy ? ? ? ? 4 ? ? 128 ? ?thrpt ? ?5 ?659421.041 ? ? ? ? ?ops/s
lazy ? ? ? ? 8 ? ? 128 ? ?thrpt ? ?5 ?652685.515 ? ? ? ? ?ops/s
lazy ? ? ? ?16 ? ? 128 ? ?thrpt ? ?5 ?470346.570 ? ? ? ? ?ops/s
lazy ? ? ? ?32 ? ? 128 ? ?thrpt ? ?5 ?324174.691 ? ? ? ? ?ops/s
lazy ? ? ? ?64 ? ? 128 ? ?thrpt ? ?5 ?186472.090 ? ? ? ? ?ops/s
lazy ? ? ? 128 ? ? 128 ? ?thrpt ? ?5 ?108105.699 ? ? ? ? ?ops/s
```
(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/chart-3.png)
能不能进一步优化?
5. 进一步提高性能
不幸的是,现有的解决方案扩展性不尽如人意,让我们试着改进。但在此之前,先对现有操作进行测评:
(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/Screenshot-2019-01-03-at-16.36.58.png)
不出意外,`Arraylist#remove` 是开销最大的操作之一。换句话说,从 `ArrayList` 中删除元素耗费了大量 CPU 资源。
为什么呢?从 `ArrayList` 中删除元素会对底层实现的数组执行移除操作。问题是,Java 数组不会自动调整大小,每次移除都会创建一个更小的新数组:
```java
private void fastRemove(Object[] es, int i) {
? ?modCount++;
? ?final int newSize;
? ?if ((newSize = size - 1) > i)
? ? ? ?System.arraycopy(es, i + 1, es, i, newSize - i);
? ?es[size = newSize] = null;
}
```
接下来该怎么办?避免从 `ArrayList` 中移除元素。
为了达到这个效果,可以用一个数组存储剩余的元素并记录它的大小:
```java
public class ImprovedRandomSpliterator<T> implements Spliterator<T> {
? ?private final Random random;
? ?private final T[] source;
? ?private int size;
? ?private ImprovedRandomSpliterator(
? ? ?List<T> source, Supplier<? extends Random> random) {
? ? ? ?if (source.isEmpty()) {
? ? ? ? ? ?throw new IllegalArgumentException(...);
? ? ? ?}
? ? ? ?this.source = (T[]) source.toArray();
? ? ? ?this.random = random.get();
? ? ? ?this.size = this.source.length;
? ?}
}
```
幸运的是,由于 `Spliterator` 的实例不会在线程之间共享,因此不会遇到并发问题。
现在尝试移除元素时,实际上不需要创建缩小后的新数组。相反,只要减小 `size` 并忽略数组的其余部分即可。
在此之前,把最后一个元素与返回的元素交换:
```java
@Override
public boolean tryAdvance(Consumer<? super T> action) {
? ?if (size > 0) {
? ? ? ?int nextIdx = random.nextInt(size);
? ? ? ?int lastIdx = size - 1;
? ? ? ?action.accept(source[nextIdx]);
? ? ? ?source[nextIdx] = source[lastIdx];
? ? ? ?source[lastIdx] = null; // let object be GCed
? ? ? ?size--;
? ? ? ?return true;
? ?} else {
? ? ? ?return false;
? ?}
}
```
对改进后的方案进行评测,可以看到开销最大的调用已经消失了:
(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/Screenshot-2019-01-03-at-16.38.47.png)
准备在此运行基准测试进行比较:
```shell
? ? ? ? ? ? ? (limit) ?(size) ? Mode ?Cnt ? ? Score ? ? Error ?Units
eager ? ? ? ? ? ? ? ?1 ?100000 ?thrpt ? ?3 ? 456.811 ± ?20.585 ?ops/s
eager ? ? ? ? ? ? ? 10 ?100000 ?thrpt ? ?3 ? 469.635 ± ?23.281 ?ops/s
eager ? ? ? ? ? ? ?100 ?100000 ?thrpt ? ?3 ? 466.486 ± ?68.820 ?ops/s
eager ? ? ? ? ? ? 1000 ?100000 ?thrpt ? ?3 ? 454.459 ± ?13.103 ?ops/s
eager ? ? ? ? ? ?10000 ?100000 ?thrpt ? ?3 ? 443.640 ± ?96.929 ?ops/s
eager ? ? ? ? ? 100000 ?100000 ?thrpt ? ?3 ? 335.134 ± ?21.944 ?ops/s
lazy ? ? ? ? ? ? ? ? 1 ?100000 ?thrpt ? ?3 ?1587.536 ± 389.128 ?ops/s
lazy ? ? ? ? ? ? ? ?10 ?100000 ?thrpt ? ?3 ?1452.855 ± 406.879 ?ops/s
lazy ? ? ? ? ? ? ? 100 ?100000 ?thrpt ? ?3 ? 814.978 ± 242.077 ?ops/s
lazy ? ? ? ? ? ? ?1000 ?100000 ?thrpt ? ?3 ? 167.825 ± 129.559 ?ops/s
lazy ? ? ? ? ? ? 10000 ?100000 ?thrpt ? ?3 ? ?19.782 ± ? 8.596 ?ops/s
lazy ? ? ? ? ? ?100000 ?100000 ?thrpt ? ?3 ? ? 3.970 ± ? 0.408 ?ops/s
lazy_improved ? ? ? ?1 ?100000 ?thrpt ? ?3 ?1509.264 ± 170.423 ?ops/s
lazy_improved ? ? ? 10 ?100000 ?thrpt ? ?3 ?1512.150 ± 143.927 ?ops/s
lazy_improved ? ? ?100 ?100000 ?thrpt ? ?3 ?1463.093 ± 593.370 ?ops/s
lazy_improved ? ? 1000 ?100000 ?thrpt ? ?3 ?1451.007 ± ?58.948 ?ops/s
lazy_improved ? ?10000 ?100000 ?thrpt ? ?3 ?1148.581 ± 232.218 ?ops/s
lazy_improved ? 100000 ?100000 ?thrpt ? ?3 ? 383.022 ± ?97.082 ?ops/s
```
(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/chart-5.png)
从上面的结果可以看出,改进后的方案性能受元素数量变化影响显著减小。
实际上,即使遇到最差情况,改进方案的性能也比基于 `Collections#shuffle` 的方案略好一些。
6. 完整示例
完整示例可以在 [GitHub][2] 上找到。
[2]:https://github.com/pivovarit/articles/tree/master/java-random-stream
```java
package com.pivovarit.stream;
import java.util.List;
import java.util.Random;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Supplier;
public class ImprovedRandomSpliterator<T> implements Spliterator<T> {
? ?private final Random random;
? ?private final T[] source;
? ?private int size;
? ?ImprovedRandomSpliterator(List<T> source, Supplier<? extends Random> random) {
? ? ? ?if (source.isEmpty()) {
? ? ? ? ? ?throw new IllegalArgumentException("RandomSpliterator can't be initialized with an empty collection");
? ? ? ?}
? ? ? ?this.source = (T[]) source.toArray();
? ? ? ?this.random = random.get();
? ? ? ?this.size = this.source.length;
? ?}
? ? @Override
? ?public boolean tryAdvance(Consumer<? super T> action) {
? ? ? ?if (size > 0) {
? ? ? ? ? ?int nextIdx = random.nextInt(size);
? ? ? ? ? ?int lastIdx = size - 1;
? ? ? ? ? ?action.accept(source[nextIdx]);
? ? ? ? ? ?source[nextIdx] = source[lastIdx];
? ? ? ? ? ?source[lastIdx] = null; // let object be GCed
? ? ? ? ? ?size--;
? ? ? ? ? ?return true;
? ? ? ?} else {
? ? ? ? ? ?return false;
? ? ? ?}
? ?}
? ?@Override
? ?public Spliterator<T> trySplit() {
? ? ? ?return null;
? ?}
? ?@Override
? ?public long estimateSize() {
? ? ? ?return source.length;
? ?}
? ?@Override
? ?public int characteristics() {
? ? ? ?return SIZED;
? ?}
}
```
```java
package com.pivovarit.stream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Random;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import static java.util.stream.Collectors.toCollection;
public final class RandomCollectors {
? ?private RandomCollectors() {
? ?}
? ?public static <T> Collector<T, ?, Stream<T>> toImprovedLazyShuffledStream() {
? ? ? ?return Collectors.collectingAndThen(
? ? ? ? ?toCollection(ArrayList::new),
? ? ? ? ?list -> !list.isEmpty()
? ? ? ? ? ?? StreamSupport.stream(new ImprovedRandomSpliterator<>(list, Random::new), false)
? ? ? ? ? ?: Stream.empty());
? ?}
? ?public static <T> Collector<T, ?, Stream<T>> toLazyShuffledStream() {
? ? ? ?return Collectors.collectingAndThen(
? ? ? ? ?toCollection(ArrayList::new),
? ? ? ? ?list -> !list.isEmpty()
? ? ? ? ? ?? StreamSupport.stream(new RandomSpliterator<>(list, Random::new), false)
? ? ? ? ? ?: Stream.empty());
? ?}
? ?public static <T> Collector<T, ?, Stream<T>> toEagerShuffledStream() {
? ? ? ?return Collectors.collectingAndThen(
? ? ? ? ?toCollection(ArrayList::new),
? ? ? ? ?list -> {
? ? ? ? ? ? ?Collections.shuffle(list);
? ? ? ? ? ? ?return list.stream();
? ? ? ? ?});
? ?}
}
```
内容总结
以上是互联网集市为您收集整理的用 Java 实现 Stream 高效混排与 Spliterator全部内容,希望文章能够帮你解决用 Java 实现 Stream 高效混排与 Spliterator所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。