android – RXJava – buffer observable 1直到observable 2发出一个项目
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了android – RXJava – buffer observable 1直到observable 2发出一个项目,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含2326字,纯文字阅读大概需要4分钟。
内容图文
![android – RXJava – buffer observable 1直到observable 2发出一个项目](/upload/InfoBanner/zyjiaocheng/810/93b2cb2c1402495292581434d0e6bfb6.jpg)
我想要以下行为:
observableMain应该缓冲所有项目,直到observableResumed发出一个值.然后observableMain应该发出所有缓冲和所有功能值……
我在我的活动中做了什么onCreate:
PublishSubject<T> subject = ...; // I create a subject to emit items to and to subscribe to
// 1) I create a main observable from my subject
final Observable<T> observableMain = subject
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
// 2) I use a custom base class in which I can register listeners
// for the onResume event and which I can query the isResumed state!
// I call the object the pauseResumeProvider!
// 2.1) I create an observable, it emits a value ONLY if my activity is resumed
final Observable<Boolean> obsIsResumed = Observable
.defer(() -> Observable.just(pauseResumeProvider.isActivityResumed()))
.skipWhile(aBoolean -> aBoolean != true);
// 2.2) I create a second observable, it emits a value as soon as my activity is resumed
final Observable<Boolean> obsOnResumed = Observable.create(new Observable.OnSubscribe<Boolean>()
{
@Override
public void call(final Subscriber<? super Boolean> subscriber)
{
pauseResumeProvider.addPauseResumeListener(new IPauseResumeListener() {
@Override
public void onResume() {
pauseResumeProvider.removePauseResumeListener(this);
subscriber.onNext(true);
subscriber.onCompleted();
}
@Override
public void onPause() {
}
});
}
});
// 2.3) I combine the resumed observables and only emit the FIRST value I can get
final Observable<Boolean> observableResumed = Observable
.concat(obsIsResumed, obsOnResumed)
.first();
// 3) here I'm stuck
// 3 - Variant 1:
Observable<T> observable = observableMain
.buffer(observableResumed)
.concatMap(values -> Observable.from(values));
// 3 - Variant 2:
// Observable<T> observable = observableMain.delay(t -> observableResumed);
// 4) I emit events to my my subject...
// this event is LOST!
subject.onNext("Test in onCreate");
问题
恢复活动后发送给主题的所有项目都在工作,之前的所有项目都将丢失(至少使用延迟解决方案).我无法达到理想的行为.我该如何正确解决这个问题?
解决方法:
重播源并使用delaySubscription触发真实订阅.
PublishSubject<Integer> emitNow = PublishSubject.create();
ConnectableObservable<T> co = source.replay();
co.subscribe();
co.connect();
co.delaySubscription(emitNow).subscribe(...);
emitNow.onNext(1);
编辑:
Here is a gist与运算符,您可以升级到一个序列,可以暂停和恢复上游的排放.
内容总结
以上是互联网集市为您收集整理的android – RXJava – buffer observable 1直到observable 2发出一个项目全部内容,希望文章能够帮你解决android – RXJava – buffer observable 1直到observable 2发出一个项目所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。