首页 / JAVA / 使用多线程RxJava进行反应式拉取
使用多线程RxJava进行反应式拉取
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了使用多线程RxJava进行反应式拉取,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含1428字,纯文字阅读大概需要3分钟。
内容图文
我正在尝试在RxJava中构建反应式拉式观察器.
我的观察者是这样的:
Observable<Command> myObs = Observable.create(s -> {
Command command;
int i = 0;
do {
command = NetworkOperation1.call(i);
logger.info("Init command " + i);
s.onNext(command);
i++;
} while (!command.isLast() && i < MAX);
s.onCompleted();
});
我想以4个并发批处理(缓冲区)进行处理,如下所示:
myObs
.buffer(10)
.flatMap(batch -> {
return Observable
.from(batch)
.subscribeOn(Schedulers.io())
.map(c -> {
Intermediate m = NetworkOperation2.call(c));
logger.info("Done intermediate " + m.id);
return m;
}
}, 4);
然后,我需要以不同的大小批处理结果,如下所示:
.buffer(25)
.subscribeOn(Schedulers.newThread())
.subscribe(list ->
logger.info("Finished batch with " + list.size());
问题在于Observable中的命令是一次全部处理的,而我希望它们根据需要进行处理.
这是发生的情况的日志:(请注意,同时运行所有1000条命令,而不是根据需要调用)
Init command 0
Init command 1
Init command 2
...
Init command 999
Done intermediate 0
Done intermediate 1
...
Done intermediate 24
Finished batch with 25
Done intermediate 25
Done intermediate 26
...
Done intermediate 49
Finished batch with 25
...
问题:是否有一种方法可以暂停观察者的线程,以便它不会一次发出所有命令或类似的东西?我试过了request()运算符,但无法正常工作.
谢谢.
解决方法:
您需要背压感知源和运算符.您正在使用的运算符支持背压,但您的源不支持.
改为这样做:
myObs = Observable.range(1,1000)
.map(i -> NetworkOperation1.call(i));
Observable.range支持背压,因此仅在需要时才会发出.
内容总结
以上是互联网集市为您收集整理的使用多线程RxJava进行反应式拉取全部内容,希望文章能够帮你解决使用多线程RxJava进行反应式拉取所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。