【Rxjava + Retrofit 你需要掌握的几个经典技巧】教程文章相关的互联网学习教程文章

在rxjava中使用Observable序列进行一对多映射【代码】

给定一系列源对象的Observable,如何使用rxjava从每个输入对象映射多个输出对象? (一对多映射) 我有一些菜式清单,它们代表组成餐厅订单的物品.我需要将每个菜转变为一个或多个OrderLine.每张菜品地图都会为其名称价格创建一个OrderLine,为每个浇头创建一个OrderLine,如果有注释则创建一个OrderLine.INPUT List dishes = {...}OUTPUT List orderLines = {...}class Dish {public String name;public List toppings;public String no...

android-RxJava:如何从flatMap运算符中的错误中恢复【代码】

我有一个EditText,用户在其中输入搜索查询,当用户键入内容时,我想在服务器上执行即时搜索. 我尝试使用RxJava做到这一点,如下所示:RxTextView.textChanges(editQuery) // I'm using RxBinding for listening to text changes.flatMap(new Func1<CharSequence, Observable<UserPublic[]>>() {@Overridepublic Observable<UserPublic[]> call(CharSequence query) {return api.searchUsers(query); // I'm using Retrofit 1.9 for ne...

RxJava-AbstractOnSubscribe在1.1.0中去了哪里?

版本1.1.0刚问世,似乎他们放弃了rx.observables.AbstractOnSubscribeRemovals from Public APIObservable.onBackpressureBlock rx.observables.AbstractOnSubscribe Removal of stateful methods from the generic rx.subjects.Subject abstract classAbstractOnSubscribe是我创建非平凡可观察对象的首选方式.为什么将其删除,我可以用它替换什么?解决方法:AbstractOnSubscribe被标记为实验性的,因此总是带有被删除的机会. 该类已由...

RxJava / Android监视在不同时间触发的多个订阅者的进度【代码】

我正在寻找一种方法,希望使用RxJava来保持一致性,以监视可能在不同时间触发的多个订户的进度.我知道如何从一个方法中将所有用户同时触发后将它们合并或合并在一起,但是我不知道在不同时间从不同方法中触发它们时会采用哪种方法. 例如,如果我有2个长时间运行的任务附加到按钮按下上.我按下按钮1并触发可观察者/用户,在运行的一半途中,我按下按钮2来触发第二个观察者/用户. 我想在没有任务运行时启用按钮,而在一个或多个任务运行时禁...

结合多个可观察对象的RxJava弹性方式【代码】

我有多个返回Observable的模块:O1,O2,O3 …所有模块的结果应合并为一个可观察到的Ocomb,以使单个任务可能失败,但是合并不会因单个问题而终止或影响.使用当前的解决方案,我遇到了以下示例中的各种问题: 这段代码结合了我的模块的输出:public Observable<Data> getModuleData(){List<Observable<Data>> tasks = new ArrayList<>();for(MyModule module : modules){tasks.add(module.getData());}return Observable.mergeDelayErro...

单元测试时RxJava Schedulers.immediate()行为【代码】

我正在尝试为使用反应性接口的DAO对象编写测试.我有一个包含食谱的表,我想测试一下,当我向该表中插入数据时,订阅者会收到包含食谱的列表. 我正在使用TestSubscriber类,并对该类执行断言.我的简单测试如下所示:@Test fun testSubscriber() {insertItem()val testSubscriber = TestSubscriber.create<List<Recipe>>()recipeDao.getRecipes().subscribeOn(Schedulers.immediate()).subscribe(testSubscriber)testSubscriber.assertNo...

RxJava Observable.fromEmitter奇数个反压行为【代码】

我一直在使用Observable.fromEmitter()作为Observable.create()的绝佳替代品.我最近遇到了一些奇怪的行为,我无法完全弄清楚为什么会这样.我真的很感谢一个对背压有一定了解的人,以及调度程序来看看这一点.public final class EmitterTest {public static void main(String[] args) {Observable<Integer> obs = Observable.fromEmitter(emitter -> {for (int i = 1; i < 1000; i++) {if (i % 5 == 0) {sleep(300L);}emitter.onNext(...

android-RxJava / RxBinding-检查订阅是否存在【代码】

我正在使用RxBinding并在onBindViewHolder方法的RecyclerView适配器中创建订阅,该订阅可重用项目.有没有简单的方法来检查是否已将订户分配给EditText对象,如果已删除,则删除该订户? 我的代码看起来像这样public void onBindViewHolder(final ItemViewHolder holder, int position) {holder.text.setText(mProvider.get(position).text);Subscription textSub = RxTextView.textChanges(holder.text).subscribe(new Action1<CharSe...

android-使用RxJava批处理插入【代码】

我需要从JSon供稿加载数据(最多22,000条记录),并将它们存储在我的android设备上(在SQLite中). 进行单个插入很容易实现,但速度很慢.理想情况下,我想对对象进行批处理以插入并将这些对象的列表传递给数据库. 我可以看到必须执行的操作,但是我真的很想使用RxJava进行操作,但不确定如何执行. 谢谢解决方法:您可以使用buffer运算符来累积对象. 例:Observable<String> stringObservable = Observable.create(new Observable.OnSubscribe...

RxJava主题和错误处理【代码】

我正在尝试实现类似于事件总线的行为.对于我的要求,似乎可以使用PublishSubject. 主题发出表示某些全局操作结果的项目,这些操作可能成功解决,或者在出现异常的情况下失败.在发生错误的情况下,我无法将onNext()用于成功事件,将onError()与Throwable一起使用,因为一旦调用onError(),该主题就会终止,并且除onError()之外,任何将来的订阅者都不会得到发射. 现在,按照我的观察方式,我必须创建一个表示事件的类,并在发生错误时有选择地引...

使用多线程RxJava进行反应式拉取【代码】

我正在尝试在RxJava中构建反应式拉式观察器. 我的观察者是这样的:Observable<Command> myObs = Observable.create(s -> {Command command;int i = 0;do {command = NetworkOperation1.call(i);logger.info("Init command " + i);s.onNext(command);i++;} while (!command.isLast() && i < MAX);s.onCompleted(); });我想以4个并发批处理(缓冲区)进行处理,如下所示:myObs.buffer(10).flatMap(batch -> {return Observable.from(ba...

RxJava,获取第一项(如果存在)【代码】

我正在尝试使用RxJava从列表中获取第一项.但是,如果项目不存在,我不希望它引发错误.相反,我希望能够通过提供默认项来自己处理. 我在下面创建的代码可以正确地检索列表中的第一项.虽然我不知道如何将.exists()合并到其中.api.getLibraryEntries(username).observeOn(AndroidSchedulers.mainThread()).flatMap(new Func1<List<Entry>, Observable<Entry>>() {@Overridepublic Observable<Entry> call(List<Entry> Entries) {return O...

如何在RXJava Android中对后台线程执行长计算【代码】

我想在android中使用RXJava在后台线程上执行长计算.计算后,我试图在Recylerview中呈现结果.我正在使用以下代码:Observable.just("true").subscribeOn(Schedulers.io()).map(new Func1<String, String>() {@Overridepublic String call(String s) {feedlist.clear();if (eventFeedItems != null && !eventFeedItems.isEmpty()) {for (int i = 0; i < eventFeedItems.size(); i++) {if (eventFeedItems != null && eventFeedItems.g...

如何使用RxJava重复网络通话【代码】

我有一个API,可让我使用如下所示的ID来检索商品:http://myapi.com/api/v1/item/1最后一个值是项目的ID.很好,很花哨,我可以编写一个Retrofit服务接口,并调用如下所示的项:MyService service = MyService.retrofit.create(MyService.class);service.itemById(1).observeOn(AndroidSchedulers.mainThread()).subscribeOn(Schedulers.io()).subscribe(new Subscriber<Item>() {@Overridepublic void onCompleted() {}@Overridepublic...

android-RxJava Thread.sleep在另一个线程【代码】

如何在Android中每隔一秒钟使用RxJava更新UI?我正在尝试做这样的事情:for (int i = 0; i <10 ; i++) { //testrx.Observable.just(getSleep()).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe(v->updateTime());//update textView}private <T> int getSleep() {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}return 0;}但是Thread.sleep()在ui线程...