RxJava Observable.fromEmitter奇数个反压行为
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了RxJava Observable.fromEmitter奇数个反压行为,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含1573字,纯文字阅读大概需要3分钟。
内容图文
![RxJava Observable.fromEmitter奇数个反压行为](/upload/InfoBanner/zyjiaocheng/666/397bbb09e1224b7fac45487f5b6a345a.jpg)
我一直在使用Observable.fromEmitter()作为Observable.create()的绝佳替代品.我最近遇到了一些奇怪的行为,我无法完全弄清楚为什么会这样.我真的很感谢一个对背压有一定了解的人,以及调度程序来看看这一点.
public final class EmitterTest {
public static void main(String[] args) {
Observable<Integer> obs = Observable.fromEmitter(emitter -> {
for (int i = 1; i < 1000; i++) {
if (i % 5 == 0) {
sleep(300L);
}
emitter.onNext(i);
}
emitter.onCompleted();
}, Emitter.BackpressureMode.LATEST);
obs.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.computation())
.subscribe(value -> System.out.println("Received " + value)); // Why does this get stuck at "Received 128"
sleep(10000L);
}
private static void sleep(Long duration) {
try {
Thread.sleep(duration);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
该应用程序的输出是
Received 1
Received 2
...
Received 128
然后它仍然停留在128(可能是因为这是RxJava的默认缓冲区大小).
如果我将fromEmitter()中指定的模式更改为BackpressureMode.NONE,则代码将按预期工作.如果我删除对observeOn()的调用,它也将按预期工作.有谁能解释为什么会这样?
解决方法:
这是同一个池的死锁情况. subscriptionOn在正在使用的同一线程上调度下游请求,但是如果该线程正忙于睡眠/发射循环,则该请求将永远不会传递到fromEmitter,因此在一段时间之后LATEST开始删除元素,直到最后如果主源等待足够长的时间,则将传递最后一个值(999). (与我们删除的onBackpressureBlock相似的情况.)
如果subscribeOn没有执行此请求调度,则该示例将正常工作.
我已经打开an issue来制定解决方案.
现在的解决方法是使用更大的缓冲区大小与observeOn(有重载)或使用fromEmitter(f,NONE).subscribeOn().onBackpressureLatest().observeOn()
内容总结
以上是互联网集市为您收集整理的RxJava Observable.fromEmitter奇数个反压行为全部内容,希望文章能够帮你解决RxJava Observable.fromEmitter奇数个反压行为所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。