netty源码解解析(4.0)-14 Channel NIO实现:读取数据
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了netty源码解解析(4.0)-14 Channel NIO实现:读取数据,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含10802字,纯文字阅读大概需要16分钟。
内容图文
![netty源码解解析(4.0)-14 Channel NIO实现:读取数据](/upload/InfoBanner/zyjiaocheng/1113/68efc1b099c443b19194ff89bd94e3f5.jpg)
本章分析Nio Channel的数据读取功能的实现。
Channel读取数据需要Channel和ChannelHandler配合使用,netty设计数据读取功能包括三个要素:Channel, EventLoop和ChannelHandler。Channel有个read方法,这个方法不会直接读取数据,它的作用是通知持有当前channel的eventLoop可以从这个这个channel读取数据了,这个方法被调用之后eventLoop会在channel有数据可读的时候从channel读出数据然后把数据放在channelRead事件中交给ChannelInboundHandler的channelRead方法处理,当eventLoop发现channel中暂时没时间可读会触发一个channelReadComplete事件。
read: Nio Channel通知eventLoop开始读数据
channel read方法的调用栈:
1 io.netty.channel.AbstractChannel#read 2 io.netty.channel.DefaultChannelPipeline#read 3 io.netty.channel.AbstractChannelHandlerContext#read 4 io.netty.channel.AbstractChannelHandlerContext#invokeRead 5 io.netty.channel.DefaultChannelPipeline.HeadContext#read 6 io.netty.channel.AbstractChannel.AbstractUnsafe#beginRead 7 io.netty.channel.nio.AbstractNioChannel#doBeginRead
调用channel的read的方法,会触发read事件,通过pipeline调用AbstractChannel unsafe的beginRead方法,这个方法的语义是通知eventLoop可以从channel读数据了,但他没有实现具体功能,把具体功能留给doBeginRead实现。doBeginRead在AbstractChannel中定义,它是一个抽象方法。AbstractNioChannel实现了这个方法:
1 @Override 2 protected void doBeginRead() throws Exception { 3// Channel.read() or ChannelHandlerContext.read() was called 4if (inputShutdown) { 5return; 6 } 7 8final SelectionKey selectionKey = this.selectionKey; 9if (!selectionKey.isValid()) { 10return; 11 } 1213 readPending = true; 1415finalint interestOps = selectionKey.interestOps(); 16if ((interestOps & readInterestOp) == 0) { 17selectionKey.interestOps(interestOps | readInterestOp); 18 } 19 }
这里的doBeginRead实现,只有第17行是核心代码:把readInterestOps保存是的read操作标志添加到SelectableChannel的SelectionKey中。这里的readInterestOps是一个类的属性,在AbstractNioChannel中,它没有明确的定义,只有一个抽象的定义:NIO中的一个可以可以当成read操作的的标志。在NIO中可以当成read的有SelectionKey.OP_READ和SelectionKey.OP_ACCEPT。readInterestOps在AbstractNioChannel的构造方法中使用传入的参数初始化,子类就可以根据需要确定interestOps的具体含义。
设置好beginRead之后,NioEventLoop就可以使用Selector得到检测到channel上的read事件了,下面是NioEventLoop中处理read事件的代码:
1 // io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel) 2 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { 3 unsafe.read(); 4 }
这里调用了unsafe的read的方法,在Channel的Unsafe中并没有定义这个方法,它在io.netty.channel.nio.AbstractNioChannel.NioUnsafe中定义,在io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe和io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe中有两个不同的实现。这两个实现的区别是:NioMessageUnsafe.read是把从channel中读出的数据转换成Object, NioByteUnsafe.read是从channel中读出byte数据流。下面来详解分析这两种实现。
AbstractNioChannel.NioUnsafe.read实现:从channel读取数据
netty在NIO Channel的设计上,把读数据设计成独立的抽象层。之所以这样设计有两个方面的原因:
- 在NIO中,三中不同类型的Channel读取的数据类型是不一样的,NioServerSocketChannel读出的是一个新建的NioSockeChannel, NioSocketChannel读出的byte数据流,NioDatagramChannel读出是数据报。
- NIO三种Channel都运行在非阻塞模式下,相比于阻塞模式,非阻塞模式下读数据要处理的问题要复杂的多。使用Selector和非阻塞模式被动地读取数据,需要处理连接断开和socket异常,由于Selector使用的是边缘触发模式,一次read调用务必要把已经在socket recvbuffer中的数据全部读出来,否则可以导致数据丢失或数据接收不及时。把read独立出来处理读取数据的复杂性,代码结构会比较清晰。
接下来开始详细分析NioUnsafe read方法的两种不同的实现。
io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe.read实现: 从channel中读出Object
这个实现是主要功能是调用doReadMessages方法,从channel中读出Object消息,具体的类型这里没有限制,doReadMessages是一个抽象方法,留给子类实现, 下面是read方法的实现:
1 // io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe 2 @Override 3 public void read() { 4 assert eventLoop().inEventLoop(); 5 final ChannelConfig config = config(); 6if (!config.isAutoRead() && !isReadPending()) { 7// ChannelConfig.setAutoRead(false) was called in the meantime 8 removeReadOp(); 9return; 10 } 1112finalint maxMessagesPerRead = config.getMaxMessagesPerRead(); 13final ChannelPipeline pipeline = pipeline(); 14boolean closed = false; 15 Throwable exception = null; 16try { 17try { 18for (;;) { 19int localRead = doReadMessages(readBuf); 20if (localRead == 0) { 21break; 22 } 23if (localRead < 0) { 24 closed = true; 25break; 26 } 2728// stop reading and remove op29if (!config.isAutoRead()) { 30break; 31 } 3233if (readBuf.size() >= maxMessagesPerRead) { 34break; 35 } 36 } 37 } catch (Throwable t) { 38 exception = t; 39 } 40 setReadPending(false); 41int size = readBuf.size(); 42for (int i = 0; i < size; i ++) { 43 pipeline.fireChannelRead(readBuf.get(i)); 44 } 4546 readBuf.clear(); 47 pipeline.fireChannelReadComplete(); 4849if (exception != null) { 50 closed = closeOnReadError(exception); 5152 pipeline.fireExceptionCaught(exception); 53 } 5455if (closed) { 56if (isOpen()) { 57 close(voidPromise()); 58 } 59 } 60 } finally { 61// Check if there is a readPending which was not processed yet. 62// This could be for two reasons: 63// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method 64// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method 65//66// See https://github.com/netty/netty/issues/225467if (!config.isAutoRead() && !isReadPending()) { 68 removeReadOp(); 69 } 70 } 71 }
第12行,得到一次循环读取消息的最大数量maxMessagesPerRead,这个配置的默认值因不同的channel类型而不同,io.netty.channel.ChannelConfig提供了setMaxMessagesPerRead方法设置这个配置的值。调节这个值的大小可以影响I/O操作在eventLoop线程分配的执行时间,它的值越大,I/O操作站的时间越大。
18-36行,使用doReadMessages读取消息,并把消息放到readBuf中,readBuf是List<Object>类型。20,21行,没有可读的数据结束循环。23-25行,socket已经关闭。33,34行,readBuf中的消息数量已经超过限制,跳出循环。
41-47行,对readBuf中的每一个消息触发一次channelRead事件,然后清空readBuf, 触发channelReadComplete事件。
49-53行,处理异常。
55-59行,处理channel正常关闭。
doReadMessages方法有两个实现。一个是io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages,这个实现中读出的消息是NioSocketChannel。另一个是io.netty.channel.socket.nio.NioDatagramChannel#doReadMessages,这个实现中读出的消息时DatagramPacket。
io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages实现代码:
1 @Override 2 protected int doReadMessages(List<Object> buf) throws Exception { 3 SocketChannel ch = SocketUtils.accept(javaChannel()); 4 5try { 6if (ch != null) { 7 buf.add(new NioSocketChannel(this, ch)); 8return 1; 9 } 10 } catch (Throwable t) { 11 logger.warn("Failed to create a new channel from an accepted socket.", t); 1213try { 14 ch.close(); 15 } catch (Throwable t2) { 16 logger.warn("Failed to close a socket.", t2); 17 } 18 } 1920return 0; 21 }
第3行, 使用accept方法得到一个新的SocketChannel。
7,8行,使用新的SocketChannel创建NioSocketChannel,并把它放到buf中。
11-20行,出现异常,关闭这个socket, 最后返回0.
io.netty.channel.socket.nio.NioDatagramChannel#doReadMessages实现代码:
1 @Override 2 protected int doReadMessages(List<Object> buf) throws Exception { 3 DatagramChannel ch = javaChannel(); 4 DatagramChannelConfig config = config(); 5 RecvByteBufAllocator.Handle allocHandle = this.allocHandle; 6if (allocHandle == null) { 7this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle(); 8 } 9 ByteBuf data = allocHandle.allocate(config.getAllocator()); 10boolean free = true; 11try { 12 ByteBuffer nioData = data.internalNioBuffer(data.writerIndex(), data.writableBytes()); 13int pos = nioData.position(); 14 InetSocketAddress remoteAddress = (InetSocketAddress) ch.receive(nioData); 15if (remoteAddress == null) { 16return 0; 17 } 1819int readBytes = nioData.position() - pos; 20 data.writerIndex(data.writerIndex() + readBytes); 21 allocHandle.record(readBytes); 2223 buf.add(new DatagramPacket(data, localAddress(), remoteAddress)); 24 free = false; 25return 1; 26 } catch (Throwable cause) { 27 PlatformDependent.throwException(cause); 28return -1; 29 } finally { 30if (free) { 31 data.release(); 32 } 33 } 34 }
4-12行,得到接收数据的缓冲区data。
13-21行,从socket收到一个数据包,这个数据报包含两部分: data中的二进制数据和发送端的地址remoteAddress(第14行)。然后设置data中的数据长度。
23-25行,把数据报转换成DatagramPacket类型放到buf中返回。
io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read实现:从channel中读byte流
这个实现的主要功能是调用doReadBytes读取byte流。doReadBytes是一个抽象方法,留给子类实现。下面是这个read的实现。
1 @Override 2 public final void read() { 3 final ChannelConfig config = config(); 4if (!config.isAutoRead() && !isReadPending()) { 5// ChannelConfig.setAutoRead(false) was called in the meantime 6 removeReadOp(); 7return; 8 } 910final ChannelPipeline pipeline = pipeline(); 11final ByteBufAllocator allocator = config.getAllocator(); 12finalint maxMessagesPerRead = config.getMaxMessagesPerRead(); 13 RecvByteBufAllocator.Handle allocHandle = this.allocHandle; 14if (allocHandle == null) { 15this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle(); 16 } 1718 ByteBuf byteBuf = null; 19int messages = 0; 20boolean close = false; 21try { 22int totalReadAmount = 0; 23boolean readPendingReset = false; 24do { 25 byteBuf = allocHandle.allocate(allocator); 26int writable = byteBuf.writableBytes(); 27int localReadAmount = doReadBytes(byteBuf); 28if (localReadAmount <= 0) { 29// not was read release the buffer30 byteBuf.release(); 31 byteBuf = null; 32 close = localReadAmount < 0; 33if (close) { 34// There is nothing left to read as we received an EOF.35 setReadPending(false); 36 } 37break; 38 } 39if (!readPendingReset) { 40 readPendingReset = true; 41 setReadPending(false); 42 } 43 pipeline.fireChannelRead(byteBuf); 44 byteBuf = null; 4546if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) { 47// Avoid overflow.48 totalReadAmount = Integer.MAX_VALUE; 49break; 50 } 5152 totalReadAmount += localReadAmount; 5354// stop reading55if (!config.isAutoRead()) { 56break; 57 } 5859if (localReadAmount < writable) { 60// Read less than what the buffer can hold, 61// which might mean we drained the recv buffer completely.62break; 63 } 64 } while (++ messages < maxMessagesPerRead); 6566 pipeline.fireChannelReadComplete(); 67 allocHandle.record(totalReadAmount); 6869if (close) { 70 closeOnRead(pipeline); 71 close = false; 72 } 73 } catch (Throwable t) { 74 handleReadException(pipeline, byteBuf, t, close); 75 } finally { 76// Check if there is a readPending which was not processed yet. 77// This could be for two reasons: 78// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method 79// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method 80//81// See https://github.com/netty/netty/issues/225482if (!config.isAutoRead() && !isReadPending()) { 83 removeReadOp(); 84 } 85 } 86 }
10-16行,得到一个接受缓冲区的分配器和分配器的的专用handle。这两个东西的功能是高效的创建大量的接接收数据缓冲区,具体原理和实现会在后面buffer相关章节中详细分析,这里暂时略过。
24-64行,这是一个使用doReadBytes读取数据并触发channelRead事件的循环。25-27行,得到一个接受数据的缓冲区,然后从socket中读取数据。28-38行,没有数据可读了,或socket已经断开了。43行,正确收到了数据,触发channelRead事件。59-62行,读出的数据小于缓冲区的长度,表示没有socket中暂时没有数据可读了。 64行,读取次数大于上限配置,跳出。
66行,读循环完成,触发channelReadComplete事件。
69-72, 处理socket正常关闭。
74,83行,处理其他异常。
doReadBytes只有一个实现:
// io.netty.channel.socket.nio.NioSocketChannel#doWriteBytes @Override protected int doWriteBytes(ByteBuf buf) throws Exception { finalint expectedWrittenBytes = buf.readableBytes(); return buf.readBytes(javaChannel(), expectedWrittenBytes); }
这个实现非常简单,使用ByteBuf的能力从SocketChannel中读取byte流。
原文:https://www.cnblogs.com/brandonli/p/10278285.html
内容总结
以上是互联网集市为您收集整理的netty源码解解析(4.0)-14 Channel NIO实现:读取数据全部内容,希望文章能够帮你解决netty源码解解析(4.0)-14 Channel NIO实现:读取数据所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。