首页 / JAVA / 什么是响应式编程,Java 如何实现
什么是响应式编程,Java 如何实现
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了什么是响应式编程,Java 如何实现,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含4325字,纯文字阅读大概需要7分钟。
内容图文
![什么是响应式编程,Java 如何实现](/upload/InfoBanner/zyjiaocheng/607/fd717f9e9e064f00bab198beb299e7ad.jpg)
我们这里用通过唯一 id 获取知乎的某个回答作为例子,首先我们先明确下,一次HTTP请求到服务器上处理完之后,将响应写回这次请求的连接,就是完成这次请求了,如下:
public void request(Connection connection, HttpRequest request) {
//处理request,省略代码
connection.write(response);//完成响应
}
假设获取回答需要调用两个接口,获取评论数量还有获取回答信息,传统的代码可能会这么去写:
//获取评论数量
public void getCommentCount(Connection connection, HttpRequest request) {
Integer commentCount = null;
try {
//从缓存获取评论数量,阻塞IO
commentCount = getCommnetCountFromCache(id);
} catch(Exception e) {
try {
//缓存获取失败就从数据库中获取,阻塞IO
commentCount = getVoteCountFromDB(id);
} catch(Exception ex) {
}
}
connection.write(commentCount);
}
//获取回答
public void getAnswer(Connection connection, HttpRequest request) {
//获取点赞数量
Integer voteCount = null;
try {
//从缓存获取点赞数量,阻塞IO
voteCount = getVoteCountFromCache(id);
} catch(Exception e) {
try {
//缓存获取失败就从数据库中获取,阻塞IO
voteCount = getVoteCountFromDB(id);
} catch(Exception ex) {
}
}
//从数据库获取回答信息,阻塞IO
Answer answer = getAnswerFromDB(id);
//拼装Response
ResultVO response = new ResultVO();
if (voteCount != null) {
response.setVoteCount(voteCount);
}
if (answer != null) {
response.setAnswer(answer);
}
connection.write(response);//完成响应
}
在这种实现下,你的进程只需要一个线程池,承载了所有请求。这种实现下,有两个弊端:
- 线程池 IO 阻塞,导致某个存储变慢或者缓存击穿的话,所有服务都堵住了。假设现在评论缓存突然挂了,全都访问数据库,导致请求变慢。由于线程需要等待 IO 响应,导致唯一一个线程池被堆满,无法处理获取回答的请求。
- 对于获取回答信息,获取点赞数量其实和获取回答信息是可以并发进行的。不用非得先获取点赞数量之后再获取回答信息。
现在,NIO 非阻塞 IO 很普及了,有了非阻塞 IO,我们可以通过响应式编程,来让我们的线程不会阻塞,而是一直在处理请求。这是如何实现的呢?
传统的 BIO,是线程将数据写入 Connection 之后,当前线程进入 Block 状态,直到响应返回,之后接着做响应返回后的动作。NIO 则是线程将数据写入 Connection 之后,将响应返回后需要做的事情以及参数缓存到一个地方之后,直接返回。在有响应返回后,NIO 的 Selector 的 Read 事件会是 Ready 状态,扫描 Selector 事件的线程,会告诉你的线程池数据好了,然后线程池中的某个线程,拿出刚刚缓存的要做的事情还有参数,继续处理。
那么,怎样实现缓存响应返回后需要做的事情以及参数的呢?Java 本身提供了两种接口,一个是基于回调的 Callback 接口(Java 8 引入的各种Functional Interface),一种是 Future 框架。
基于 Callback 的实现:
//获取回答
public void getAnswer(Connection connection, HttpRequest request) {
ResultVO resultVO = new ResultVO();
getVoteCountFromCache(id, (count, throwable) -> {
//异常不为null则为获取失败
if (throwable != null) {
//读取缓存失败就从数据库获取
getVoteCountFromDB(id, (count2, throwable2) -> {
if (throwable2 == null) {
resultVO.setVoteCount(voteCount);
}
//从数据库读取回答信息
getAnswerFromDB(id, (answer, throwable3) -> {
if (throwable3 == null) {
resultVO.setAnswer(answer);
connection.write(resultVO);
} else {
connection.write(throwable3);
}
});
});
} else {
//获取成功,设置voteCount
resultVO.setVoteCount(voteCount);
//从数据库读取回答信息
getAnswerFromDB(id, (answer, throwable2) -> {
if (throwable2 == null) {
resultVO.setAnswer(answer);
//返回响应
connection.write(resultVO);
} else {
//返回错误响应
connection.write(throwable2);
}
});
}
});
}
可以看出,随着调用层级的加深,callback 层级越来越深,越来越难写,而且啰嗦的代码很多。并且,基于 CallBack 想实现获取点赞数量其实和获取回答信息并发是很难写的,这里还是先获取点赞数量之后再获取回答信息。
那么基于 Future 呢?我们用 Java 8 之后引入的 CompletableFuture
来试着实现下。
//获取回答
public void getAnswer(Connection connection, HttpRequest request) {
ResultVO resultVO = new ResultVO();
//所有的异步任务都执行完之后要做的事情
CompletableFuture.allOf(
getVoteCountFromCache(id)
//发生异常,从数据库读取
.exceptionallyComposeAsync(throwable -> getVoteCountFromDB(id))
//读取完之后,设置VoteCount
.thenAccept(voteCount -> {
resultVO.setVoteCount(voteCount);
}),
getAnswerFromDB(id).thenAccept(answer -> {
resultVO.setAnswer(answer);
})
).exceptionallyAsync(throwable -> {
connection.write(throwable);
}).thenRun(() -> {
connection.write(resultVO);
});
}
这种实现就看上去简单多了,并且读取点赞数量还有读取回答内容是同时进行的。
Project Reactor 在 Completableuture 这种实现的基础上,增加了更多的组合方式以及更完善的异常处理机制,以及面对背压时候的处理机制,还有重试机制。
每日一刷,轻松提升技术,斩获各种offer:
内容总结
以上是互联网集市为您收集整理的什么是响应式编程,Java 如何实现全部内容,希望文章能够帮你解决什么是响应式编程,Java 如何实现所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。