首页 / JAVA / 在rx-java中的套接字看门狗
在rx-java中的套接字看门狗
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了在rx-java中的套接字看门狗,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含3005字,纯文字阅读大概需要5分钟。
内容图文
![在rx-java中的套接字看门狗](/upload/InfoBanner/zyjiaocheng/817/cd7e40d0e98b4cefa028f2c507c15ca1.jpg)
我目前正在努力尝试使用rx实现tcp看门狗/重试系统,您的帮助将不胜感激.
有一个Observable,我想通过定期检查我们是否仍然可以写入套接字来获得Observable.很简单,我可以做这样的事情:
class SocketSubscribeFunc implements Observable.OnSubscribeFunc<Socket> {
private final String hostname;
private final int port;
private Socket socket;
SocketSubscribeFunc(String hostname, int port) {
this.hostname = hostname;
this.port = port;
}
public Subscription onSubscribe(final Observer<? super Socket> observer) {
try {
log.debug("Trying to connect...");
socket = new Socket(hostname, port);
observer.onNext(socket);
} catch (IOException e) {
observer.onError(e);
}
return new Subscription() {
public void unsubscribe() {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
};
}
}
Observable<Socket> socketObservable = Observable.create(new SocketSubscribeFunc(hostname,port));
Observable<Boolean> watchdog = Observable.combineLatest(socketObservable, Observable.interval(1, TimeUnit.SECONDS), new Func2<Socket, Long, Boolean>() {
public Boolean call(final Socket socket, final Long aLong) {
try {
socket.getOutputStream().write("ping\n".getBytes());
return true;
} catch (IOException e) {
return false;
}
}
});
现在,我想重新连接如果可以获取套接字(服务器/链接在创建时关闭)或变得不可写(连接成功后服务器/链接无法访问).
理想情况下,通过重新订阅套接字Observable,其OnSubscribeFunc使用重试运算符创建连接.
正如您所看到的,这将在套接字和监视器Observables之间引入循环依赖关系.
我用switchMap / materialize玩了一会儿……为了传播最终的错误无济于事.
我接近放弃这个想法并使用副作用代码中的主题.但是在全球范围内应该有一个更好的方式:)
提前致谢!
解决方法:
首先,我会避免Observable.create大部分时间,因为它通常是不需要的,并引入了不必要的复杂性.在这种情况下,Rx有一个名为using的运算符,它允许您创建一个在Observable的生命周期中存在的资源对象.它会自动捕获运行时错误,并提供一个dispose操作,因此这对于此用例中的套接字来说是完美的.我正在使用Java8 lambdas,因为它们更容易伪代码.
Observable.using(
// Resource (socket) factory
() -> {
try {
return new Socket(hostname, port);
} catch (IOException e) {
// Rx will propagate this as an one rror event.
throw new RuntimeException(e);
}
},
// Observable factory
(socket) -> {
return Observable.interval(1, TimeUnit.SECONDS)
.map((unusedTick) {
try {
socket.getOutputStream().write("ping\n".getBytes());
return true;
} catch (IOException e) {
throw new RuntimeException(e);
}
})
// Retry the inner job up to 3 times before propagating.
.retry(3);
},
// Dispose action for socket.
// In real life the close probably needs a try/catch.
(socket) -> socket.close())
// Retry the outer job up to 3 times.
.retry(3)
// If we propagate all errors, emit a 'false', signaling service is not available.
.onErrorResumeNext(Observable.just(false));
请注意,如果内部作业传播(在3次失败之后),这将重试外部作业.为了解决这个问题,您应该使用谓词和retryWhen检查重试时的文档.如果这不是内部作业传播的类型,则可以抛出特殊的RuntimeException并仅重试外部作业.
内容总结
以上是互联网集市为您收集整理的在rx-java中的套接字看门狗全部内容,希望文章能够帮你解决在rx-java中的套接字看门狗所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。