前言:
此时同学们对“netty接收数据不完整”大致比较关切,我们都需要了解一些“netty接收数据不完整”的相关内容。那么小编在网摘上收集了一些关于“netty接收数据不完整””的相关内容,希望同学们能喜欢,姐妹们一起来学习一下吧!本系列Netty源码解析文章基于 4.1.56.Final版本
本文概览.png
前文回顾
在前边的系列文章中,我们从内核如何收发网络数据开始以一个C10K的问题作为主线详细从内核角度阐述了网络IO模型的演变,最终在此基础上引出了Netty的网络IO模型如下图所示:
netty中的reactor.png
详细内容可回看?《从内核角度看IO模型的演变》
后续我们又围绕着Netty的主从Reactor网络IO线程模型,在?《Reactor模型在Netty中的实现》一文中详细阐述了Netty的主从Reactor模型的创建,以及介绍了Reactor模型的关键组件。搭建了Netty的核心骨架如下图所示:
主从Reactor线程组.png
在核心骨架搭建完毕之后,我们随后又在?《详细图解Reactor启动全流程》一文中阐述了Reactor启动的全流程,一个非常重要的核心组件NioServerSocketChannel开始在这里初次亮相,承担着一个网络框架最重要的任务--高效接收网络连接。我们介绍了NioServerSocketChannel的创建,初始化,向Main Reactor注册并监听OP_ACCEPT事件的整个流程。在此基础上,Netty得以整装待发,枕戈待旦开始迎接海量的客户端连接。
Reactor启动后的结构.png
随后紧接着我们在?《Netty如何高效接收网络连接》一文中详细介绍了Netty高效接收客户端网络连接的全流程,在这里Netty的核心重要组件NioServerSocketChannel开始正是登场,在NioServerSocketChannel中我们创建了客户端连接NioSocketChannel,并详细介绍了NioSocketChannel的初始化过程,随后通过在NioServerSocketChannel的pipeline中触发ChannelRead事件,并最终在ServerBootstrapAcceptor中将客户端连接NioSocketChannel注册到Sub Reactor中开始监听客户端连接上的OP_READ事件,准备接收客户端发送的网络数据也就是本文的主题内容。
image.png
自此Netty的核心组件全部就绪并启动完毕,开始起飞~~~
主从Reactor组完整结构.png
之前文章中的主角是Netty中主Reactor组中的Main Reactor以及注册在Main Reactor上边的NioServerSocketChannel,那么从本文开始,我们文章中的主角就切换为Sub Reactor以及注册在SubReactor上的NioSocketChannel了。
下面就让我们正式进入今天的主题,看一下Netty是如何处理OP_READ事件以及如何高效接收网络数据的。
1. Sub Reactor处理OP_READ事件流程总览
OP_READ事件处理.png
客户端发起系统IO调用向服务端发送数据之后,当网络数据到达服务端的网卡并经过内核协议栈的处理,最终数据到达Socket的接收缓冲区之后,Sub Reactor轮询到NioSocketChannel上的OP_READ事件就绪,随后Sub Reactor线程就会从JDK Selector上的阻塞轮询APIselector.select(timeoutMillis)调用中返回。转而去处理NioSocketChannel上的OP_READ事件。
注意这里的Reactor为负责处理客户端连接的Sub Reactor。连接的类型为NioSocketChannel,处理的事件为OP_READ事件。
在之前的文章中笔者已经多次强调过了,Reactor在处理Channel上的IO事件入口函数为NioEventLoop#processSelectedKey。
public final class NioEventLoop extends SingleThreadEventLoop { private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); ..............省略................. try { int readyOps = k.readyOps(); if ((readyOps & SelectionKey.OP_CONNECT) != 0) { ..............处理OP_CONNECT事件................. } if ((readyOps & SelectionKey.OP_WRITE) != 0) { ..............处理OP_WRITE事件................. } if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { //本文重点处理OP_ACCEPT事件 unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }}
这里需要重点强调的是,当前的执行线程现在已经变成了Sub Reactor,而Sub Reactor上注册的正是netty客户端NioSocketChannel负责处理连接上的读写事件。
所以这里入口函数的参数AbstractNioChannel ch则是IO就绪的客户端连接NioSocketChannel。
开头通过ch.unsafe()获取到的NioUnsafe操作类正是NioSocketChannel中对底层JDK NIO SocketChannel的Unsafe底层操作类。实现类型为NioByteUnsafe定义在下图继承结构中的AbstractNioByteChannel父类中。
image.png
下面我们到NioByteUnsafe#read方法中来看下Netty对OP_READ事件的具体处理过程:
2. Netty接收网络数据流程总览
我们直接按照老规矩,先从整体上把整个OP_READ事件的逻辑处理框架提取出来,让大家先总体俯视下流程全貌,然后在针对每个核心点位进行各个击破。
Netty接收网络数据流程.png
流程中相关置灰的步骤为Netty处理连接关闭时的逻辑,和本文主旨无关,我们这里暂时忽略,等后续笔者介绍连接关闭时,会单独开一篇文章详细为大家介绍。
从上面这张Netty接收网络数据总体流程图可以看出NioSocketChannel在接收网络数据的整个流程和我们在上篇文章?《Netty如何高效接收网络连接》中介绍的NioServerSocketChannel在接收客户端连接时的流程在总体框架上是一样的。
NioSocketChannel在接收网络数据的过程处理中,也是通过在一个do{....}while(...)循环read loop中不断的循环读取连接NioSocketChannel上的数据。
同样在NioSocketChannel读取连接数据的read loop中也是受最大读取次数的限制。默认配置最多只能读取16次,超过16次无论此时NioSocketChannel中是否还有数据可读都不能在进行读取了。
这里read loop循环最大读取次数可在启动配置类ServerBootstrap中通过ChannelOption.MAX_MESSAGES_PER_READ选项设置,默认为16。
ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.MAX_MESSAGES_PER_READ, 自定义次数)
**Netty这里为什么非得限制read loop的最大读取次数呢?**为什么不在read loop中一次性把数据读取完呢?
这时候就是考验我们大局观的时候了,在前边的文章介绍中我们提到Netty的IO模型为主从Reactor线程组模型,在Sub Reactor Group中包含了多个Sub Reactor专门用于监听处理客户端连接上的IO事件。
为了能够高效有序的处理全量客户端连接上的读写事件,Netty将服务端承载的全量客户端连接分摊到多个Sub Reactor中处理,同时也能保证Channel上IO处理的线程安全性。
其中一个Channel只能分配给一个固定的Reactor。一个Reactor负责处理多个Channel上的IO就绪事件,Reactor与Channel之间的对应关系如下图所示:
image.png
而一个Sub Reactor上注册了多个NioSocketChannel,Netty不可能在一个NioSocketChannel上无限制的处理下去,要将读取数据的机会均匀分摊给其他NioSocketChannel,所以需要限定每个NioSocketChannel上的最大读取次数。
此外,Sub Reactor除了需要监听处理所有注册在它上边的NioSocketChannel中的IO就绪事件之外,还需要腾出事件来处理有用户线程提交过来的异步任务。从这一点看,Netty也不会一直停留在NioSocketChannel的IO处理上。所以限制read loop的最大读取次数是非常必要的。
关于Reactor的整体运转架构,对细节部分感兴趣的同学可以回看下笔者的?《一文聊透Netty核心引擎Reactor的运转架构》这篇文章。
所以基于这个原因,我们需要在read loop循环中,每当通过doReadBytes方法从NioSocketChannel中读取到数据时(方法返回值会大于0,并记录在allocHandle.lastBytesRead中),都需要通过allocHandle.incMessagesRead(1)方法统计已经读取的次数。当达到16次时不管NioSocketChannel是否还有数据可读,都需要在read loop末尾退出循环。转去执行Sub Reactor上的异步任务。以及其他NioSocketChannel上的IO就绪事件。平均分配,雨露均沾!!
public abstract class MaxMessageHandle implements ExtendedHandle { //read loop总共读取了多少次 private int totalMessages; @Override public final void incMessagesRead(int amt) { totalMessages += amt; }}
本次read loop读取到的数据大小会记录在allocHandle.lastBytesRead中
public abstract class MaxMessageHandle implements ExtendedHandle { //本次read loop读取到的字节数 private int lastBytesRead; //整个read loop循环总共读取的字节数 private int totalBytesRead; @Override public void lastBytesRead(int bytes) { lastBytesRead = bytes; if (bytes > 0) { totalBytesRead += bytes; } }}lastBytesRead < 0:表示客户端主动发起了连接关闭流程,Netty开始连接关闭处理流程。这个和本文的主旨无关,我们先不用管。后面笔者会专门用一篇文章来详解关闭流程。lastBytesRead = 0:表示当前NioSocketChannel上的数据已经全部读取完毕,没有数据可读了。本次OP_READ事件圆满处理完毕,可以开开心心的退出read loop。当lastBytesRead > 0:表示在本次read loop中从NioSocketChannel中读取到了数据,会在NioSocketChannel的pipeline中触发ChannelRead事件。进而在pipeline中负责IO处理的ChannelHandelr中响应,处理网络请求。
fireChannelRread.png
public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { .......处理网络请求,比如解码,反序列化等操作....... }}
最后会在read loop循环的末尾调用allocHandle.continueReading()判断是否结束本次read loop循环。这里的结束循环条件的判断会比我们在介绍NioServerSocketChannel接收连接时的判断条件复杂很多,笔者会将这个判断条件的详细解析放在文章后面细节部分为大家解读,这里大家只需要把握总体核心流程,不需要关注太多细节。
总体上在NioSocketChannel中读取网络数据的read loop循环结束条件需要满足以下几点:
当前NioSocketChannel中的数据已经全部读取完毕,则退出循环。本轮read loop如果没有读到任何数据,则退出循环。read loop的读取次数达到16次,退出循环。
当满足这里的read loop退出条件之后,Sub Reactor线程就会退出循环,随后会调用allocHandle.readComplete()方法根据本轮read loop总共读取到的字节数totalBytesRead来决定是否对用于接收下一轮OP_READ事件数据的ByteBuffer进行扩容或者缩容。
最后在NioSocketChannel的pipeline中触发ChannelReadComplete事件,通知ChannelHandler本次OP_READ事件已经处理完毕。
fireChannelReadComplete.png
public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { .......处理网络请求,比如解码,反序列化等操作....... } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ......本次OP_READ事件处理完毕....... ......决定是否向客户端响应处理结果...... }}2.1 ChannelRead与ChannelReadComplete事件的区别
有些小伙伴可能对Netty中的一些传播事件触发的时机,或者事件之间的区别理解的不是很清楚,概念容易混淆。在后面的文章中笔者也会从源码的角度出发给大家说清楚Netty中定义的所有异步事件,以及这些事件之间的区别和联系和触发时机,传播机制。
这里我们主要探讨本文主题中涉及到的两个事件:ChannelRead事件与ChannelReadComplete事件。
从上述介绍的Netty接收网络数据流程总览中我们可以看出ChannelRead事件和ChannelReadComplete事件是不一样的,但是对于刚接触Netty的小伙伴来说从命名上乍一看感觉又差不多。
下面我们来看这两个事件之间的差别:
Netty服务端对于一次OP_READ事件的处理,会在一个do{}while()循环read loop中分多次从客户端NioSocketChannel中读取网络数据。每次读取我们分配的ByteBuffer容量大小,初始容量为2048。
ChanneRead事件:一次循环读取一次数据,就触发一次ChannelRead事件。本次最多读取在read loop循环开始分配的DirectByteBuffer容量大小。这个容量会动态调整,文章后续笔者会详细介绍。ChannelReadComplete事件:当读取不到数据或者不满足continueReading的任意一个条件就会退出read loop,这时就会触发ChannelReadComplete事件。表示本次OP_READ事件处理完毕。
这里需要特别注意下触发ChannelReadComplete事件并不代表NioSocketChannel中的数据已经读取完了,只能说明本次OP_READ事件处理完毕。因为有可能是客户端发送的数据太多,Netty读了16次还没读完,那就只能等到下次OP_READ事件到来的时候在进行读取了。
以上内容就是Netty在接收客户端发送网络数据的全部核心逻辑。目前为止我们还未涉及到这部分的主干核心源码,笔者想的是先给大家把核心逻辑讲解清楚之后,这样理解起来核心主干源码会更加清晰透彻。
经过前边对网络数据接收的核心逻辑介绍,笔者在把这张流程图放出来,大家可以结合这张图在来回想下主干核心逻辑。
Netty接收网络数据流程.png
下面笔者会结合这张流程图,给大家把这部分的核心主干源码框架展现出来,大家可以将我们介绍过的核心逻辑与主干源码做个一一对应,还是那句老话,我们要从主干框架层面把握整体处理流程,不需要读懂每一行代码,文章后续笔者会将这个过程中涉及到的核心点位给大家拆开来各个击破!!
image.png
3. 源码核心框架总览
@Override public final void read() { final ChannelConfig config = config(); ...............处理半关闭相关代码省略............... //获取NioSocketChannel的pipeline final ChannelPipeline pipeline = pipeline(); //PooledByteBufAllocator 具体用于实际分配ByteBuf的分配器 final ByteBufAllocator allocator = config.getAllocator(); //自适应ByteBuf分配器 AdaptiveRecvByteBufAllocator ,用于动态调节ByteBuf容量 //需要与具体的ByteBuf分配器配合使用 比如这里的PooledByteBufAllocator final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); //allocHandler用于统计每次读取数据的大小,方便下次分配合适大小的ByteBuf //重置清除上次的统计指标 allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { //利用PooledByteBufAllocator分配合适大小的byteBuf 初始大小为2048 byteBuf = allocHandle.allocate(allocator); //记录本次读取了多少字节数 allocHandle.lastBytesRead(doReadBytes(byteBuf)); //如果本次没有读取到任何字节,则退出循环 进行下一轮事件轮询 if (allocHandle.lastBytesRead() <= 0) { // nothing was read. release the buffer. byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; if (close) { ......表示客户端发起连接关闭..... } break; } //read loop读取数据次数+1 allocHandle.incMessagesRead(1); //客户端NioSocketChannel的pipeline中触发ChannelRead事件 pipeline.fireChannelRead(byteBuf); //解除本次读取数据分配的ByteBuffer引用,方便下一轮read loop分配 byteBuf = null; } while (allocHandle.continueReading());//判断是否应该继续read loop //根据本次read loop总共读取的字节数,决定下次是否扩容或者缩容 allocHandle.readComplete(); //在NioSocketChannel的pipeline中触发ChannelReadComplete事件,表示一次read事件处理完毕 //但这并不表示 客户端发送来的数据已经全部读完,因为如果数据太多的话,这里只会读取16次,剩下的会等到下次read事件到来后在处理 pipeline.fireChannelReadComplete(); .........省略连接关闭流程处理......... } catch (Throwable t) { ...............省略............... } finally { ...............省略............... } } }
这里再次强调下当前执行线程为Sub Reactor线程,处理连接数据读取逻辑是在NioSocketChannel中。
首先通过config()获取客户端NioSocketChannel的Channel配置类NioSocketChannelConfig。
通过pipeline()获取NioSocketChannel的pipeline。我们在?《详细图解Netty Reactor启动全流程》一文中提到的Netty服务端模板所举的示例中,NioSocketChannelde pipeline中只有一个EchoChannelHandler。
客户端channel pipeline结构.png
3.1 分配DirectByteBuffer接收网络数据
Sub Reactor在接收NioSocketChannel上的IO数据时,都会分配一个ByteBuffer用来存放接收到的IO数据。
这里大家可能觉得比较奇怪,为什么在NioSocketChannel接收数据这里会有两个ByteBuffer分配器呢?一个是ByteBufAllocator,另一个是RecvByteBufAllocator。
final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
这两个ByteBuffer又有什么区别和联系呢?
在上篇文章?《抓到Netty一个Bug,顺带来透彻地聊一下Netty是如何高效接收网络连接》中,笔者为了阐述上篇文章中提到的Netty在接收网络连接时的Bug时,简单和大家介绍了下这个RecvByteBufAllocator。
在上篇文章提到的NioServerSocketChannelConfig中,这里的RecvByteBufAllocator类型为ServerChannelRecvByteBufAllocator。
image.png
还记得这个ServerChannelRecvByteBufAllocator类型在4.1.69.final版本引入是为了解决笔者在上篇文章中提到的那个Bug吗?在4.1.69.final版本之前,NioServerSocketChannelConfig中的RecvByteBufAllocator类型为AdaptiveRecvByteBufAllocator。
而在本文中NioSocketChannelConfig中的RecvByteBufAllocator类型为AdaptiveRecvByteBufAllocator。
image.png
所以这里recvBufAllocHandle()获得到的RecvByteBufAllocator为AdaptiveRecvByteBufAllocator。顾名思义,这个类型的RecvByteBufAllocator可以根据NioSocketChannel上每次到来的IO数据大小来自适应动态调整ByteBuffer的容量。
对于客户端NioSocketChannel来说,它里边包含的IO数据时客户端发送来的网络数据,长度是不定的,所以才会需要这样一个可以根据每次IO数据的大小来自适应动态调整容量的ByteBuffer来接收。
如果我们把用于接收数据用的ByteBuffer看做一个桶的话,那么小数据用大桶装或者大数据用小桶装肯定是不合适的,所以我们需要根据接收数据的大小来动态调整桶的容量。而AdaptiveRecvByteBufAllocator的作用正是用来根据每次接收数据的容量大小来动态调整ByteBuffer的容量的。
现在RecvByteBufAllocator笔者为大家解释清楚了,接下来我们继续看ByteBufAllocator。
大家这里需要注意的是AdaptiveRecvByteBufAllocator并不会真正的去分配ByteBuffer,它只是负责动态调整分配ByteBuffer的大小。
而真正具体执行内存分配动作的是这里的ByteBufAllocator类型为PooledByteBufAllocator。它会根据AdaptiveRecvByteBufAllocator动态调整出来的大小去真正的申请内存分配ByteBuffer。
PooledByteBufAllocator为Netty中的内存池,用来管理堆外内存DirectByteBuffer。
AdaptiveRecvByteBufAllocator中的allocHandle在上篇文章中我们也介绍过了,它的实际类型为MaxMessageHandle。
public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator { @Override public Handle newHandle() { return new HandleImpl(minIndex, maxIndex, initial); } private final class HandleImpl extends MaxMessageHandle { .................省略................ }}
在MaxMessageHandle中包含了用于动态调整ByteBuffer容量的统计指标。
public abstract class MaxMessageHandle implements ExtendedHandle { private ChannelConfig config; //用于控制每次read loop里最大可以循环读取的次数,默认为16次 //可在启动配置类ServerBootstrap中通过ChannelOption.MAX_MESSAGES_PER_READ选项设置。 private int maxMessagePerRead; //用于统计read loop中总共接收的连接个数,NioSocketChannel中表示读取数据的次数 //每次read loop循环后会调用allocHandle.incMessagesRead增加记录接收到的连接个数 private int totalMessages; //用于统计在read loop中总共接收到客户端连接上的数据大小 private int totalBytesRead; //表示本次read loop 尝试读取多少字节,byteBuffer剩余可写的字节数 private int attemptedBytesRead; //本次read loop读取到的字节数 private int lastBytesRead; //预计下一次分配buffer的容量,初始:2048 private int nextReceiveBufferSize; ...........省略.............}
在每轮read loop开始之前,都会调用allocHandle.reset(config)重置清空上一轮read loop的统计指标。
@Override public void reset(ChannelConfig config) { this.config = config; //默认每次最多读取16次 maxMessagePerRead = maxMessagesPerRead(); totalMessages = totalBytesRead = 0; }
在每次开始从NioSocketChannel中读取数据之前,需要利用PooledByteBufAllocator在内存池中为ByteBuffer分配内存,默认初始化大小为2048,这个容量由guess()方法决定。
byteBuf = allocHandle.allocate(allocator);
@Override public ByteBuf allocate(ByteBufAllocator alloc) { return alloc.ioBuffer(guess()); } @Override public int guess() { //预计下一次分配buffer的容量,一开始为2048 return nextReceiveBufferSize; }
在每次通过doReadBytes从NioSocketChannel中读取到数据后,都会调用allocHandle.lastBytesRead(doReadBytes(byteBuf))记录本次读取了多少字节数据,并统计本轮read loop目前总共读取了多少字节。
@Override public void lastBytesRead(int bytes) { lastBytesRead = bytes; if (bytes > 0) { totalBytesRead += bytes; } }
每次循环从NioSocketChannel中读取数据之后,都会调用allocHandle.incMessagesRead(1)。统计当前已经读取了多少次。如果超过了最大读取限制此时16次,就需要退出read loop。去处理其他NioSocketChannel上的IO事件。
@Override public final void incMessagesRead(int amt) { totalMessages += amt; }
在每次read loop循环的末尾都需要通过调用allocHandle.continueReading()来判断是否继续read loop循环读取NioSocketChannel中的数据。
@Override public boolean continueReading() { return continueReading(defaultMaybeMoreSupplier); } private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() { @Override public boolean get() { //判断本次读取byteBuffer是否满载而归 return attemptedBytesRead == lastBytesRead; } }; @Override public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) { return config.isAutoRead() && (!respectMaybeMoreData || maybeMoreDataSupplier.get()) && totalMessages < maxMessagePerRead && totalBytesRead > 0; }attemptedBytesRead :表示当前ByteBuffer预计尝试要写入的字节数。lastBytesRead :表示本次read loop真实读取到了多少个字节。
defaultMaybeMoreSupplier用于判断经过本次read loop读取数据后,ByteBuffer是否满载而归。如果是满载而归的话(attemptedBytesRead == lastBytesRead),表明可能NioSocketChannel里还有数据。如果不是满载而归,表明NioSocketChannel里没有数据了已经。
是否继续进行read loop需要同时满足以下几个条件:
totalMessages < maxMessagePerRead 当前读取次数是否已经超过16次,如果超过,就退出do(...)while()循环。进行下一轮OP_READ事件的轮询。因为每个Sub Reactor管理了多个NioSocketChannel,不能在一个NioSocketChannel上占用太多时间,要将机会均匀地分配给Sub Reactor所管理的所有NioSocketChannel。totalBytesRead > 0 本次OP_READ事件处理是否读取到了数据,如果已经没有数据可读了,那么就直接退出read loop。!respectMaybeMoreData || maybeMoreDataSupplier.get() 这个条件比较复杂,它其实就是通过respectMaybeMoreData字段来控制NioSocketChannel中可能还有数据可读的情况下该如何处理。 maybeMoreDataSupplier.get():true表示本次读取从NioSocketChannel中读取数据,ByteBuffer满载而归。说明可能NioSocketChannel中还有数据没读完。fasle表示ByteBuffer还没有装满,说明NioSocketChannel中已经没有数据可读了。respectMaybeMoreData = true表示要对可能还有更多数据进行处理的这种情况要respect认真对待,如果本次循环读取到的数据已经装满ByteBuffer,表示后面可能还有数据,那么就要进行读取。如果ByteBuffer还没装满表示已经没有数据可读了那么就退出循环。 respectMaybeMoreData = false表示对可能还有更多数据的这种情况不认真对待 not respect。不管本次循环读取数据ByteBuffer是否满载而归,都要继续进行读取,直到读取不到数据在退出循环,属于无脑读取。
同时满足以上三个条件,那么read loop继续进行。继续从NioSocketChannel中读取数据,直到读取不到或者不满足三个条件中的任意一个为止。
3.2 从NioSocketChannel中读取数据
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel { @Override protected int doReadBytes(ByteBuf byteBuf) throws Exception { final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.attemptedBytesRead(byteBuf.writableBytes()); return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead()); }}
这里会直接调用底层JDK NIO的SocketChannel#read方法将数据读取到DirectByteBuffer中。读取数据大小为本次分配的DirectByteBuffer容量,初始为2048。
4. ByteBuffer动态自适应扩缩容机制
由于我们一开始并不知道客户端会发送多大的网络数据,所以这里先利用PooledByteBufAllocator分配一个初始容量为2048的DirectByteBuffer用于接收数据。
byteBuf = allocHandle.allocate(allocator);
这就好比我们需要拿着一个桶去排队装水,但是第一次去装的时候,我们并不知道管理员会给我们分配多少水,桶拿大了也不合适拿小了也不合适,于是我们就先预估一个差不多容量大小的桶,如果分配的多了,我们下次就拿更大一点的桶,如果分配少了,下次我们就拿一个小点的桶。
在这种场景下,我们需要ByteBuffer可以自动根据每次网络数据的大小来动态自适应调整自己的容量。
而ByteBuffer动态自适应扩缩容机制依赖于AdaptiveRecvByteBufAllocator类的实现。让我们先回到AdaptiveRecvByteBufAllocator类的创建起点开始说起~~
4.1 AdaptiveRecvByteBufAllocator的创建
在前文?《Netty是如何高效接收网络连接》中我们提到,当Main Reactor监听到OP_ACCPET事件活跃后,会在NioServerSocketChannel中accept完成三次握手的客户端连接。并创建NioSocketChannel,伴随着NioSocketChannel的创建其对应的配置类NioSocketChannelConfig类也会随之创建。
public NioSocketChannel(Channel parent, SocketChannel socket) { super(parent, socket); config = new NioSocketChannelConfig(this, socket.socket()); }
最终会在NioSocketChannelConfig的父类DefaultChannelConfig的构造器中创建AdaptiveRecvByteBufAllocator。并保存在RecvByteBufAllocator rcvBufAllocator字段中。
public class DefaultChannelConfig implements ChannelConfig { //用于Channel接收数据用的buffer分配器 AdaptiveRecvByteBufAllocator private volatile RecvByteBufAllocator rcvBufAllocator; public DefaultChannelConfig(Channel channel) { this(channel, new AdaptiveRecvByteBufAllocator()); }}
在new AdaptiveRecvByteBufAllocator()创建AdaptiveRecvByteBufAllocator类实例的时候会先触发AdaptiveRecvByteBufAllocator类的初始化。
我们先来看下AdaptiveRecvByteBufAllocator类的初始化都做了些什么事情:
4.2 AdaptiveRecvByteBufAllocator类的初始化
public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator { //扩容步长 private static final int INDEX_INCREMENT = 4; //缩容步长 private static final int INDEX_DECREMENT = 1; //RecvBuf分配容量表(扩缩容索引表)按照表中记录的容量大小进行扩缩容 private static final int[] SIZE_TABLE; static { //初始化RecvBuf容量分配表 List<Integer> sizeTable = new ArrayList<Integer>(); //当分配容量小于512时,扩容单位为16递增 for (int i = 16; i < 512; i += 16) { sizeTable.add(i); } //当分配容量大于512时,扩容单位为一倍 for (int i = 512; i > 0; i <<= 1) { sizeTable.add(i); } //初始化RecbBuf扩缩容索引表 SIZE_TABLE = new int[sizeTable.size()]; for (int i = 0; i < SIZE_TABLE.length; i ++) { SIZE_TABLE[i] = sizeTable.get(i); } }}
AdaptiveRecvByteBufAllocator 主要的作用就是为接收数据的ByteBuffer进行扩容缩容,那么每次怎么扩容?扩容多少?怎么缩容?缩容多少呢??
这四个问题将是本小节笔者要为大家解答的内容~~~
Netty中定义了一个int型的数组SIZE_TABLE来存储每个扩容单位对应的容量大小。建立起扩缩容的容量索引表。每次扩容多少,缩容多少全部记录在这个容量索引表中。
在AdaptiveRecvByteBufAllocatorl类初始化的时候会在static{}静态代码块中对扩缩容索引表SIZE_TABLE进行初始化。
从源码中我们可以看出SIZE_TABLE的初始化分为两个部分:
当索引容量小于512时,SIZE_TABLE中定义的容量索引是从16开始按16递增。
image.png
当索引容量大于512时,SIZE_TABLE中定义的容量索引是按前一个索引容量的2倍递增。
image.png
4.3 扩缩容逻辑
现在扩缩容索引表SIZE_TABLE已经初始化完毕了,那么当我们需要对ByteBuffer进行扩容或者缩容的时候如何根据SIZE_TABLE决定扩容多少或者缩容多少呢??
这就用到了在AdaptiveRecvByteBufAllocator类中定义的扩容步长INDEX_INCREMENT = 4,缩容步长INDEX_DECREMENT = 1了。
我们就以上面两副扩缩容容量索引表SIZE_TABLE中的容量索引展示截图为例,来介绍下扩缩容逻辑,假设我们当前ByteBuffer的容量索引为33,对应的容量为2048。
4.3.1 扩容
当对容量为2048的ByteBuffer进行扩容时,根据当前的容量索引index = 33 加上 扩容步长INDEX_INCREMENT = 4计算出扩容后的容量索引为37,那么扩缩容索引表SIZE_TABLE下标37对应的容量就是本次ByteBuffer扩容后的容量SIZE_TABLE[37] = 32768
4.3.1 缩容
同理对容量为2048的ByteBuffer进行缩容时,我们就需要用当前容量索引index = 33 减去 缩容步长INDEX_DECREMENT = 1计算出缩容后的容量索引32,那么扩缩容索引表SIZE_TABLE下标32对应的容量就是本次ByteBuffer缩容后的容量SIZE_TABLE[32] = 1024
4.4 扩缩容时机
public abstract class AbstractNioByteChannel extends AbstractNioChannel { @Override public final void read() { .........省略...... try { do { .........省略...... } while (allocHandle.continueReading()); //根据本次read loop总共读取的字节数,决定下次是否扩容或者缩容 allocHandle.readComplete(); .........省略......... } catch (Throwable t) { ...............省略............... } finally { ...............省略............... } }}
在每轮read loop结束之后,我们都会调用allocHandle.readComplete()来根据在allocHandle中统计的在本轮read loop中读取字节总大小,来决定在下一轮read loop中是否对DirectByteBuffer进行扩容或者缩容。
public abstract class MaxMessageHandle implements ExtendedHandle { @Override public void readComplete() { //是否对recvbuf进行扩容缩容 record(totalBytesRead()); } private void record(int actualReadBytes) { if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT)]) { if (decreaseNow) { index = max(index - INDEX_DECREMENT, minIndex); nextReceiveBufferSize = SIZE_TABLE[index]; decreaseNow = false; } else { decreaseNow = true; } } else if (actualReadBytes >= nextReceiveBufferSize) { index = min(index + INDEX_INCREMENT, maxIndex); nextReceiveBufferSize = SIZE_TABLE[index]; decreaseNow = false; } } }
我们以当前ByteBuffer容量为2048,容量索引index = 33为例,对allocHandle的扩容缩容规则进行说明。
扩容步长INDEX_INCREMENT = 4,缩容步长INDEX_DECREMENT = 1。
image.png
4.4.1 缩容如果本次OP_READ事件实际读取到的总字节数actualReadBytes在SIZE_TABLE[index - INDEX_DECREMENT]与SIZE_TABLE[index]之间的话,也就是如果本轮read loop结束之后总共读取的字节数在[1024,2048]之间。说明此时分配的ByteBuffer容量正好,不需要进行缩容也不需要进行扩容。 比如本次actualReadBytes = 2000,正好处在1024与2048之间。说明2048的容量正好。如果actualReadBytes 小于等于 SIZE_TABLE[index - INDEX_DECREMENT],也就是如果本轮read loop结束之后总共读取的字节数小于等于1024。表示本次读取到的字节数比当前ByteBuffer容量的下一级容量还要小,说明当前ByteBuffer的容量分配的有些大了,设置缩容标识decreaseNow = true。当下次OP_READ事件继续满足缩容条件的时候,开始真正的进行缩容。缩容后的容量为SIZE_TABLE[index - INDEX_DECREMENT],但不能小于SIZE_TABLE[minIndex]。
注意需要满足两次缩容条件才会进行缩容,且缩容步长为1,缩容比较谨慎
4.4.2 扩容
如果本次OP_READ事件处理总共读取的字节数actualReadBytes 大于等于 当前ByteBuffer容量(nextReceiveBufferSize)时,说明ByteBuffer分配的容量有点小了,需要进行扩容。扩容后的容量为SIZE_TABLE[index + INDEX_INCREMENT],但不能超过SIZE_TABLE[maxIndex]。
满足一次扩容条件就进行扩容,并且扩容步长为4, 扩容比较奔放
4.5 AdaptiveRecvByteBufAllocator类的实例化
AdaptiveRecvByteBufAllocator类的实例化主要是确定ByteBuffer的初始容量,以及最小容量和最大容量在扩缩容索引表SIZE_TABLE中的下标:minIndex和maxIndex。
AdaptiveRecvByteBufAllocator定义了三个关于ByteBuffer容量的字段:
DEFAULT_MINIMUM :表示ByteBuffer最小的容量,默认为64,也就是无论ByteBuffer在怎么缩容,容量也不会低于64。DEFAULT_INITIAL:表示ByteBuffer的初始化容量。默认为2048。DEFAULT_MAXIMUM :表示ByteBuffer的最大容量,默认为65536,也就是无论ByteBuffer在怎么扩容,容量也不会超过65536。
public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator { static final int DEFAULT_MINIMUM = 64; static final int DEFAULT_INITIAL = 2048; static final int DEFAULT_MAXIMUM = 65536; public AdaptiveRecvByteBufAllocator() { this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM); } public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) { .................省略异常检查逻辑............. //计算minIndex maxIndex //在SIZE_TABLE中二分查找最小 >= minimum的容量索引 :3 int minIndex = getSizeTableIndex(minimum); if (SIZE_TABLE[minIndex] < minimum) { this.minIndex = minIndex + 1; } else { this.minIndex = minIndex; } //在SIZE_TABLE中二分查找最大 <= maximum的容量索引 :38 int maxIndex = getSizeTableIndex(maximum); if (SIZE_TABLE[maxIndex] > maximum) { this.maxIndex = maxIndex - 1; } else { this.maxIndex = maxIndex; } this.initial = initial; }}
接下来的事情就是确定最小容量DEFAULT_MINIMUM 在SIZE_TABLE中的下标minIndex,以及最大容量DEFAULT_MAXIMUM 在SIZE_TABLE中的下标maxIndex。
从AdaptiveRecvByteBufAllocator类初始化的过程中,我们可以看出SIZE_TABLE中存储的数据特征是一个有序的集合。
我们可以通过二分查找在SIZE_TABLE中找出第一个容量大于等于DEFAULT_MINIMUM的容量索引minIndex。
同理通过二分查找在SIZE_TABLE中找出最后一个容量小于等于DEFAULT_MAXIMUM的容量索引maxIndex。
根据上一小节关于SIZE_TABLE中容量数据分布的截图,我们可以看出minIndex = 3,maxIndex = 38
4.5.1 二分查找容量索引下标
private static int getSizeTableIndex(final int size) { for (int low = 0, high = SIZE_TABLE.length - 1;;) { if (high < low) { return low; } if (high == low) { return high; } int mid = low + high >>> 1;//无符号右移,高位始终补0 int a = SIZE_TABLE[mid]; int b = SIZE_TABLE[mid + 1]; if (size > b) { low = mid + 1; } else if (size < a) { high = mid - 1; } else if (size == a) { return mid; } else { return mid + 1; } } }
经常刷LeetCode的小伙伴肯定一眼就看出这个是二分查找的模板了。
它的目的就是根据给定容量,在扩缩容索引表SIZE_TABLE中,通过二分查找找到最贴近给定size的容量的索引下标(第一个大于等于 size的容量)
4.6 RecvByteBufAllocator.Handle
前边我们提到最终动态调整ByteBuffer容量的是由AdaptiveRecvByteBufAllocator中的Handler负责的,我们来看下这个allocHandle的创建过程。
protected abstract class AbstractUnsafe implements Unsafe { private RecvByteBufAllocator.Handle recvHandle; @Override public RecvByteBufAllocator.Handle recvBufAllocHandle() { if (recvHandle == null) { recvHandle = config().getRecvByteBufAllocator().newHandle(); } return recvHandle; }}
从allocHandle的获取过程我们看到最allocHandle的创建是由AdaptiveRecvByteBufAllocator#newHandle方法执行的。
public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator { @Override public Handle newHandle() { return new HandleImpl(minIndex, maxIndex, initial); } private final class HandleImpl extends MaxMessageHandle { //最小容量在扩缩容索引表中的index private final int minIndex; //最大容量在扩缩容索引表中的index private final int maxIndex; //当前容量在扩缩容索引表中的index 初始33 对应容量2048 private int index; //预计下一次分配buffer的容量,初始:2048 private int nextReceiveBufferSize; //是否缩容 private boolean decreaseNow; HandleImpl(int minIndex, int maxIndex, int initial) { this.minIndex = minIndex; this.maxIndex = maxIndex; //在扩缩容索引表中二分查找到最小大于等于initial 的容量 index = getSizeTableIndex(initial); //2048 nextReceiveBufferSize = SIZE_TABLE[index]; } .......................省略................... }}
这里我们看到Netty中用于动态调整ByteBuffer容量的allocHandle的实际类型为MaxMessageHandle。
下面我们来介绍下HandleImpl中的核心字段,它们都和ByteBuffer的容量有关:
minIndex :最小容量在扩缩容索引表SIZE_TABE中的index。默认是3。maxIndex :最大容量在扩缩容索引表SIZE_TABE中的index。默认是38。index :当前容量在扩缩容索引表SIZE_TABE中的index。初始是33。nextReceiveBufferSize :预计下一次分配buffer的容量,初始为2048。在每次申请内存分配ByteBuffer的时候,采用nextReceiveBufferSize的值指定容量。decreaseNow : 是否需要进行缩容。5. 使用堆外内存为ByteBuffer分配内存
AdaptiveRecvByteBufAllocator类只是负责动态调整ByteBuffer的容量,而具体为ByteBuffer申请内存空间的是由PooledByteBufAllocator负责。
5.1 类名前缀Pooled的来历
在我们使用Java进行日常开发过程中,在为对象分配内存空间的时候我们都会选择在JVM堆中为对象分配内存,这样做对我们Java开发者特别的友好,我们只管使用就好而不必过多关心这块申请的内存如何回收,因为JVM堆完全受Java虚拟机控制管理,Java虚拟机会帮助我们回收不再使用的内存。
但是JVM在进行垃圾回收时候的stop the world会对我们应用程序的性能造成一定的影响。
除此之外我们在?《聊聊Netty那些事儿之从内核角度看IO模型》一文中介绍IO模型的时候提到,当数据达到网卡时,网卡会通过DMA的方式将数据拷贝到内核空间中,这是第一次拷贝。当用户线程在用户空间发起系统IO调用时,CPU会将内核空间的数据再次拷贝到用户空间。这是第二次拷贝。
于此不同的是当我们在JVM中发起IO调用时,比如我们使用JVM堆内存读取Socket接收缓冲区中的数据时,会多一次内存拷贝,CPU在第二次拷贝中将数据从内核空间拷贝到用户空间时,此时的用户空间站在JVM角度是堆外内存,所以还需要将堆外内存中的数据拷贝到堆内内存中。这就是第三次内存拷贝。
同理当我们在JVM中发起IO调用向Socket发送缓冲区写入数据时,JVM会将IO数据先拷贝到堆外内存,然后才能发起系统IO调用。
那为什么操作系统不直接使用JVM的堆内内存进行IO操作呢?
因为JVM的内存布局和操作系统分配的内存是不一样的,操作系统不可能按照JVM规范来读写数据,所以就需要第三次拷贝中间做个转换将堆外内存中的数据拷贝到JVM堆中。
所以基于上述内容,在使用JVM堆内内存时会产生以下两点性能影响:
JVM在垃圾回收堆内内存时,会发生stop the world导致应用程序卡顿。在进行IO操作的时候,会多产生一次由堆外内存到堆内内存的拷贝。
基于以上两点使用JVM堆内内存对性能造成的影响,于是对性能有卓越追求的Netty采用堆外内存也就是DirectBuffer来为ByteBuffer分配内存空间。
采用堆外内存为ByteBuffer分配内存的好处就是:
堆外内存直接受操作系统的管理,不会受JVM的管理,所以JVM垃圾回收对应用程序的性能影响就没有了。网络数据到达之后直接在堆外内存上接收,进程读取网络数据时直接在堆外内存中读取,所以就避免了第三次内存拷贝。
所以Netty在进行 I/O 操作时都是使用的堆外内存,可以避免数据从 JVM 堆内存到堆外内存的拷贝。但是由于堆外内存不受JVM的管理,所以就需要额外关注对内存的使用和释放,稍有不慎就会造成内存泄露,于是Netty就引入了内存池对堆外内存进行统一管理。
PooledByteBufAllocator类的这个前缀Pooled就是内存池的意思,这个类会使用Netty的内存池为ByteBuffer分配堆外内存。
5.2 PooledByteBufAllocator的创建创建时机
在服务端NioServerSocketChannel的配置类NioServerSocketChannelConfig以及客户端NioSocketChannel的配置类NioSocketChannelConfig实例化的时候会触发PooledByteBufAllocator的创建。
public class DefaultChannelConfig implements ChannelConfig { //PooledByteBufAllocator private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT; ..........省略......}
创建出来的PooledByteBufAllocator实例保存在DefaultChannelConfig类中的ByteBufAllocator allocator字段中。
创建过程
public interface ByteBufAllocator { ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR; ..................省略............}
public final class ByteBufUtil { static final ByteBufAllocator DEFAULT_ALLOCATOR; static { String allocType = SystemPropertyUtil.get( "io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled"); allocType = allocType.toLowerCase(Locale.US).trim(); ByteBufAllocator alloc; if ("unpooled".equals(allocType)) { alloc = UnpooledByteBufAllocator.DEFAULT; logger.debug("-Dio.netty.allocator.type: {}", allocType); } else if ("pooled".equals(allocType)) { alloc = PooledByteBufAllocator.DEFAULT; logger.debug("-Dio.netty.allocator.type: {}", allocType); } else { alloc = PooledByteBufAllocator.DEFAULT; logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType); } DEFAULT_ALLOCATOR = alloc; ...................省略.................. }}
从ByteBufUtil类的初始化过程我们可以看出,在为ByteBuffer分配内存的时候是否使用内存池在Netty中是可以配置的。
通过系统变量-D io.netty.allocator.type 可以配置是否使用内存池为ByteBuffer分配内存。默认情况下是需要使用内存池的。但是在安卓系统中默认是不使用内存池的。通过PooledByteBufAllocator.DEFAULT获取内存池ByteBuffer分配器。
public static final PooledByteBufAllocator DEFAULT = new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());
由于本文的主线是介绍Sub Reactor处理OP_READ事件的完整过程,所以这里只介绍主线相关的内容,这里只是简单介绍下在接收数据的时候为什么会用PooledByteBufAllocator来为ByteBuffer分配内存。而内存池的架构设计比较复杂,所以笔者后面会单独写一篇关于Netty内存管理的文章。
总结
本文介绍了Sub Reactor线程在处理OP_READ事件的整个过程。并深入剖析了AdaptiveRecvByteBufAllocator类动态调整ByteBuffer容量的原理。
同时也介绍了Netty为什么会使用堆外内存来为ByteBuffer分配内存,并由此引出了Netty的内存池分配器PooledByteBufAllocator 。
在介绍AdaptiveRecvByteBufAllocator类和PooledByteBufAllocator一起组合实现动态地为ByteBuffer分配容量的时候,笔者不禁想起了多年前看过的《Effective Java》中第16条 复合优先于继承。
Netty在这里也遵循了这条军规,首先两个类设计的都是单一的功能。
AdaptiveRecvByteBufAllocator类只负责动态的调整ByteBuffer容量,并不管具体的内存分配。PooledByteBufAllocator类负责具体的内存分配,用内存池的方式。
这样设计的就比较灵活,具体内存分配的工作交给具体的ByteBufAllocator,可以使用内存池的分配方式PooledByteBufAllocator,也可以不使用内存池的分配方式UnpooledByteBufAllocator。具体的内存可以采用JVM堆内内存(HeapBuffer),也可以使用堆外内存(DirectBuffer)。
而AdaptiveRecvByteBufAllocator只需要关注调整它们的容量工作就可以了,而并不需要关注它们具体的内存分配方式。
最后通过io.netty.channel.RecvByteBufAllocator.Handle#allocate方法灵活组合不同的内存分配方式。这也是装饰模式的一种应用。
byteBuf = allocHandle.allocate(allocator);
好了,今天的内容就到这里,我们下篇文章见~~~~
标签: #netty接收数据不完整