《Scalable IO in Java》笔记
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了《Scalable IO in Java》笔记,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含5437字,纯文字阅读大概需要8分钟。
内容图文
Scalable IO in Java
http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf
基本上所有的网络处理程序都有以下基本的处理过程:
Read request
Decode request
Process service
Encode reply
Send reply
Classic Service Designs
简单的代码实现:
class Server implements Runnable { publicvoid run() { try { ServerSocket ss = new ServerSocket(PORT); while (!Thread.interrupted()) new Thread(new Handler(ss.accept())).start(); //创建新线程来handle // or, single-threaded, or a thread pool } catch (IOException ex) { /* ... */ } } staticclass Handler implements Runnable { final Socket socket; Handler(Socket s) { socket = s; } publicvoid run() { try { byte[] input = newbyte[MAX_INPUT]; socket.getInputStream().read(input); byte[] output = process(input); socket.getOutputStream().write(output); } catch (IOException ex) { /* ... */ } } privatebyte[] process(byte[] cmd) { /* ... */ } } }
对于每一个请求都分发给一个线程,每个线程中都独自处理上面的流程。
这种模型由于IO在阻塞时会一直等待,因此在用户负载增加时,性能下降的非常快。
server导致阻塞的原因:
1、serversocket的accept方法,阻塞等待client连接,直到client连接成功。
2、线程从socket inputstream读入数据,会进入阻塞状态,直到全部数据读完。
3、线程向socket outputstream写入数据,会阻塞直到全部数据写完。
client导致阻塞的原因:
1、client建立连接时会阻塞,直到连接成功。
2、线程从socket输入流读入数据,如果没有足够数据读完会进入阻塞状态,直到有数据或者读到输入流末尾。
3、线程从socket输出流写入数据,直到输出所有数据。
4、socket.setsolinger()设置socket的延迟时间,当socket关闭时,会进入阻塞状态,直到全部数据都发送完或者超时。
改进:采用基于事件驱动的设计,当有事件触发时,才会调用处理器进行数据处理。
Basic Reactor Design
代码实现:
class Reactor implements Runnable { final Selector selector; final ServerSocketChannel serverSocket; Reactor(int port) throws IOException { //Reactor初始化 selector = Selector.open(); serverSocket = ServerSocketChannel.open(); serverSocket.socket().bind(new InetSocketAddress(port)); serverSocket.configureBlocking(false); //非阻塞 SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT); //分步处理,第一步,接收accept事件 sk.attach(new Acceptor()); //attach callback object, Acceptor } publicvoid run() { try { while (!Thread.interrupted()) { selector.select(); Set selected = selector.selectedKeys(); Iterator it = selected.iterator(); while (it.hasNext()) dispatch((SelectionKey)(it.next()); //Reactor负责dispatch收到的事件 selected.clear(); } } catch (IOException ex) { /* ... */ } } void dispatch(SelectionKey k) { Runnable r = (Runnable)(k.attachment()); //调用之前注册的callback对象if (r != null) r.run(); } class Acceptor implements Runnable { // innerpublicvoid run() { try { SocketChannel c = serverSocket.accept(); if (c != null) new Handler(selector, c); } catch(IOException ex) { /* ... */ } } } } finalclass Handler implements Runnable { final SocketChannel socket; final SelectionKey sk; ByteBuffer input = ByteBuffer.allocate(MAXIN); ByteBuffer output = ByteBuffer.allocate(MAXOUT); staticfinalint READING = 0, SENDING = 1; int state = READING; Handler(Selector sel, SocketChannel c) throws IOException { socket = c; c.configureBlocking(false); // Optionally try first read now sk = socket.register(sel, 0); sk.attach(this); //将Handler作为callback对象 sk.interestOps(SelectionKey.OP_READ); //第二步,接收Read事件 sel.wakeup(); } boolean inputIsComplete() { /* ... */ } boolean outputIsComplete() { /* ... */ } void process() { /* ... */ } publicvoid run() { try { if (state == READING) read(); elseif (state == SENDING) send(); } catch (IOException ex) { /* ... */ } } void read() throws IOException { socket.read(input); if (inputIsComplete()) { process(); state = SENDING; // Normally also do first write now sk.interestOps(SelectionKey.OP_WRITE); //第三步,接收write事件 } } void send() throws IOException { socket.write(output); if (outputIsComplete()) sk.cancel(); //write完就结束了, 关闭select key } } //上面 的实现用Handler来同时处理Read和Write事件, 所以里面出现状态判断 //我们可以用State-Object pattern来更优雅的实现class Handler { // ...publicvoid run() { // initial state is reader socket.read(input); if (inputIsComplete()) { process(); sk.attach(new Sender()); //状态迁移, Read后变成write, 用Sender作为新的callback对象 sk.interest(SelectionKey.OP_WRITE); sk.selector().wakeup(); } } class Sender implements Runnable { publicvoid run(){ // ... socket.write(output); if (outputIsComplete()) sk.cancel(); } } }
这里用到了Reactor模式。
关于Reactor模式的一些概念:
Reactor:负责响应IO事件,当检测到一个新的事件,将其发送给相应的Handler去处理。
Handler:负责处理非阻塞的行为,标识系统管理的资源;同时将handler与事件绑定。
Reactor为单个线程,需要处理accept连接,同时发送请求到处理器中。
由于只有单个线程,所以处理器中的业务需要能够快速处理完。
改进:使用多线程处理业务逻辑。
Worker Thread Pools
参考代码:
class Handler implements Runnable { // uses util.concurrent thread poolstatic PooledExecutor pool = new PooledExecutor(...); staticfinalint PROCESSING = 3; // ...synchronizedvoid read() { // ... socket.read(input); if (inputIsComplete()) { state = PROCESSING; pool.execute(new Processer()); //使用线程pool异步执行 } } synchronizedvoid processAndHandOff() { process(); state = SENDING; // or rebind attachment sk.interest(SelectionKey.OP_WRITE); //process完,开始等待write事件 } class Processer implements Runnable { publicvoid run() { processAndHandOff(); } } }
将处理器的执行放入线程池,多线程进行业务处理。但Reactor仍为单个线程。
继续改进:对于多个CPU的机器,为充分利用系统资源,将Reactor拆分为两部分。
Using Multiple Reactors
参考代码:
Selector[] selectors; //subReactors集合, 一个selector代表一个subReactorint next = 0; class Acceptor { // ...publicsynchronizedvoid run() { ... Socket connection = serverSocket.accept(); //主selector负责acceptif (connection != null) new Handler(selectors[next], connection); //选个subReactor去负责接收到的connectionif (++next == selectors.length) next = 0; } }
mainReactor负责监听连接,accept连接给subReactor处理,为什么要单独分一个Reactor来处理监听呢?因为像TCP这样需要经过3次握手才能建立连接,这个建立连接的过程也是要耗时间和资源的,单独分一个Reactor来处理,可以提高性能。
参考:
http://www.cnblogs.com/fxjwind/p/3363329.html
原文:http://www.cnblogs.com/luxiaoxun/p/4331110.html
内容总结
以上是互联网集市为您收集整理的《Scalable IO in Java》笔记全部内容,希望文章能够帮你解决《Scalable IO in Java》笔记所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。