龙空技术网

Netty底层核心原理剖析

so贝塔 219

前言:

当前各位老铁们对“bytebuf 读取所有数据”都比较着重,兄弟们都需要了解一些“bytebuf 读取所有数据”的相关知识。那么小编也在网上网罗了一些对于“bytebuf 读取所有数据””的相关内容,希望看官们能喜欢,兄弟们快快来学习一下吧!

一、为什么选择 Netty

Netty 是一款用于高效开发网络应用的 NIO 网络框架,它大大简化了网络应用的开发过程。我们所熟知的 TCP 和 UDP 的 Socket 服务器开发,就是一个有关 Netty 简化网络应用开发的典型案例。

既然 Netty 是网络应用框架,那我们永远绕不开以下几个核心关注点:

I/O 模型、线程模型和事件处理机制;易用性 API 接口;对数据协议、序列化的支持。

我们之所以会最终选择 Netty,是因为 Netty 围绕这些核心要点可以做到尽善尽美,其健壮性、性能、可扩展性在同领域的框架中都首屈一指。下面我们从以下三个方面一起来看看,Netty 到底有多厉害。

高性能,低延迟

经常听到这么一句话:“网络编程只要你使用了 Netty 框架,你的程序性能基本就不会差。”这句话虽然有些绝对,但是也从侧面上反映了人们对 Netty 高性能的肯定。

实现高性能的网络应用框架离不开 I/O 模型问题,在了解 Netty 高性能原理之前我们需要先储备 I/O 模型的基本知识。

I/O 请求可以分为两个阶段,分别为调用阶段和执行阶段。

第一个阶段为I/O 调用阶段,即用户进程向内核发起系统调用。第二个阶段为I/O 执行阶段。此时,内核等待 I/O 请求处理完成返回。该阶段分为两个过程:首先等待数据就绪,并写入内核缓冲区;随后将内核缓冲区数据拷贝至用户态缓冲区。

为了方便大家理解,可以看一下这张图:

接下来我们来回顾一下 Linux 的 5 种主要 I/O 模式,并看下各种 I/O 模式的优劣势都在哪里?

同步阻塞 I/O(BIO)

如上图所表现的那样,应用进程向内核发起 I/O 请求,发起调用的线程一直等待内核返回结果。一次完整的 I/O 请求称为BIO(Blocking IO,阻塞 I/O),所以 BIO 在实现异步操作时,只能使用多线程模型,一个请求对应一个线程。但是,线程的资源是有限且宝贵的,创建过多的线程会增加线程切换的开销。

同步非阻塞 I/O(NIO)

在刚介绍完 BIO 的网络模型之后,NIO 自然就很好理解了。

如上图所示,应用进程向内核发起 I/O 请求后不再会同步等待结果,而是会立即返回,通过轮询的方式获取请求结果。NIO 相比 BIO 虽然大幅提升了性能,但是轮询过程中大量的系统调用导致上下文切换开销很大。所以,单独使用非阻塞 I/O 时效率并不高,而且随着并发量的提升,非阻塞 I/O 会存在严重的性能浪费。

I/O 多路复用

多路复用实现了一个线程处理多个 I/O 句柄的操作。多路指的是多个数据通道,复用指的是使用一个或多个固定线程来处理每一个 Socket。select、poll、epoll 都是 I/O 多路复用的具体实现,线程一次 select 调用可以获取内核态中多个数据通道的数据状态。多路复用解决了同步阻塞 I/O 和同步非阻塞 I/O 的问题,是一种非常高效的 I/O 模型。

信号驱动 I/O

信号驱动 I/O 并不常用,它是一种半异步的 I/O 模型。在使用信号驱动 I/O 时,当数据准备就绪后,内核通过发送一个 SIGIO 信号通知应用进程,应用进程就可以开始读取数据了。

异步 I/O

异步 I/O 最重要的一点是从内核缓冲区拷贝数据到用户态缓冲区的过程也是由系统异步完成,应用进程只需要在指定的数组中引用数据即可。异步 I/O 与信号驱动 I/O 这种半异步模式的主要区别:信号驱动 I/O 由内核通知何时可以开始一个 I/O 操作,而异步 I/O 由内核通知 I/O 操作何时已经完成。

了解了上述五种 I/O,我们再来看 Netty 如何实现自己的 I/O 模型。Netty 的 I/O 模型是基于非阻塞 I/O 实现的,底层依赖的是 JDK NIO 框架的多路复用器 Selector。一个多路复用器 Selector 可以同时轮询多个 Channel,采用 epoll 模式后,只需要一个线程负责 Selector 的轮询,就可以接入成千上万的客户端。

在 I/O 多路复用的场景下,当有数据处于就绪状态后,需要一个事件分发器(Event Dispather),它负责将读写事件分发给对应的读写事件处理器(Event Handler)。事件分发器有两种设计模式:Reactor 和 Proactor,Reactor 采用同步 I/O, Proactor 采用异步 I/O。

Reactor 实现相对简单,适合处理耗时短的场景,对于耗时长的 I/O 操作容易造成阻塞。Proactor 性能更高,但是实现逻辑非常复杂,目前主流的事件驱动模型还是依赖 select 或 epoll 来实现。

上图所描述的便是 Netty 所采用的主从 Reactor 多线程模型,所有的 I/O 事件都注册到一个 I/O 多路复用器上,当有 I/O 事件准备就绪后,I/O 多路复用器会将该 I/O 事件通过事件分发器分发到对应的事件处理器中。该线程模型避免了同步问题以及多线程切换带来的资源开销,真正做到高性能、低延迟。

完美弥补 Java NIO 的缺陷

在 JDK 1.4 投入使用之前,只有 BIO 一种模式。开发过程相对简单。新来一个连接就会创建一个新的线程处理。随着请求并发度的提升,BIO 很快遇到了性能瓶颈。JDK 1.4 以后开始引入了 NIO 技术,支持 select 和 poll;JDK 1.5 支持了 epoll;JDK 1.7 发布了 NIO2,支持 AIO 模型。Java 在网络领域取得了长足的进步。

既然 JDK NIO 性能已经非常优秀,为什么还要选择 Netty?这是因为 Netty 做了 JDK 该做的事,但是做得更加完备。我们一起看下 Netty 相比 JDK NIO 有哪些突出的优势。

易用性。 我们使用 JDK NIO 编程需要了解很多复杂的概念,比如 Channels、Selectors、Sockets、Buffers 等,编码复杂程度令人发指。相反,Netty 在 NIO 基础上进行了更高层次的封装,屏蔽了 NIO 的复杂性;Netty 封装了更加人性化的 API,统一的 API(阻塞/非阻塞) 大大降低了开发者的上手难度;与此同时,Netty 提供了很多开箱即用的工具,例如常用的行解码器、长度域解码器等,而这些在 JDK NIO 中都需要你自己实现。稳定性。 Netty 更加可靠稳定,修复和完善了 JDK NIO 较多已知问题,例如臭名昭著的 select 空转导致 CPU 消耗 100%,TCP 断线重连,keep-alive 检测等问题。可扩展性。 Netty 的可扩展性在很多地方都有体现,这里我主要列举其中的两点:一个是可定制化的线程模型,用户可以通过启动的配置参数选择 Reactor 线程模型;另一个是可扩展的事件驱动模型,将框架层和业务层的关注点分离。大部分情况下,开发者只需要关注 ChannelHandler 的业务逻辑实现。更低的资源消耗

作为网络通信框架,需要处理海量的网络数据,那么必然面临有大量的网络对象需要创建和销毁的问题,对于 JVM GC 并不友好。为了降低 JVM 垃圾回收的压力,Netty 主要采用了两种优化手段:

对象池复用技术。 Netty 通过复用对象,避免频繁创建和销毁带来的开销。零拷贝技术。 除了操作系统级别的零拷贝技术外,Netty 提供了更多面向用户态的零拷贝技术,例如 Netty 在 I/O 读写时直接使用 DirectBuffer,从而避免了数据在堆内存和堆外内存之间的拷贝。

因为 Netty 不仅做到了高性能、低延迟以及更低的资源消耗,还完美弥补了 Java NIO 的缺陷,所以在网络编程时越来越受到开发者们的青睐。

二、谁在使用 Netty

Netty 经过很多出名产品在线上的大规模验证,其健壮性和稳定性都被业界认可,其中典型的产品有以下几个。

服务治理:Apache Dubbo、gRPC。大数据:Hbase、Spark、Flink、Storm。搜索引擎:Elasticsearch。消息队列:RocketMQ、ActiveMQ。三、架构脉络Netty 整体结构

从图中,我们可以清晰地看出 Netty 结构一共分为三个模块:

Core 核心层

Core 核心层是 Netty 最精华的内容,它提供了底层网络通信的通用抽象和实现,包括可扩展的事件模型、通用的通信 API、支持零拷贝的 ByteBuf 等。

Protocol Support 协议支持层

协议支持层基本上覆盖了主流协议的编解码实现,如 HTTP、SSL、Protobuf、压缩、大文件传输、WebSocket、文本、二进制等主流协议,此外 Netty 还支持自定义应用层协议。Netty 丰富的协议支持降低了用户的开发成本,基于 Netty 我们可以快速开发 HTTP、WebSocket 等服务。

Transport Service 传输服务层

传输服务层提供了网络传输能力的定义和实现方法。它支持 Socket、HTTP 隧道、虚拟机管道等传输方式。Netty 对 TCP、UDP 等数据传输做了抽象和封装,用户可以更聚焦在业务逻辑实现上,而不必关系底层数据传输的细节。

Netty 的模块设计具备较高的通用性和可扩展性,它不仅是一个优秀的网络框架,还可以作为网络编程的工具箱。Netty 的设计理念非常优雅,值得我们学习借鉴。

现在,我们对 Netty 的整体结构已经有了一个大概的印象,下面我们一起看下 Netty 的逻辑架构,学习下 Netty 是如何做功能分解的。

Netty 逻辑架构

下图是 Netty 的逻辑处理架构。Netty 的逻辑处理架构为典型网络分层架构设计,共分为网络通信层、事件调度层、服务编排层,每一层各司其职。图中包含了 Netty 每一层所用到的核心组件。我将为你介绍 Netty 的每个逻辑分层中的各个核心组件以及组件之间是如何协调运作的。

网络通信层

网络通信层的职责是执行网络 I/O 的操作。它支持多种网络协议和 I/O 模型的连接操作。当网络数据读取到内核缓冲区后,会触发各种网络事件,这些网络事件会分发给事件调度层进行处理。

网络通信层的核心组件包含BootStrap、ServerBootStrap、Channel三个组件。

BootStrap ServerBootStrap

Bootstrap 是“引导”的意思,它主要负责整个 Netty 程序的启动、初始化、服务器连接等过程,它相当于一条主线,串联了 Netty 的其他核心组件。

如下图所示,Netty 中的引导器共分为两种类型:一个为用于客户端引导的 Bootstrap,另一个为用于服务端引导的 ServerBootStrap,它们都继承自抽象类 AbstractBootstrap。

Bootstrap 和 ServerBootStrap 十分相似,两者非常重要的区别在于 Bootstrap 可用于连接远端服务器,只绑定一个 EventLoopGroup。而 ServerBootStrap 则用于服务端启动绑定本地端口,会绑定两个 EventLoopGroup,这两个 EventLoopGroup 通常称为 Boss 和 Worker。

ServerBootStrap 中的 Boss 和 Worker 是什么角色呢?它们之间又是什么关系?这里的 Boss 和 Worker 可以理解为“老板”和“员工”的关系。每个服务器中都会有一个 Boss,也会有一群做事情的 Worker。Boss 会不停地接收新的连接,然后将连接分配给一个个 Worker 处理连接。

有了 Bootstrap 组件,我们可以更加方便地配置和启动 Netty 应用程序,它是整个 Netty 的入口,串接了 Netty 所有核心组件的初始化工作。

Channel

Channel 的字面意思是“通道”,它是网络通信的载体。Channel提供了基本的 API 用于网络 I/O 操作,如 register、bind、connect、read、write、flush 等。Netty 自己实现的 Channel 是以 JDK NIO Channel 为基础的,相比较于 JDK NIO,Netty 的 Channel 提供了更高层次的抽象,同时屏蔽了底层 Socket 的复杂性,赋予了 Channel 更加强大的功能,你在使用 Netty 时基本不需要再与 Java Socket 类直接打交道。

下图是 Channel 家族的图谱。AbstractChannel 是整个家族的基类,派生出 AbstractNioChannel、AbstractOioChannel、AbstractEpollChannel 等子类,每一种都代表了不同的 I/O 模型和协议类型。常用的 Channel 实现类有:

NioServerSocketChannel 异步 TCP 服务端。NioSocketChannel 异步 TCP 客户端。OioServerSocketChannel 同步 TCP 服务端。OioSocketChannel 同步 TCP 客户端。NioDatagramChannel 异步 UDP 连接。OioDatagramChannel 同步 UDP 连接。

当然 Channel 会有多种状态,如连接建立、连接注册、数据读写、连接销毁等。随着状态的变化,Channel 处于不同的生命周期,每一种状态都会绑定相应的事件回调,下面的表格我列举了 Channel 最常见的状态所对应的事件回调。

事件

说明

channelRegistered

Channel 创建后被注册到 EventLoop 上

channelUnregistered

Channel 创建后未注册或者从 EventLoop 取消注册

channelActive

Channel 处于就绪状态,可以被读写

channelInactive

Channel 处于非就绪状态

channelRead

Channel 可以从远端读取到数据

channelReadComplete

Channel 读取数据完成

有关网络通信层我就先介绍到这里,简单地总结一下。BootStrap 和 ServerBootStrap 分别负责客户端和服务端的启动,它们是非常强大的辅助工具类;Channel 是网络通信的载体,提供了与底层 Socket 交互的能力。那么 Channel 生命周期内的事件都是如何被处理的呢?那就是 Netty 事件调度层的工作职责了。

事件调度层

事件调度层的职责是通过 Reactor 线程模型对各类事件进行聚合处理,通过 Selector 主循环线程集成多种事件( I/O 事件、信号事件、定时事件等),实际的业务处理逻辑是交由服务编排层中相关的 Handler 完成。

事件调度层的核心组件包括 EventLoopGroup、EventLoop。

EventLoopGroup EventLoop

EventLoopGroup 本质是一个线程池,主要负责接收 I/O 请求,并分配线程执行处理请求。在下图中,我为你讲述了 EventLoopGroup、EventLoop 与 Channel 的关系。

从上图中,我们可以总结出 EventLoopGroup、EventLoop、Channel 的几点关系。

一个 EventLoopGroup 往往包含一个或者多个 EventLoop。EventLoop 用于处理 Channel 生命周期内的所有 I/O 事件,如 accept、connect、read、write 等 I/O 事件。EventLoop 同一时间会与一个线程绑定,每个 EventLoop 负责处理多个 Channel。每新建一个 Channel,EventLoopGroup 会选择一个 EventLoop 与其绑定。该 Channel 在生命周期内都可以对 EventLoop 进行多次绑定和解绑。

下图是 EventLoopGroup 的家族图谱。可以看出 Netty 提供了 EventLoopGroup 的多种实现,而且 EventLoop 则是 EventLoopGroup 的子接口,所以也可以把 EventLoop 理解为 EventLoopGroup,但是它只包含一个 EventLoop 。

EventLoopGroup 的实现类是 NioEventLoopGroup,NioEventLoopGroup 也是 Netty 中最被推荐使用的线程模型。NioEventLoopGroup 继承于 MultithreadEventLoopGroup,是基于 NIO 模型开发的,可以把 NioEventLoopGroup 理解为一个线程池,每个线程负责处理多个 Channel,而同一个 Channel 只会对应一个线程。

EventLoopGroup 是 Netty 的核心处理引擎,那么 EventLoopGroup 和之前课程所提到的 Reactor 线程模型到底是什么关系呢?其实 EventLoopGroup 是 Netty Reactor 线程模型的具体实现方式,Netty 通过创建不同的 EventLoopGroup 参数配置,就可以支持 Reactor 的三种线程模型:

单线程模型:EventLoopGroup 只包含一个 EventLoop,Boss 和 Worker 使用同一个EventLoopGroup;多线程模型:EventLoopGroup 包含多个 EventLoop,Boss 和 Worker 使用同一个EventLoopGroup;主从多线程模型:EventLoopGroup 包含多个 EventLoop,Boss 是主 Reactor,Worker 是从 Reactor,它们分别使用不同的 EventLoopGroup,主 Reactor 负责新的网络连接 Channel 创建,然后把 Channel 注册到从 Reactor。

在介绍完事件调度层之后,可以说 Netty 的发动机已经转起来了,事件调度层负责监听网络连接和读写操作,然后触发各种类型的网络事件,需要一种机制管理这些错综复杂的事件,并有序地执行,接下来我们便一起学习 Netty 服务编排层中核心组件的职责。

服务编排层

服务编排层的职责是负责组装各类服务,它是 Netty 的核心处理链,用以实现网络事件的动态编排和有序传播。

服务编排层的核心组件包括 ChannelPipeline、ChannelHandler、ChannelHandlerContext。

ChannelPipeline

ChannelPipeline 是 Netty 的核心编排组件,负责组装各种 ChannelHandler,实际数据的编解码以及加工处理操作都是由 ChannelHandler 完成的。ChannelPipeline 可以理解为ChannelHandler 的实例列表——内部通过双向链表将不同的 ChannelHandler 链接在一起。当 I/O 读写事件触发时,ChannelPipeline 会依次调用 ChannelHandler 列表对 Channel 的数据进行拦截和处理。

ChannelPipeline 是线程安全的,因为每一个新的 Channel 都会对应绑定一个新的 ChannelPipeline。一个 ChannelPipeline 关联一个 EventLoop,一个 EventLoop 仅会绑定一个线程。

ChannelPipeline、ChannelHandler 都是高度可定制的组件。开发者可以通过这两个核心组件掌握对 Channel 数据操作的控制权。下面我们看一下 ChannelPipeline 的结构图:

从上图可以看出,ChannelPipeline 中包含入站 ChannelInboundHandler 和出站 ChannelOutboundHandler 两种处理器,我们结合客户端和服务端的数据收发流程来理解 Netty 的这两个概念。

客户端和服务端都有各自的 ChannelPipeline。以客户端为例,数据从客户端发向服务端,该过程称为出站,反之则称为入站。数据入站会由一系列 InBoundHandler 处理,然后再以相反方向的 OutBoundHandler 处理后完成出站。我们经常使用的编码 Encoder 是出站操作,解码 Decoder 是入站操作。服务端接收到客户端数据后,需要先经过 Decoder 入站处理后,再通过 Encoder 出站通知客户端。所以客户端和服务端一次完整的请求应答过程可以分为三个步骤:客户端出站(请求数据)、服务端入站(解析数据并执行业务逻辑)、服务端出站(响应结果)。

ChannelHandler ChannelHandlerContext

在介绍 ChannelPipeline 的过程中,想必你已经对 ChannelHandler 有了基本的概念,数据的编解码工作以及其他转换工作实际都是通过 ChannelHandler 处理的。站在开发者的角度,最需要关注的就是 ChannelHandler,我们很少会直接操作 Channel,都是通过 ChannelHandler 间接完成。

下图描述了 Channel 与 ChannelPipeline 的关系,从图中可以看出,每创建一个 Channel 都会绑定一个新的 ChannelPipeline,ChannelPipeline 中每加入一个 ChannelHandler 都会绑定一个 ChannelHandlerContext。由此可见,ChannelPipeline、ChannelHandlerContext、ChannelHandler 三个组件的关系是密切相关的,那么你一定会有疑问,每个 ChannelHandler 绑定ChannelHandlerContext 的作用是什么呢?

ChannelHandlerContext 用于保存 ChannelHandler 上下文,通过 ChannelHandlerContext 我们可以知道 ChannelPipeline 和 ChannelHandler 的关联关系。ChannelHandlerContext 可以实现 ChannelHandler 之间的交互,ChannelHandlerContext 包含了 ChannelHandler 生命周期的所有事件,如 connect、bind、read、flush、write、close 等。此外,你可以试想这样一个场景,如果每个 ChannelHandler 都有一些通用的逻辑需要实现,没有 ChannelHandlerContext 这层模型抽象,你是不是需要写很多相同的代码呢?

以上便是 Netty 的逻辑处理架构,可以看出 Netty 的架构分层设计得非常合理,屏蔽了底层 NIO 以及框架层的实现细节,对于业务开发者来说,只需要关注业务逻辑的编排和实现即可。

组件关系梳理

当你了解每个 Netty 核心组件的概念后。你会好奇这些组件之间如何协作?结合客户端和服务端的交互流程,我画了一张图,为你完整地梳理一遍 Netty 内部逻辑的流转。

服务端启动初始化时有 Boss EventLoopGroup 和 Worker EventLoopGroup 两个组件,其中 Boss 负责监听网络连接事件。当有新的网络连接事件到达时,则将 Channel 注册到 Worker EventLoopGroup。Worker EventLoopGroup 会被分配一个 EventLoop 负责处理该 Channel 的读写事件。每个 EventLoop 都是单线程的,通过 Selector 进行事件循环。当客户端发起 I/O 读写事件时,服务端 EventLoop 会进行数据的读取,然后通过 Pipeline 触发各种监听器进行数据的加工处理。客户端数据会被传递到 ChannelPipeline 的第一个 ChannelInboundHandler 中,数据处理完成后,将加工完成的数据传递给下一个 ChannelInboundHandler。当数据写回客户端时,会将处理结果在 ChannelPipeline 的 ChannelOutboundHandler 中传播,最后到达客户端。

以上便是 Netty 各个组件的整体交互流程,你只需要对每个组件的工作职责有所了解,心中可以串成一条流水线即可,具体每个组件的实现原理后续课程我们会深入介绍。

Netty 源码结构

Netty 源码分为多个模块,模块之间职责划分非常清楚。如同上文整体功能模块一样,Netty 源码模块的划分也是基本契合的。

我们不仅可以使用 Netty all-in-one 的 Jar 包,也可以单独使用其中某些工具包。下面我根据 Netty 的分层结构以及实际的业务场景具体介绍 Netty 中常用的工具包。

Core 核心层模块

netty-common模块是 Netty 的核心基础包,提供了丰富的工具类,其他模块都需要依赖它。在 common 模块中,常用的包括通用工具类和自定义并发包。

通用工具类:比如定时器工具 TimerTask、时间轮 HashedWheelTimer 等。自定义并发包:比如异步模型****Future Promise、相比 JDK 增强的 FastThreadLocal 等。

在netty-buffer 模块中Netty自己实现了的一个更加完备的ByteBuf 工具类,用于网络通信中的数据载体。由于人性化的 Buffer API 设计,它已经成为 Java ByteBuffer 的完美替代品。ByteBuf 的动态性设计不仅解决了 ByteBuffer 长度固定造成的内存浪费问题,而且更安全地更改了 Buffer 的容量。此外 Netty 针对 ByteBuf 做了很多优化,例如缓存池化、减少数据拷贝的 CompositeByteBuf 等。

netty-resover模块主要提供了一些有关基础设施的解析工具,包括 IP Address、Hostname、DNS 等。

Protocol Support 协议支持层模块

netty-codec模块主要负责编解码工作,通过编解码实现原始字节数据与业务实体对象之间的相互转化。如下图所示,Netty 支持了大多数业界主流协议的编解码器,如 HTTP、HTTP2、Redis、XML 等,为开发者节省了大量的精力。此外该模块提供了抽象的编解码类 ByteToMessageDecoder 和 MessageToByteEncoder,通过继承这两个类我们可以轻松实现自定义的编解码逻辑。

netty-handler模块主要负责数据处理工作。Netty 中关于数据处理的部分,本质上是一串有序 handler 的集合。netty-handler 模块提供了开箱即用的 ChannelHandler 实现类,例如日志、IP 过滤、流量整形等,如果你需要这些功能,仅需在 pipeline 中加入相应的 ChannelHandler 即可。

Transport Service 传输服务层模块

netty-transport 模块可以说是 Netty 提供数据处理和传输的核心模块。该模块提供了很多非常重要的接口,如 Bootstrap、Channel、ChannelHandler、EventLoop、EventLoopGroup、ChannelPipeline 等。其中 Bootstrap 负责客户端或服务端的启动工作,包括创建、初始化 Channel 等;EventLoop 负责向注册的 Channel 发起 I/O 读写操作;ChannelPipeline 负责 ChannelHandler 的有序编排,这些组件在介绍 Netty 逻辑架构的时候都有所涉及。

四、Reactor线程模型

网络框架的设计离不开 I/O 线程模型,线程模型的优劣直接决定了系统的吞吐量、可扩展性、安全性等。目前主流的网络框架几乎都采用了 I/O 多路复用的方案。Reactor 模式作为其中的事件分发器,负责将读写事件分发给对应的读写事件处理者。大名鼎鼎的 Java 并发包作者 Doug Lea,在 Scalable I/O in Java 一文中阐述了服务端开发中 I/O 模型的演进过程。Netty 中三种 Reactor 线程模型也来源于这篇经典文章。下面我们对这三种 Reactor 线程模型做一个详细的分析。

单线程模型

上图描述了 Reactor 的单线程模型结构,在 Reactor 单线程模型中,所有 I/O 操作(包括连接建立、数据读写、事件分发等),都是由一个线程完成的。单线程模型逻辑简单,缺陷也十分明显:

一个线程支持处理的连接数非常有限,CPU 很容易打满,性能方面有明显瓶颈;当多个事件被同时触发时,只要有一个事件没有处理完,其他后面的事件就无法执行,这就会造成消息积压及请求超时;线程在处理 I/O 事件时,Select 无法同时处理连接建立、事件分发等操作;如果 I/O 线程一直处于满负荷状态,很可能造成服务端节点不可用。多线程模型

由于单线程模型有性能方面的瓶颈,多线程模型作为解决方案就应运而生了。Reactor 多线程模型将业务逻辑交给多个线程进行处理。除此之外,多线程模型其他的操作与单线程模型是类似的,例如读取数据依然保留了串行化的设计。当客户端有数据发送至服务端时,Select 会监听到可读事件,数据读取完毕后提交到业务线程池中并发处理。

主从多线程模型

主从多线程模型由多个 Reactor 线程组成,每个 Reactor 线程都有独立的 Selector 对象。MainReactor 仅负责处理客户端连接的 Accept 事件,连接建立成功后将新创建的连接对象注册至 SubReactor。再由 SubReactor 分配线程池中的 I/O 线程与其连接绑定,它将负责连接生命周期内所有的 I/O 事件。

Netty 推荐使用主从多线程模型,这样就可以轻松达到成千上万规模的客户端连接。在海量客户端并发请求的场景下,主从多线程模式甚至可以适当增加 SubReactor 线程的数量,从而利用多核能力提升系统的吞吐量。

介绍了上述三种 Reactor 线程模型,再结合它们各自的架构图,我们能大致总结出 Reactor 线程模型运行机制的四个步骤,分别为连接注册、事件轮询、事件分发、任务处理,如下图所示。

连接注册:Channel 建立后,注册至 Reactor 线程中的 Selector 选择器。事件轮询:轮询 Selector 选择器中已注册的所有 Channel 的 I/O 事件。事件分发:为准备就绪的 I/O 事件分配相应的处理线程。任务处理:Reactor 线程还负责任务队列中的非 I/O 任务,每个 Worker 线程从各自维护的任务队列中取出任务异步执行。

以上介绍了 Reactor 线程模型的演进过程和基本原理,Netty 也同样遵循 Reactor 线程模型的运行机制,下面我们来了解一下 Netty 是如何实现 Reactor 线程模型的。

Netty EventLoop 实现原理EventLoop 是什么

EventLoop 这个概念其实并不是 Netty 独有的,它是一种事件等待和处理的程序模型,可以解决多线程资源消耗高的问题。例如 Node.js 就采用了 EventLoop 的运行机制,不仅占用资源低,而且能够支撑了大规模的流量访问。

下图展示了 EventLoop 通用的运行模式。每当事件发生时,应用程序都会将产生的事件放入事件队列当中,然后 EventLoop 会轮询从队列中取出事件执行或者将事件分发给相应的事件监听者执行。事件执行的方式通常分为立即执行、延后执行、定期执行几种。

Netty 如何实现 EventLoop

在 Netty 中 EventLoop 可以理解为

Reactor 线程模型的事件处理引擎每个 EventLoop 线程都维护一个 Selector 选择器和任务队列 taskQueue它主要负责处理 I/O 事件、普通任务和定时任务

Netty 中推荐使用 NioEventLoop 作为实现类,那么 Netty 是如何实现 NioEventLoop 的呢?首先我们来看 NioEventLoop 最核心的 run() 方法源码,本节课我们不会对源码做深入的分析,只是先了解 NioEventLoop 的实现结构。

protected void run() {    for (;;) {        try {            try {                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {                case SelectStrategy.CONTINUE:                    continue;                case SelectStrategy.BUSY_WAIT:                case SelectStrategy.SELECT:                    select(wakenUp.getAndSet(false)); // 轮询 I/O 事件if (wakenUp.get()) {                        selector.wakeup();                    }                default:                }            } catch (IOException e) {                rebuildSelector0();                handleLoopException(e);                continue;            }            cancelledKeys = 0;            needsToSelectAgain = false;            final int ioRatio = this.ioRatio;            if (ioRatio == 100) {                try {                    processSelectedKeys(); // 处理 I/O 事件                } finally {                    runAllTasks(); // 处理所有任务                }            } else {                final long ioStartTime = System.nanoTime();                try {                    processSelectedKeys(); // 处理 I/O 事件                } finally {                    final long ioTime = System.nanoTime() - ioStartTime;                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio); // 处理完 I/O 事件,再处理异步任务队列                }            }        } catch (Throwable t) {            handleLoopException(t);        }        try {            if (isShuttingDown()) {                closeAll();                if (confirmShutdown()) {                    return;                }            }        } catch (Throwable t) {            handleLoopException(t);        }    }}

上述源码的结构比较清晰,NioEventLoop 每次循环的处理流程都包含事件轮询 select、事件处理 processSelectedKeys、任务处理 runAllTasks 几个步骤,是典型的 Reactor 线程模型的运行机制。而且 Netty 提供了一个参数 ioRatio,可以调整 I/O 事件处理和任务处理的时间比例。下面我们将着重从事件处理任务处理两个核心部分出发,详细介绍 Netty EventLoop 的实现原理。

事件处理机制

结合 Netty 的整体架构,我们一起看下 EventLoop 的事件流转图,以便更好地理解 Netty EventLoop 的设计原理。NioEventLoop 的事件处理机制采用的是无锁串行化的设计思路

BossEventLoopGroupWorkerEventLoopGroup 包含一个或者多个 NioEventLoop。BossEventLoopGroup 负责监听客户端的 Accept 事件,当事件触发时,将事件注册至 WorkerEventLoopGroup 中的一个 NioEventLoop 上。每新建一个 Channel, 只选择一个 NioEventLoop 与其绑定。所以说 Channel 生命周期的所有事件处理都是线程独立的,不同的 NioEventLoop 线程之间不会发生任何交集。NioEventLoop 完成数据读取后,会调用绑定的 ChannelPipeline 进行事件传播,ChannelPipeline 也是线程安全的,数据会被传递到 ChannelPipeline 的第一个 ChannelHandler 中。数据处理完成后,将加工完成的数据再传递给下一个 ChannelHandler,整个过程是串行化执行,不会发生线程上下文切换的问题。

NioEventLoop 无锁串行化的设计不仅使系统吞吐量达到最大化,而且降低了用户开发业务逻辑的难度,不需要花太多精力关心线程安全问题。虽然单线程执行避免了线程切换,但是它的缺陷就是不能执行时间过长的 I/O 操作,一旦某个 I/O 事件发生阻塞,那么后续的所有 I/O 事件都无法执行,甚至造成事件积压。在使用 Netty 进行程序开发时,我们一定要对 ChannelHandler 的实现逻辑有充分的风险意识。

NioEventLoop 线程的可靠性至关重要,一旦 NioEventLoop 发生阻塞或者陷入空轮询,就会导致整个系统不可用。在 JDK 中, Epoll 的实现是存在漏洞的,即使 Selector 轮询的事件列表为空,NIO 线程一样可以被唤醒,导致 CPU 100% 占用。这就是臭名昭著的 JDK epoll 空轮询的 Bug。Netty 作为一个高性能、高可靠的网络框架,需要保证 I/O 线程的安全性。那么它是如何解决 JDK epoll 空轮询的 Bug 呢?实际上 Netty 并没有从根源上解决该问题,而是巧妙地规避了这个问题。

我们抛开其他细枝末节,直接定位到事件轮询 select() 方法中的最后一部分代码,一起看下 Netty 是如何解决 epoll 空轮询的 Bug。

long time = System.nanoTime();if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {    selectCnt = 1;} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {    selector = selectRebuildSelector(selectCnt);    selectCnt = 1;    break;}

Netty 提供了一种检测机制判断线程是否可能陷入空轮询,具体的实现方式如下:

每次执行 Select 操作之前记录当前时间 currentTimeNanos。time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos,如果事件轮询的持续时间大于等于 timeoutMillis,那么说明是正常的,否则表明阻塞时间并未达到预期,可能触发了空轮询的 Bug。Netty 引入了计数变量 selectCnt。在正常情况下,selectCnt 会重置,否则会对 selectCnt 自增计数。当 selectCnt 达到 SELECTOR_AUTO_REBUILD_THRESHOLD(默认512) 阈值时,会触发重建 Selector 对象。

Netty 采用这种方法巧妙地规避了 JDK Bug。异常的 Selector 中所有的 SelectionKey 会重新注册到新建的 Selector 上,重建完成之后异常的 Selector 就可以废弃了。

任务处理机制

NioEventLoop 不仅负责处理 I/O 事件,还要兼顾执行任务队列中的任务。任务队列遵循 FIFO 规则,可以保证任务执行的公平性。NioEventLoop 处理的任务类型基本可以分为三类。

普通任务:通过 NioEventLoop 的 execute() 方法向任务队列 taskQueue 中添加任务。例如 Netty 在写数据时会封装 WriteAndFlushTask 提交给 taskQueue。taskQueue 的实现类是多生产者单消费者队列 MpscChunkedArrayQueue,在多线程并发添加任务时,可以保证线程安全。定时任务:通过调用 NioEventLoop 的 schedule() 方法向定时任务队列 scheduledTaskQueue 添加一个定时任务,用于周期性执行该任务。例如,心跳消息发送等。定时任务队列 scheduledTaskQueue 采用优先队列 PriorityQueue 实现。尾部队列:tailTasks 相比于普通任务队列优先级较低,在每次执行完 taskQueue 中任务后会去获取尾部队列中任务执行。尾部任务并不常用,主要用于做一些收尾工作,例如统计事件循环的执行时间、监控信息上报等。

下面结合任务处理 runAllTasks 的源码结构,分析下 NioEventLoop 处理任务的逻辑,源码实现如下:

protected boolean runAllTasks(long timeoutNanos) {    // 1. 合并定时任务到普通任务队列    fetchFromScheduledTaskQueue();    // 2. 从普通任务队列中取出任务    Runnable task = pollTask();    if (task == null) {        afterRunningAllTasks();        return false;    }    // 3. 计算任务处理的超时时间final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;    long runTasks = 0;    long lastExecutionTime;    for (;;) {        // 4. 安全执行任务        safeExecute(task);        runTasks ++;        // 5. 每执行 64 个任务检查一下是否超时if ((runTasks & 0x3F) == 0) {            lastExecutionTime = ScheduledFutureTask.nanoTime();            if (lastExecutionTime >= deadline) {                break;            }        }        task = pollTask();        if (task == null) {            lastExecutionTime = ScheduledFutureTask.nanoTime();            break;        }    }    // 6. 收尾工作    afterRunningAllTasks();    this.lastExecutionTime = lastExecutionTime;    return true;}

我在代码中以注释的方式标注了具体的实现步骤,可以分为 6 个步骤。

fetchFromScheduledTaskQueue 函数:将定时任务从 scheduledTaskQueue 中取出,聚合放入普通任务队列 taskQueue 中,只有定时任务的截止时间小于当前时间才可以被合并。从普通任务队列 taskQueue 中取出任务。计算任务执行的最大超时时间。safeExecute 函数:安全执行任务,实际直接调用的 Runnable 的 run() 方法。每执行 64 个任务进行超时时间的检查,如果执行时间大于最大超时时间,则立即停止执行任务,避免影响下一轮的 I/O 事件的处理。最后获取尾部队列中的任务执行。EventLoop 最佳实践

在日常开发中用好 EventLoop 至关重要,这里结合实际工作中的经验给出一些 EventLoop 的最佳实践方案。

网络连接建立过程中三次握手、安全认证的过程会消耗不少时间。这里建议采用 Boss 和 Worker 两个 EventLoopGroup,有助于分担 Reactor 线程的压力。由于 Reactor 线程模式适合处理耗时短的任务场景,对于耗时较长的 ChannelHandler 可以考虑维护一个业务线程池,将编解码后的数据封装成 Task 进行异步处理,避免 ChannelHandler 阻塞而造成 EventLoop 不可用。如果业务逻辑执行时间较短,建议直接在 ChannelHandler 中执行。例如编解码操作,这样可以避免过度设计而造成架构的复杂性。不宜设计过多的 ChannelHandler。对于系统性能和可维护性都会存在问题,在设计业务架构的时候,需要明确业务分层和 Netty 分层之间的界限。不要一味地将业务逻辑都添加到 ChannelHandler 中。五、ChannelPipeline服务编排ChannelPipeline 概述

Pipeline 的字面意思是管道、流水线。它在 Netty 中起到的作用,和一个工厂的流水线类似。原始的网络字节流经过 Pipeline ,被一步步加工包装,最后得到加工后的成品。经过前面课程核心组件的初步学习,我们已经对 ChannelPipeline 有了初步的印象:它是 Netty 的核心处理链,用以实现网络事件的动态编排和有序传播。

今天我们将从以下几个方面一起探讨 ChannelPipeline 的实现原理:

ChannelPipeline 内部结构;ChannelHandler 接口设计;ChannelPipeline 事件传播机制;ChannelPipeline 异常传播机制。ChannelPipeline 内部结构

首先我们要理清楚 ChannelPipeline 的内部结构是什么样子,这样才能理解 ChannelPipeline 的处理流程。ChannelPipeline 作为 Netty 的核心编排组件,负责调度各种类型的 ChannelHandler,实际数据的加工处理操作则是由 ChannelHandler 完成的。

ChannelPipeline 可以看作是 ChannelHandler 的容器载体,它是由一组 ChannelHandler 实例组成的,内部通过双向链表将不同的 ChannelHandler 链接在一起,如下图所示。当有 I/O 读写事件触发时,ChannelPipeline 会依次调用 ChannelHandler 列表对 Channel 的数据进行拦截和处理。

由上图可知,每个 Channel 会绑定一个 ChannelPipeline,每一个 ChannelPipeline 都包含多个 ChannelHandlerContext,所有 ChannelHandlerContext 之间组成了双向链表。又因为每个 ChannelHandler 都对应一个 ChannelHandlerContext,所以实际上 ChannelPipeline 维护的是它与 ChannelHandlerContext 的关系。那么你可能会有疑问,为什么这里会多一层 ChannelHandlerContext 的封装呢?

其实这是一种比较常用的编程思想。ChannelHandlerContext 用于保存 ChannelHandler 上下文;ChannelHandlerContext 则包含了 ChannelHandler 生命周期的所有事件,如 connect、bind、read、flush、write、close 等。可以试想一下,如果没有 ChannelHandlerContext 的这层封装,那么我们在做 ChannelHandler 之间传递的时候,前置后置的通用逻辑就要在每个 ChannelHandler 里都实现一份。这样虽然能解决问题,但是代码结构的耦合,会非常不优雅。

根据网络数据的流向,ChannelPipeline 分为入站 ChannelInboundHandler 和出站 ChannelOutboundHandler 两种处理器。在客户端与服务端通信的过程中,数据从客户端发向服务端的过程叫出站,反之称为入站。数据先由一系列 InboundHandler 处理后入站,然后再由相反方向的 OutboundHandler 处理完成后出站,如下图所示。我们经常使用的解码器 Decoder 就是入站操作,编码器 Encoder 就是出站操作。服务端接收到客户端数据需要先经过 Decoder 入站处理后,再通过 Encoder 出站通知客户端。

接下来我们详细分析下 ChannelPipeline 双向链表的构造,ChannelPipeline 的双向链表分别维护了 HeadContext 和 TailContext 的头尾节点。我们自定义的 ChannelHandler 会插入到 Head 和 Tail 之间,这两个节点在 Netty 中已经默认实现了,它们在 ChannelPipeline 中起到了至关重要的作用。首先我们看下 HeadContext 和 TailContext 的继承关系,如下图所示。

HeadContext 既是 Inbound 处理器,也是 Outbound 处理器。它分别实现了 ChannelInboundHandler 和 ChannelOutboundHandler。网络数据写入操作的入口就是由 HeadContext 节点完成的。HeadContext 作为 Pipeline 的头结点负责读取数据并开始传递 InBound 事件,当数据处理完成后,数据会反方向经过 Outbound 处理器,最终传递到 HeadContext,所以 HeadContext 又是处理 Outbound 事件的最后一站。此外 HeadContext 在传递事件之前,还会执行一些前置操作。

TailContext 只实现了 ChannelInboundHandler 接口。它会在 ChannelInboundHandler 调用链路的最后一步执行,主要用于终止 Inbound 事件传播,例如释放 Message 数据资源等。TailContext 节点作为 OutBound 事件传播的第一站,仅仅是将 OutBound 事件传递给上一个节点。

从整个 ChannelPipeline 调用链路来看,如果由 Channel 直接触发事件传播,那么调用链路将贯穿整个 ChannelPipeline。然而也可以在其中某一个 ChannelHandlerContext 触发同样的方法,这样只会从当前的 ChannelHandler 开始执行事件传播,该过程不会从头贯穿到尾,在一定场景下,可以提高程序性能。

ChannelHandler 接口设计

在学习 ChannelPipeline 事件传播机制之前,我们需要了解 I/O 事件的生命周期。整个 ChannelHandler 是围绕 I/O 事件的生命周期所设计的,例如建立连接、读数据、写数据、连接销毁等。ChannelHandler 有两个重要的子接口ChannelInboundHandlerChannelOutboundHandler,分别拦截入站和出站的各种 I/O 事件

ChannelInboundHandler 的事件回调方法与触发时机。

事件回调方法

触发时机

channelRegistered

Channel 被注册到 EventLoop

channelUnregistered

Channel 从 EventLoop 中取消注册

channelActive

Channel 处于就绪状态,可以被读写

channelInactive

Channel 处于非就绪状态Channel 可以从远端读取到数据

channelRead

Channel 可以从远端读取到数据

channelReadComplete

Channel 读取数据完成

userEventTriggered

用户事件触发时

channelWritabilityChanged

Channel 的写状态发生变化

ChannelOutboundHandler 的事件回调方法与触发时机。

ChannelOutboundHandler 的事件回调方法非常清晰,直接通过 ChannelOutboundHandler 的接口列表可以看到每种操作所对应的回调方法,如下图所示。这里每个回调方法都是在相应操作执行之前触发,在此就不多做赘述了。此外 ChannelOutboundHandler 中绝大部分接口都包含ChannelPromise 参数,以便于在操作完成时能够及时获得通知。

事件传播机制

在上文中我们介绍了 ChannelPipeline 可分为入站 ChannelInboundHandler 和出站 ChannelOutboundHandler 两种处理器,与此对应传输的事件类型可以分为Inbound 事件Outbound 事件

我们通过一个代码示例,一起体验下 ChannelPipeline 的事件传播机制。

serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {    @Override    public void initChannel(SocketChannel ch) {        ch.pipeline()                .addLast(new SampleInBoundHandler("SampleInBoundHandlerA", false))                .addLast(new SampleInBoundHandler("SampleInBoundHandlerB", false))                .addLast(new SampleInBoundHandler("SampleInBoundHandlerC", true));        ch.pipeline()                .addLast(new SampleOutBoundHandler("SampleOutBoundHandlerA"))                .addLast(new SampleOutBoundHandler("SampleOutBoundHandlerB"))                .addLast(new SampleOutBoundHandler("SampleOutBoundHandlerC"));    }}public class SampleInBoundHandler extends ChannelInboundHandlerAdapter {    private final String name;    private final boolean flush;    public SampleInBoundHandler(String name, boolean flush) {        this.name = name;        this.flush = flush;    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        System.out.println("InBoundHandler: " + name);        if (flush) {            ctx.channel().writeAndFlush(msg);        } else {            super.channelRead(ctx, msg);        }    }}public class SampleOutBoundHandler extends ChannelOutboundHandlerAdapter {    private final String name;    public SampleOutBoundHandler(String name) {        this.name = name;    }    @Override    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {        System.out.println("OutBoundHandler: " + name);        super.write(ctx, msg, promise);    }}

通过 Pipeline 的 addLast 方法分别添加了三个 InboundHandler 和 OutboundHandler,添加顺序都是 A -> B -> C,下图可以表示初始化后 ChannelPipeline 的内部结构。

当客户端向服务端发送请求时,会触发 SampleInBoundHandler 调用链的 channelRead 事件。经过 SampleInBoundHandler 调用链处理完成后,在 SampleInBoundHandlerC 中会调用 writeAndFlush 方法向客户端写回数据,此时会触发 SampleOutBoundHandler 调用链的 write 事件。最后我们看下代码示例的控制台输出:

由此可见,Inbound 事件和 Outbound 事件的传播方向是不一样的。Inbound 事件的传播方向为 Head -> Tail,而 Outbound 事件传播方向是 Tail -> Head,两者恰恰相反。在 Netty 应用编程中一定要理清楚事件传播的顺序。推荐你在系统设计时模拟客户端和服务端的场景画出 ChannelPipeline 的内部结构图,以避免搞混调用关系。

异常传播机制

ChannelPipeline 事件传播的实现采用了经典的责任链模式,调用链路环环相扣。那么如果有一个节点处理逻辑异常会出现什么现象呢?我们通过修改 SampleInBoundHandler 的实现来模拟业务逻辑异常:

public class SampleInBoundHandler extends ChannelInboundHandlerAdapter {    private final String name;    private final boolean flush;    public SampleInBoundHandler(String name, boolean flush) {        this.name = name;        this.flush = flush;    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) {        System.out.println("InBoundHandler: " + name);        if (flush) {            ctx.channel().writeAndFlush(msg);        } else {            throw new RuntimeException("InBoundHandler: " + name);        }    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {        System.out.println("InBoundHandlerException: " + name);        ctx.fireExceptionCaught(cause);    }}

在 channelRead 事件处理中,第一个 A 节点就会抛出 RuntimeException。同时我们重写了 ChannelInboundHandlerAdapter 中的 exceptionCaught 方法,只是在开头加上了控制台输出,方便观察异常传播的行为。下面看一下代码运行的控制台输出结果:

由输出结果可以看出 ctx.fireExceptionCaugh 会将异常按顺序从 Head 节点传播到 Tail 节点。如果用户没有对异常进行拦截处理,最后将由 Tail 节点统一处理,在 TailContext 源码中可以找到具体实现:

protected void onUnhandledInboundException(Throwable cause) {    try {        logger.warn(                "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +                        "It usually means the last handler in the pipeline did not handle the exception.",                cause);    } finally {        ReferenceCountUtil.release(cause);    }}

虽然 Netty 中 TailContext 提供了兜底的异常处理逻辑,但是在很多场景下,并不能满足我们的需求。假如你需要拦截指定的异常类型,并做出相应的异常处理,应该如何实现呢?我们接着往下看。

异常处理的最佳实践

在 Netty 应用开发的过程中,良好的异常处理机制会让排查问题的过程事半功倍。所以推荐用户对异常进行统一拦截,然后根据实际业务场景实现更加完善的异常处理机制。通过异常传播机制的学习,我们应该可以想到最好的方法是在 ChannelPipeline 自定义处理器的末端添加统一的异常处理器,此时 ChannelPipeline 的内部结构如下图所示。

用户自定义的异常处理器代码示例如下:

public class ExceptionHandler extends ChannelDuplexHandler {    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {        if (cause instanceof RuntimeException) {            System.out.println("Handle Business Exception Success.");        }    }}

加入统一的异常处理器后,可以看到异常已经被优雅地拦截并处理掉了。这也是 Netty 推荐的最佳异常处理实践。

六、拆包/粘包为什么有拆包/粘包

TCP 传输协议是面向流的,没有数据包界限。客户端向服务端发送数据时,可能将一个完整的报文拆分成多个小报文进行发送,也可能将多个报文合并成一个大的报文进行发送。因此就有了拆包和粘包。

为什么会出现拆包/粘包现象呢?在网络通信的过程中,每次可以发送的数据包大小是受多种因素限制的,如 MTU 传输单元大小、MSS 最大分段大小、滑动窗口等。如果一次传输的网络包数据大小超过传输单元大小,那么我们的数据可能会拆分为多个数据包发送出去。如果每次请求的网络包数据都很小,一共请求了 10000 次,TCP 并不会分别发送 10000 次。因为 TCP 采用的 Nagle 算法对此作出了优化。如果你是一位网络新手,可能对这些概念并不非常清楚。那我们先了解下计算机网络中 MTU、MSS、Nagle 这些基础概念以及它们为什么会造成拆包/粘包问题。

MTU 最大传输单元和 MSS 最大分段大小

MTU(Maxitum Transmission Unit) 是链路层一次最大传输数据的大小。MTU 一般来说大小为 1500 byte。MSS(Maximum Segement Size) 是指 TCP 最大报文段长度,它是传输层一次发送最大数据的大小。如下图所示,MTU 和 MSS 一般的计算关系为:MSS = MTU - IP 首部 - TCP首部,如果 MSS + TCP 首部 + IP 首部 > MTU,那么数据包将会被拆分为多个发送。这就是拆包现象。

滑动窗口

滑动窗口是 TCP 传输层用于流量控制的一种有效措施,也被称为通告窗口。滑动窗口是数据接收方设置的窗口大小,随后接收方会把窗口大小告诉发送方,以此限制发送方每次发送数据的大小,从而达到流量控制的目的。这样数据发送方不需要每发送一组数据就阻塞等待接收方确认,允许发送方同时发送多个数据分组,每次发送的数据都会被限制在窗口大小内。由此可见,滑动窗口可以大幅度提升网络吞吐量。

那么 TCP 报文是怎么确保数据包按次序到达且不丢数据呢?首先,所有的数据帧都是有编号的,TCP 并不会为每个报文段都回复 ACK 响应,它会对多个报文段回复一次 ACK。假设有三个报文段 A、B、C,发送方先发送了B、C,接收方则必须等待 A 报文段到达,如果一定时间内仍未等到 A 报文段,那么 B、C 也会被丢弃,发送方会发起重试。如果已接收到 A 报文段,那么将会回复发送方一次 ACK 确认。

Nagle 算法

Nagle 算法于 1984 年被福特航空和通信公司定义为 TCP/IP 拥塞控制方法。它主要用于解决频繁发送小数据包而带来的网络拥塞问题。试想如果每次需要发送的数据只有 1 字节,加上 20 个字节 IP Header 和 20 个字节 TCP Header,每次发送的数据包大小为 41 字节,但是只有 1 字节是有效信息,这就造成了非常大的浪费。Nagle 算法可以理解为批量发送,也是我们平时编程中经常用到的优化思路,它是在数据未得到确认之前先写入缓冲区,等待数据确认或者缓冲区积攒到一定大小再把数据包发送出去。

Linux 在默认情况下是开启 Nagle 算法的,在大量小数据包的场景下可以有效地降低网络开销。但如果你的业务场景每次发送的数据都需要获得及时响应,那么 Nagle 算法就不能满足你的需求了,因为 Nagle 算法会有一定的数据延迟。你可以通过 Linux 提供的 TCP_NODELAY 参数禁用 Nagle 算法。Netty 中为了使数据传输延迟最小化,就默认禁用了 Nagle 算法,这一点与 Linux 操作系统的默认行为是相反的。

拆包/粘包的解决方案

在客户端和服务端通信的过程中,服务端一次读到的数据大小是不确定的。如上图所示,拆包/粘包可能会出现以下五种情况:

服务端恰巧读到了两个完整的数据包 A 和 B,没有出现拆包/粘包问题;服务端接收到 A 和 B 粘在一起的数据包,服务端需要解析出 A 和 B;服务端收到完整的 A 和 B 的一部分数据包 B-1,服务端需要解析出完整的 A,并等待读取完整的 B 数据包;服务端接收到 A 的一部分数据包 A-1,此时需要等待接收到完整的 A 数据包;数据包 A 较大,服务端需要多次才可以接收完数据包 A。

由于拆包/粘包问题的存在,数据接收方很难界定数据包的边界在哪里,很难识别出一个完整的数据包。所以需要提供一种机制来识别数据包的界限,这也是解决拆包/粘包的唯一方法:定义应用层的通信协议。下面我们一起看下主流协议的解决方案。

消息长度固定

每个数据报文都需要一个固定的长度。当接收方累计读取到固定长度的报文后,就认为已经获得一个完整的消息。当发送方的数据小于固定长度时,则需要空位补齐。

+----+------+------+---+----+| AB | CDEF | GHIJ | K | LM |+----+------+------+---+----+

假设我们的固定长度为 4 字节,那么如上所示的 5 条数据一共需要发送 4 个报文:

+------+------+------+------+| ABCD | EFGH | IJKL | M000 |+------+------+------+------+

消息定长法使用非常简单,但是缺点也非常明显,无法很好设定固定长度的值,如果长度太大会造成字节浪费,长度太小又会影响消息传输,所以在一般情况下消息定长法不会被采用。

特定分隔符

既然接收方无法区分消息的边界,那么我们可以在每次发送报文的尾部加上特定分隔符,接收方就可以根据特殊分隔符进行消息拆分。以下报文根据特定分隔符 \n 按行解析,即可得到 AB、CDEF、GHIJ、K、LM 五条原始报文。

+-------------------------+| AB\nCDEF\nGHIJ\nK\nLM\n |+-------------------------+

由于在发送报文时尾部需要添加特定分隔符,所以对于分隔符的选择一定要避免和消息体中字符相同,以免冲突。否则可能出现错误的消息拆分。比较推荐的做法是将消息进行编码,例如 base64 编码,然后可以选择 64 个编码字符之外的字符作为特定分隔符。特定分隔符法在消息协议足够简单的场景下比较高效,例如大名鼎鼎的 Redis 在通信过程中采用的就是换行分隔符。

消息长度 + 消息内容

消息头     消息体+--------+----------+| Length |  Content |+--------+----------+

消息长度 + 消息内容是项目开发中最常用的一种协议,如上展示了该协议的基本格式。消息头中存放消息的总长度,例如使用 4 字节的 int 值记录消息的长度,消息体实际的二进制的字节数据。接收方在解析数据时,首先读取消息头的长度字段 Len,然后紧接着读取长度为 Len 的字节数据,该数据即判定为一个完整的数据报文。依然以上述提到的原始字节数据为例,使用该协议进行编码后的结果如下所示:

+-----+-------+-------+----+-----+| 2AB | 4CDEF | 4GHIJ | 1K | 2LM |+-----+-------+-------+----+-----+

消息长度 + 消息内容的使用方式非常灵活,且不会存在消息定长法和特定分隔符法的明显缺陷。当然在消息头中不仅只限于存放消息的长度,而且可以自定义其他必要的扩展字段,例如消息版本、算法类型等。

七、通信协议设计

所谓协议,就是通信双方事先商量好的接口暗语,在 TCP 网络编程中,发送方和接收方的数据包格式都是二进制,发送方将对象转化成二进制流发送给接收方,接收方获得二进制数据后需要知道如何解析成对象,所以协议是双方能够正常通信的基础

目前市面上已经有不少通用的协议,例如 HTTP、HTTPS、JSON-RPC、FTP、IMAP、Protobuf 等。通用协议兼容性好,易于维护,各种异构系统之间可以实现无缝对接。如果在满足业务场景以及性能需求的前提下,推荐采用通用协议的方案。相比通用协议,自定义协议主要有以下优点。

极致性能:通用的通信协议考虑了很多兼容性的因素,必然在性能方面有所损失。扩展性:自定义的协议相比通用协议更好扩展,可以更好地满足自己的业务需求。安全性:通用协议是公开的,很多漏洞已经很多被黑客攻破。自定义协议更加安全,因为黑客需要先破解你的协议内容。

那么如何设计自定义的通信协议呢?这个答案见仁见智,但是设计通信协议有经验方法可循。结合实战经验我们一起看下一个完备的网络协议需要具备哪些基本要素。

魔数

魔数是通信双方协商的一个暗号,通常采用固定的几个字节表示。魔数的作用是防止任何人随便向服务器的端口上发送数据。服务端在接收到数据时会解析出前几个固定字节的魔数,然后做正确性比对。如果和约定的魔数不匹配,则认为是非法数据,可以直接关闭连接或者采取其他措施以增强系统的安全防护。魔数的思想在压缩算法、Java Class 文件等场景中都有所体现,例如 Class 文件开头就存储了魔数 0xCAFEBABE,在加载 Class 文件时首先会验证魔数的正确性。

协议版本号

随着业务需求的变化,协议可能需要对结构或字段进行改动,不同版本的协议对应的解析方法也是不同的。所以在生产级项目中强烈建议预留协议版本号这个字段。

序列化算法

序列化算法字段表示数据发送方应该采用何种方法将请求的对象转化为二进制,以及如何再将二进制转化为对象,如 JSON、Hessian、Java 自带序列化等。

报文类型

在不同的业务场景中,报文可能存在不同的类型。例如在 RPC 框架中有请求、响应、心跳等类型的报文,在 IM 即时通信的场景中有登陆、创建群聊、发送消息、接收消息、退出群聊等类型的报文。

长度域字段

长度域字段代表请求数据的长度,接收方根据长度域字段获取一个完整的报文。

请求数据

请求数据通常为序列化之后得到的二进制流,每种请求数据的内容是不一样的。

状态

状态字段用于标识请求是否正常。一般由被调用方设置。例如一次 RPC 调用失败,状态字段可被服务提供方设置为异常状态。

保留字段

保留字段是可选项,为了应对协议升级的可能性,可以预留若干字节的保留字段,以备不时之需。

通过以上协议基本要素的学习,我们可以得到一个较为通用的协议示例:

+---------------------------------------------------------------+| 魔数 2byte | 协议版本号 1byte | 序列化算法 1byte | 报文类型 1byte  |+---------------------------------------------------------------+| 状态 1byte |        保留字段 4byte     |      数据长度 4byte     | +---------------------------------------------------------------+|                   数据内容 (长度不定)                          |+---------------------------------------------------------------+
Netty 如何实现自定义通信协议

在学习完如何设计协议之后,我们又该如何在 Netty 中实现自定义的通信协议呢?其实 Netty 作为一个非常优秀的网络通信框架,已经为我们提供了非常丰富的编解码抽象基类,帮助我们更方便地基于这些抽象基类扩展实现自定义协议。

首先我们看下 Netty 中编解码器是如何分类的。

Netty 常用编码器类型:

MessageToByteEncoder 对象编码成字节流;MessageToMessageEncoder 一种消息类型编码成另外一种消息类型。

Netty 常用解码器类型:

ByteToMessageDecoder/ReplayingDecoder 将字节流解码为消息对象;MessageToMessageDecoder 将一种消息类型解码为另外一种消息类型。

编解码器可以分为一次解码器二次解码器,一次解码器用于解决 TCP 拆包/粘包问题,按协议解析后得到的字节数据。如果你需要对解析后的字节数据做对象模型的转换,这时候便需要用到二次解码器,同理编码器的过程是反过来的。

一次编解码器:MessageToByteEncoder/ByteToMessageDecoder。二次编解码器:MessageToMessageEncoder/MessageToMessageDecoder。

下面我们对 Netty 中常用的抽象编解码类进行详细的介绍。

抽象编码类

通过抽象编码类的继承图可以看出,编码类是 ChanneOutboundHandler 的抽象类实现,具体操作的是 Outbound 出站数据。

MessageToByteEncoder

MessageToByteEncoder 用于将对象编码成字节流,MessageToByteEncoder 提供了唯一的 encode 抽象方法,我们只需要实现encode 方法即可完成自定义编码。那么encode() 方法是在什么时候被调用的呢?我们一起看下MessageToByteEncoder 的核心源码片段,如下所示。

@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {    ByteBuf buf = null;    try {        if (acceptOutboundMessage(msg)) { // 1. 消息类型是否匹配@SuppressWarnings("unchecked")            I cast = (I) msg;            buf = allocateBuffer(ctx, cast, preferDirect); // 2. 分配 ByteBuf 资源try {                encode(ctx, cast, buf); // 3. 执行 encode 方法完成数据编码            } finally {                ReferenceCountUtil.release(cast);            }            if (buf.isReadable()) {                ctx.write(buf, promise); // 4. 向后传递写事件            } else {                buf.release();                ctx.write(Unpooled.EMPTY_BUFFER, promise);            }            buf = null;        } else {            ctx.write(msg, promise);        }    } catch (EncoderException e) {        throw e;    } catch (Throwable e) {        throw new EncoderException(e);    } finally {        if (buf != null) {            buf.release();        }    }}

MessageToByteEncoder 重写了 ChanneOutboundHandler 的 write() 方法,其主要逻辑分为以下几个步骤:

acceptOutboundMessage 判断是否有匹配的消息类型,如果匹配需要执行编码流程,如果不匹配直接继续传递给下一个 ChannelOutboundHandler;分配 ByteBuf 资源,默认使用堆外内存;调用子类实现的 encode 方法完成数据编码,一旦消息被成功编码,会通过调用 ReferenceCountUtil.release(cast) 自动释放;如果 ByteBuf 可读,说明已经成功编码得到数据,然后写入 ChannelHandlerContext 交到下一个节点;如果 ByteBuf 不可读,则释放 ByteBuf 资源,向下传递空的 ByteBuf 对象。

编码器实现非常简单,不需要关注拆包/粘包问题。如下例子,展示了如何将字符串类型的数据写入到 ByteBuf 实例,ByteBuf 实例将传递给 ChannelPipeline 链表中的下一个 ChannelOutboundHandler。

public class StringToByteEncoder extends MessageToByteEncoder<String> {        @Override        protected void encode(ChannelHandlerContext channelHandlerContext, String data, ByteBuf byteBuf) throws Exception {            byteBuf.writeBytes(data.getBytes());        }}
MessageToMessageEncoder

MessageToMessageEncoder 与 MessageToByteEncoder 类似,同样只需要实现 encode 方法。与 MessageToByteEncoder 不同的是,MessageToMessageEncoder 是将一种格式的消息转换为另外一种格式的消息。其中第二个 Message 所指的可以是任意一个对象,如果该对象是 ByteBuf 类型,那么基本上和 MessageToByteEncoder 的实现原理是一致的。此外 MessageToByteEncoder 的输出结果是对象列表,编码后的结果属于中间对象,最终仍然会转化成 ByteBuf 进行传输。

MessageToMessageEncoder 常用的实现子类有 StringEncoder、LineEncoder、Base64Encoder 等。以 StringEncoder 为例看下 MessageToMessageEncoder 的用法。源码示例如下:将 CharSequence 类型(String、StringBuilder、StringBuffer 等)转换成 ByteBuf 类型,结合 StringDecoder 可以直接实现 String 类型数据的编解码。

@Overrideprotected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) throws Exception {    if (msg.length() == 0) {        return;    }    out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), charset));}
抽象解码类

同样,我们先看下抽象解码类的继承关系图。解码类是 ChanneInboundHandler 的抽象类实现,操作的是 Inbound 入站数据。解码器实现的难度要远大于编码器,因为解码器需要考虑拆包/粘包问题。由于接收方有可能没有接收到完整的消息,所以解码框架需要对入站的数据做缓冲操作,直至获取到完整的消息。

抽象解码类 ByteToMessageDecoder。

首先,我们看下 ByteToMessageDecoder 定义的抽象方法:

public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {    protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;    protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {        if (in.isReadable()) {            decodeRemovalReentryProtection(ctx, in, out);        }    }}

decode() 是用户必须实现的抽象方法,在该方法在调用时需要传入接收的数据 ByteBuf,及用来添加编码后消息的 List。由于 TCP 粘包问题,ByteBuf 中可能包含多个有效的报文,或者不够一个完整的报文。Netty 会重复回调 decode() 方法,直到没有解码出新的完整报文可以添加到 List 当中,或者 ByteBuf 没有更多可读取的数据为止。如果此时 List 的内容不为空,那么会传递给 ChannelPipeline 中的下一个ChannelInboundHandler。

此外 ByteToMessageDecoder 还定义了 decodeLast() 方法。为什么抽象解码器要比编码器多一个 decodeLast() 方法呢?因为 decodeLast 在 Channel 关闭后会被调用一次,主要用于处理 ByteBuf 最后剩余的字节数据。Netty 中 decodeLast 的默认实现只是简单调用了 decode() 方法。如果有特殊的业务需求,则可以通过重写 decodeLast() 方法扩展自定义逻辑。

ByteToMessageDecoder 还有一个抽象子类是 ReplayingDecoder。它封装了缓冲区的管理,在读取缓冲区数据时,你无须再对字节长度进行检查。因为如果没有足够长度的字节数据,ReplayingDecoder 将终止解码操作。ReplayingDecoder 的性能相比直接使用 ByteToMessageDecoder 要慢,大部分情况下并不推荐使用 ReplayingDecoder。

抽象解码类 MessageToMessageDecoder。

MessageToMessageDecoder 与 ByteToMessageDecoder 作用类似,都是将一种消息类型的编码成另外一种消息类型。与 ByteToMessageDecoder 不同的是 MessageToMessageDecoder 并不会对数据报文进行缓存,它主要用作转换消息模型。比较推荐的做法是使用 ByteToMessageDecoder 解析 TCP 协议,解决拆包/粘包问题。解析得到有效的 ByteBuf 数据,然后传递给后续的 MessageToMessageDecoder 做数据对象的转换,具体流程如下图所示。

通信协议实战

在上述通信协议设计的小节内容中,我们提到了协议的基本要素并给出了一个较为通用的协议示例。下面我们通过 Netty 的编辑码框架实现该协议的解码器,加深我们对 Netty 编解码框架的理解。

在实现协议编码器之前,我们首先需要清楚一个问题:如何判断 ByteBuf 是否存在完整的报文?最常用的做法就是通过读取消息长度 dataLength 进行判断。如果 ByteBuf 的可读数据长度小于 dataLength,说明 ByteBuf 还不够获取一个完整的报文。在该协议前面的消息头部分包含了魔数、协议版本号、数据长度等固定字段,共 14 个字节。固定字段长度和数据长度可以作为我们判断消息完整性的依据,具体编码器实现逻辑示例如下:

/*+---------------------------------------------------------------+| 魔数 2byte | 协议版本号 1byte | 序列化算法 1byte | 报文类型 1byte  |+---------------------------------------------------------------+| 状态 1byte |        保留字段 4byte     |      数据长度 4byte     | +---------------------------------------------------------------+|                   数据内容 (长度不定)                          |+---------------------------------------------------------------+ */ @Override public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {    // 判断 ByteBuf 可读取字节if (in.readableBytes() < 14) {         return;    }    in.markReaderIndex(); // 标记 ByteBuf 读指针位置    in.skipBytes(2); // 跳过魔数    in.skipBytes(1); // 跳过协议版本号byte serializeType = in.readByte();    in.skipBytes(1); // 跳过报文类型    in.skipBytes(1); // 跳过状态字段    in.skipBytes(4); // 跳过保留字段int dataLength = in.readInt();    if (in.readableBytes() < dataLength) {        in.resetReaderIndex(); // 重置 ByteBuf 读指针位置return;    }    byte[] data = new byte[dataLength];    in.readBytes(data);    SerializeService serializeService = getSerializeServiceByType(serializeType);    Object obj = serializeService.deserialize(data);    if (obj != null) {        out.add(obj);    }}
八、解码器固定长度解码器 FixedLengthFrameDecoder

固定长度解码器 FixedLengthFrameDecoder 非常简单,直接通过构造函数设置固定长度的大小 frameLength,无论接收方一次获取多大的数据,都会严格按照 frameLength 进行解码。如果累积读取到长度大小为 frameLength 的消息,那么解码器认为已经获取到了一个完整的消息。如果消息长度小于 frameLength,FixedLengthFrameDecoder 解码器会一直等后续数据包的到达,直至获得完整的消息。下面我们通过一个例子感受一下使用 Netty 实现固定长度解码是多么简单。

public class EchoServer {    public void startEchoServer(int port) throws Exception {        EventLoopGroup bossGroup = new NioEventLoopGroup();        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            ServerBootstrap b = new ServerBootstrap();            b.group(bossGroup, workerGroup)                    .channel(NioServerSocketChannel.class)                    .childHandler(new ChannelInitializer<SocketChannel>() {                        @Overridepublic void initChannel(SocketChannel ch) {                            ch.pipeline().addLast(new FixedLengthFrameDecoder(10));                            ch.pipeline().addLast(new EchoServerHandler());                        }                    });            ChannelFuture f = b.bind(port).sync();            f.channel().closeFuture().sync();        } finally {            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }    }    public static void main(String[] args) throws Exception {        new EchoServer().startEchoServer(8088);    }}@Sharablepublic class EchoServerHandler extends ChannelInboundHandlerAdapter {    @Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {        System.out.println("Receive client : [" + ((ByteBuf) msg).toString(CharsetUtil.UTF_8) + "]");    }}

在上述服务端的代码中使用了固定 10 字节的解码器,并在解码之后通过 EchoServerHandler 打印结果。我们可以启动服务端,通过 telnet 命令像服务端发送数据,观察代码输出的结果。

客户端输入:

telnet localhost 8088Trying ::1...Connected to localhost.Escape character is '^]'.1234567890123456789012

服务端输出:

Receive client : [1234567890]Receive client : [12345678]
特殊分隔符解码器 DelimiterBasedFrameDecoder

使用特殊分隔符解码器 DelimiterBasedFrameDecoder 之前我们需要了解以下几个属性的作用。

delimiters

delimiters 指定特殊分隔符,通过写入 ByteBuf 作为参数传入。delimiters 的类型是 ByteBuf 数组,所以我们可以同时指定多个分隔符,但是最终会选择长度最短的分隔符进行消息拆分。

例如接收方收到的数据为:

+--------------+| ABC\nDEF\r\n |+--------------+

如果指定的多个分隔符为 \n 和 \r\n,DelimiterBasedFrameDecoder 会退化成使用 LineBasedFrameDecoder 进行解析,那么会解码出两个消息。

+-----+-----+| ABC | DEF |+-----+-----+

如果指定的特定分隔符只有 \r\n,那么只会解码出一个消息:

+----------+| ABC\nDEF |+----------+
maxLength

maxLength 是报文最大长度的限制。如果超过 maxLength 还没有检测到指定分隔符,将会抛出 TooLongFrameException。可以说 maxLength 是对程序在极端情况下的一种保护措施

failFast

failFast 与 maxLength 需要搭配使用,通过设置 failFast 可以控制抛出 TooLongFrameException 的时机,可以说 Netty 在细节上考虑得面面俱到。如果 failFast=true,那么在超出 maxLength 会立即抛出 TooLongFrameException,不再继续进行解码。如果 failFast=false,那么会等到解码出一个完整的消息后才会抛出 TooLongFrameException。

stripDelimiter

stripDelimiter 的作用是判断解码后得到的消息是否去除分隔符。如果 stripDelimiter=false,特定分隔符为 \n,那么上述数据包解码出的结果为:

+-------+---------+| ABC\n | DEF\r\n |+-------+---------+

下面我们还是结合代码示例学习 DelimiterBasedFrameDecoder 的用法,依然以固定编码器小节中使用的代码为基础稍做改动,引入特殊分隔符解码器 DelimiterBasedFrameDecoder:

b.group(bossGroup, workerGroup)    .channel(NioServerSocketChannel.class)    .childHandler(new ChannelInitializer<SocketChannel>() {        @Overridepublic void initChannel(SocketChannel ch) {            ByteBuf delimiter = Unpooled.copiedBuffer("&".getBytes());            ch.pipeline().addLast(new DelimiterBasedFrameDecoder(10, true, true, delimiter));            ch.pipeline().addLast(new EchoServerHandler());        }    });

我们依然通过 telnet 模拟客户端发送数据,观察代码输出的结果,可以发现由于 maxLength 设置的只有 10,所以在解析到第三个消息时抛出异常。

客户端输入:

telnet localhost 8088Trying ::1...Connected to localhost.Escape character is '^]'.hello&world&1234567890ab

服务端输出:

Receive client : [hello]Receive client : [world]九月 25, 2020 8:46:01 下午 io.netty.channel.DefaultChannelPipeline onUnhandledInboundException警告: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.io.netty.handler.codec.TooLongFrameException: frame length exceeds 10: 13 - discarded        at io.netty.handler.codec.DelimiterBasedFrameDecoder.fail(DelimiterBasedFrameDecoder.java:302)        at io.netty.handler.codec.DelimiterBasedFrameDecoder.decode(DelimiterBasedFrameDecoder.java:268)        at io.netty.handler.codec.DelimiterBasedFrameDecoder.decode(DelimiterBasedFrameDecoder.java:218)
长度域解码器 LengthFieldBasedFrameDecoder

长度域解码器 LengthFieldBasedFrameDecoder 是解决 TCP 拆包/粘包问题最常用的解码器。它基本上可以覆盖大部分基于长度拆包场景,开源消息中间件 RocketMQ 就是使用 LengthFieldBasedFrameDecoder 进行解码的。LengthFieldBasedFrameDecoder 相比 FixedLengthFrameDecoder 和 DelimiterBasedFrameDecoder 要复杂一些,接下来我们就一起学习下这个强大的解码器。

首先我们同样先了解 LengthFieldBasedFrameDecoder 中的几个重要属性,这里我主要把它们分为两个部分:长度域解码器特有属性以及与其他解码器(如特定分隔符解码器)的相似的属性

长度域解码器特有属性。

// 长度字段的偏移量,也就是存放长度数据的起始位置private final int lengthFieldOffset; // 长度字段所占用的字节数private final int lengthFieldLength; /* * 消息长度的修正值 * * 在很多较为复杂一些的协议设计中,长度域不仅仅包含消息的长度,而且包含其他的数据,如版本号、数据类型、数据状态等,那么这时候我们需要使用 lengthAdjustment 进行修正 *  * lengthAdjustment = 包体的长度值 - 长度域的值 * */private final int lengthAdjustment; // 解码后需要跳过的初始字节数,也就是消息内容字段的起始位置private final int initialBytesToStrip;// 长度字段结束的偏移量,lengthFieldEndOffset = lengthFieldOffset + lengthFieldLengthprivate final int lengthFieldEndOffset;
与固定长度解码器和特定分隔符解码器相似的属性。
private final int maxFrameLength; // 报文最大限制长度private final boolean failFast; // 是否立即抛出 TooLongFrameException,与 maxFrameLength 搭配使用private boolean discardingTooLongFrame; // 是否处于丢弃模式private long tooLongFrameLength; // 需要丢弃的字节数private long bytesToDiscard; // 累计丢弃的字节数

下面我们结合具体的示例来解释下每种参数的组合,其实在 Netty LengthFieldBasedFrameDecoder 源码的注释中已经描述得非常详细,一共给出了 7 个场景示例,理解了这些示例基本上可以真正掌握 LengthFieldBasedFrameDecoder 的参数用法。

示例 1:典型的基于消息长度 + 消息内容的解码。

BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)+--------+----------------+      +--------+----------------+| Length | Actual Content |----->| Length | Actual Content || 0x000C | "HELLO, WORLD" |      | 0x000C | "HELLO, WORLD" |+--------+----------------+      +--------+----------------+

上述协议是最基本的格式,报文只包含消息长度 Length 和消息内容 Content 字段,其中 Length 为 16 进制表示,共占用 2 字节,Length 的值 0x000C 代表 Content 占用 12 字节。该协议对应的解码器参数组合如下:

lengthFieldOffset = 0,因为 Length 字段就在报文的开始位置。lengthFieldLength = 2,协议设计的固定长度。lengthAdjustment = 0,Length 字段只包含消息长度,不需要做任何修正。initialBytesToStrip = 0,解码后内容依然是 Length + Content,不需要跳过任何初始字节。

示例 2:解码结果需要截断。

BEFORE DECODE (14 bytes)         AFTER DECODE (12 bytes)+--------+----------------+      +----------------+| Length | Actual Content |----->| Actual Content || 0x000C | "HELLO, WORLD" |      | "HELLO, WORLD" |+--------+----------------+      +----------------+

示例 2 和示例 1 的区别在于解码后的结果只包含消息内容,其他的部分是不变的。该协议对应的解码器参数组合如下:

lengthFieldOffset = 0,因为 Length 字段就在报文的开始位置。lengthFieldLength = 2,协议设计的固定长度。lengthAdjustment = 0,Length 字段只包含消息长度,不需要做任何修正。initialBytesToStrip = 2,跳过 Length 字段的字节长度,解码后 ByteBuf 中只包含 Content字段。

示例 3:长度字段包含消息长度和消息内容所占的字节。

BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)+--------+----------------+      +--------+----------------+| Length | Actual Content |----->| Length | Actual Content || 0x000E | "HELLO, WORLD" |      | 0x000E | "HELLO, WORLD" |+--------+----------------+      +--------+----------------+

与前两个示例不同的是,示例 3 的 Length 字段包含 Length 字段自身的固定长度以及 Content 字段所占用的字节数,Length 的值为 0x000E(2 + 12 = 14 字节),在 Length 字段值(14 字节)的基础上做 lengthAdjustment(-2)的修正,才能得到真实的 Content 字段长度,所以对应的解码器参数组合如下:

lengthFieldOffset = 0,因为 Length 字段就在报文的开始位置。lengthFieldLength = 2,协议设计的固定长度。lengthAdjustment = -2,长度字段为 14 字节,需要减 2 才是拆包所需要的长度。initialBytesToStrip = 0,解码后内容依然是 Length + Content,不需要跳过任何初始字节。

示例 4:基于长度字段偏移的解码。

BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)+----------+----------+----------------+      +----------+----------+----------------+| Header 1 |  Length  | Actual Content |----->| Header 1 |  Length  | Actual Content ||  0xCAFE  | 0x00000C | "HELLO, WORLD" |      |  0xCAFE  | 0x00000C | "HELLO, WORLD" |+----------+----------+----------------+      +----------+----------+----------------+

示例 4 中 Length 字段不再是报文的起始位置,Length 字段的值为 0x00000C,表示 Content 字段占用 12 字节,该协议对应的解码器参数组合如下:

lengthFieldOffset = 2,需要跳过 Header 1 所占用的 2 字节,才是 Length 的起始位置。lengthFieldLength = 3,协议设计的固定长度。lengthAdjustment = 0,Length 字段只包含消息长度,不需要做任何修正。initialBytesToStrip = 0,解码后内容依然是完整的报文,不需要跳过任何初始字节。

示例 5:长度字段与内容字段不再相邻。

BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)+----------+----------+----------------+      +----------+----------+----------------+|  Length  | Header 1 | Actual Content |----->|  Length  | Header 1 | Actual Content || 0x00000C |  0xCAFE  | "HELLO, WORLD" |      | 0x00000C |  0xCAFE  | "HELLO, WORLD" |+----------+----------+----------------+      +----------+----------+----------------+

示例 5 中的 Length 字段之后是 Header 1,Length 与 Content 字段不再相邻。Length 字段所表示的内容略过了 Header 1 字段,所以也需要通过 lengthAdjustment 修正才能得到 Header + Content 的内容。示例 5 所对应的解码器参数组合如下:

lengthFieldOffset = 0,因为 Length 字段就在报文的开始位置。lengthFieldLength = 3,协议设计的固定长度。lengthAdjustment = 2,由于 Header + Content 一共占用 2 + 12 = 14 字节,所以 Length 字段值(12 字节)加上 lengthAdjustment(2 字节)才能得到 Header + Content 的内容(14 字节)。initialBytesToStrip = 0,解码后内容依然是完整的报文,不需要跳过任何初始字节。

示例 6:基于长度偏移和长度修正的解码。

BEFORE DECODE (16 bytes)                       AFTER DECODE (13 bytes)+------+--------+------+----------------+      +------+----------------+| HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content || 0xCA | 0x000C | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" |+------+--------+------+----------------+      +------+----------------+

示例 6 中 Length 字段前后分为别 HDR1 和 HDR2 字段,各占用 1 字节,所以既需要做长度字段的偏移,也需要做 lengthAdjustment 修正,具体修正的过程与 示例 5 类似。对应的解码器参数组合如下:

lengthFieldOffset = 1,需要跳过 HDR1 所占用的 1 字节,才是 Length 的起始位置。lengthFieldLength = 2,协议设计的固定长度。lengthAdjustment = 1,由于 HDR2 + Content 一共占用 1 + 12 = 13 字节,所以 Length 字段值(12 字节)加上 lengthAdjustment(1)才能得到 HDR2 + Content 的内容(13 字节)。initialBytesToStrip = 3,解码后跳过 HDR1 和 Length 字段,共占用 3 字节。

示例 7:长度字段包含除 Content 外的多个其他字段。

BEFORE DECODE (16 bytes)                       AFTER DECODE (13 bytes)+------+--------+------+----------------+      +------+----------------+| HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content || 0xCA | 0x0010 | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" |+------+--------+------+----------------+      +------+----------------+

示例 7 与 示例 6 的区别在于 Length 字段记录了整个报文的长度,包含 Length 自身所占字节、HDR1 、HDR2 以及 Content 字段的长度,解码器需要知道如何进行 lengthAdjustment 调整,才能得到 HDR2 和 Content 的内容。所以我们可以采用如下的解码器参数组合:

lengthFieldOffset = 1,需要跳过 HDR1 所占用的 1 字节,才是 Length 的起始位置。lengthFieldLength = 2,协议设计的固定长度。lengthAdjustment = -3,Length 字段值(16 字节)需要减去 HDR1(1 字节) 和 Length 自身所占字节长度(2 字节)才能得到 HDR2 和 Content 的内容(1 + 12 = 13 字节)。initialBytesToStrip = 3,解码后跳过 HDR1 和 Length 字段,共占用 3 字节。

以上 7 种示例涵盖了 LengthFieldBasedFrameDecoder 大部分的使用场景,你是否学会了呢?最后留一个小任务,在上一节课程中我们设计了一个较为通用的协议,如下所示。如何使用长度域解码器 LengthFieldBasedFrameDecoder 完成该协议的解码呢?抓紧自己尝试下吧。

+---------------------------------------------------------------+| 魔数 2byte | 协议版本号 1byte | 序列化算法 1byte | 报文类型 1byte  |+---------------------------------------------------------------+| 状态 1byte |        保留字段 4byte     |      数据长度 4byte     | +---------------------------------------------------------------+|                   数据内容 (长度不定)                          |+---------------------------------------------------------------+
九、writeAndFlush的工作原理Pipeline 事件传播回顾

在介绍 writeAndFlush的工作原理之前,我们首先回顾下 Pipeline 的事件传播机制,因为他们是息息相关的。根据网络数据的流向,ChannelPipeline 分为入站 ChannelInboundHandler 和出站 ChannelOutboundHandler 两种处理器,如下图所示。

当我们从客户端向服务端发送请求,或者服务端向客户端响应请求结果都属于出站处理器 ChannelOutboundHandler 的行为,所以当我们调用 writeAndFlush 时,数据一定会在 Pipeline 中进行传播。

在这里我首先抛出几个问题,学完本节课后可以用于检验下自己是否真的理解了 writeAndFlush 的原理。

writeAndFlush 是如何触发事件传播的?数据是怎样写到 Socket 底层的?为什么会有 write 和 flush 两个动作?执行 flush 之前数据是如何存储的?writeAndFlush 是同步还是异步?它是线程安全的吗?writeAndFlush 事件传播分析

为了便于我们分析 writeAndFlush 的事件传播流程,首先我们通过代码模拟一个最简单的数据出站场景,服务端在接收到客户端的请求后,将响应结果编码后写回客户端。

以下是服务端的启动类,分别注册了三个 ChannelHandler:固定长度解码器 FixedLengthFrameDecoder响应结果编码器 ResponseSampleEncoder业务逻辑处理器 RequestSampleHandler

public class EchoServer {    public void startEchoServer(int port) throws Exception {        EventLoopGroup bossGroup = new NioEventLoopGroup();        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            ServerBootstrap b = new ServerBootstrap();            b.group(bossGroup, workerGroup)                    .channel(NioServerSocketChannel.class)                    .childHandler(new ChannelInitializer<SocketChannel>() {                        @Overridepublic void initChannel(SocketChannel ch) {                            ch.pipeline().addLast(new FixedLengthFrameDecoder(10));                            ch.pipeline().addLast(new ResponseSampleEncoder());                            ch.pipeline().addLast(new RequestSampleHandler());                        }                    });            ChannelFuture f = b.bind(port).sync();            f.channel().closeFuture().sync();        } finally {            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }    }    public static void main(String[] args) throws Exception {        new EchoServer().startEchoServer(8088);    }}

其中固定长度解码器 FixedLengthFrameDecoder 是 Netty 自带的解码器,在这里就不做赘述了。下面我们分别看下另外两个 ChannelHandler 的具体实现。

响应结果编码器 ResponseSampleEncoder 用于将服务端的处理结果进行编码,具体的实现逻辑如下:

public class ResponseSampleEncoder extends MessageToByteEncoder<ResponseSample> {    @Overrideprotected void encode(ChannelHandlerContext ctx, ResponseSample msg, ByteBuf out) {        if (msg != null) {            out.writeBytes(msg.getCode().getBytes());            out.writeBytes(msg.getData().getBytes());            out.writeLong(msg.getTimestamp());        }    }}

RequestSampleHandler 主要负责客户端的数据处理,并通过调用 ctx.channel().writeAndFlush 向客户端返回 ResponseSample 对象,其中包含返回码、响应数据以及时间戳。

public class RequestSampleHandler extends ChannelInboundHandlerAdapter {    @Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {        String data = ((ByteBuf) msg).toString(CharsetUtil.UTF_8);        ResponseSample response = new ResponseSample("OK", data, System.currentTimeMillis());        ctx.channel().writeAndFlush(response);    }}

通过以上的代码示例我们可以描绘出 Pipeline 的链表结构,如下图所示。

那么当 RequestSampleHandler 调用 writeAndFlush 时,数据是如何在 Pipeline 中传播、处理并向客户端发送的呢?下面我们结合该场景对 writeAndFlush 的处理流程做深入的分析。

既然 writeAndFlush 是特有的出站操作,那么我们猜测它是从 Pipeline 的 Tail 节点开始传播的,然后一直向前传播到 Head 节点。我们跟进去 ctx.channel().writeAndFlush 的源码,如下所示,发现 DefaultChannelPipeline 类中果然是调用的 Tail 节点 writeAndFlush 方法。

@Overridepublic final ChannelFuture writeAndFlush(Object msg) {    return tail.writeAndFlush(msg);}

继续跟进 tail.writeAndFlush 的源码,最终会定位到 AbstractChannelHandlerContext 中的 write 方法。该方法是 writeAndFlush 的核心逻辑,具体见以下源码。

private void write(Object msg, boolean flush, ChannelPromise promise) {    // ...... 省略部分非核心代码 ......    // 找到 Pipeline 链表中下一个 Outbound 类型的 ChannelHandler 节点    final AbstractChannelHandlerContext next = findContextOutbound(flush ?            (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);    final Object m = pipeline.touch(msg, next);    EventExecutor executor = next.executor();    // 判断当前线程是否是 NioEventLoop 中的线程    if (executor.inEventLoop()) {        if (flush) {            // 因为 flush == true,所以流程走到这里            next.invokeWriteAndFlush(m, promise);        } else {            next.invokeWrite(m, promise);        }    } else {        final AbstractWriteTask task;        if (flush) {            task = WriteAndFlushTask.newInstance(next, m, promise);        }  else {            task = WriteTask.newInstance(next, m, promise);        }        if (!safeExecute(executor, task, promise, m)) {            task.cancel();        }    }}

首先我们确认下方法的入参,因为我们需要执行 flush 动作,所以 flush == true;write 方法还需要 ChannelPromise 参数,可见写操作是个异步的过程。AbstractChannelHandlerContext 会默认初始化一个 ChannelPromise 完成该异步操作,ChannelPromise 内部持有当前的 Channel 和 EventLoop,此外你可以向 ChannelPromise 中注册回调监听 listener 来获得异步操作的结果。

write 方法的核心逻辑主要分为三个重要步骤,我已经以注释的形式在源码中标注出来。下面我们将结合上文中的 EchoServer 代码示例详细分析 write 方法的执行机制。

第一步,调用 findContextOutbound 方法找到 Pipeline 链表中下一个 Outbound 类型的 ChannelHandler。在我们模拟的场景中下一个 Outbound 节点是 ResponseSampleEncoder。

第二步,通过 inEventLoop 方法判断当前线程的身份标识,如果当前线程和 EventLoop 分配给当前 Channel 的线程是同一个线程的话,那么所提交的任务将被立即执行。否则当前的操作将被封装成一个 Task 放入到 EventLoop 的任务队列,稍后执行。所以 writeAndFlush 是否是线程安全的呢,你心里有答案了吗?

第三步,因为 flush== true,将会直接执行 next.invokeWriteAndFlush(m, promise) 这行代码,我们跟进去源码。发现最终会它会执行下一个 ChannelHandler 节点的 write 方法,那么流程又回到了 到 AbstractChannelHandlerContext 中重复执行 write 方法,继续寻找下一个 Outbound 节点。

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {    if (invokeHandler()) {        invokeWrite0(msg, promise);        invokeFlush0();    } else {        writeAndFlush(msg, promise);    }}private void invokeWrite0(Object msg, ChannelPromise promise) {    try {        ((ChannelOutboundHandler) handler()).write(this, msg, promise);    } catch (Throwable t) {        notifyOutboundHandlerException(t, promise);    }}

为什么 ResponseSampleEncoder 中重写的是 encode 方法,而不是 write 方法?encode 方法又是什么时机被执行的呢?这就回到了《Netty 如何实现自定义通信协议》课程中所介绍的 MessageToByteEncoder 源码。因为我们在实现编码器的时候都会继承 MessageToByteEncoder 抽象类,MessageToByteEncoder 重写了 ChanneOutboundHandler 的 write 方法,其中会调用子类实现的 encode 方法完成数据编码,在这里我们不再赘述。

到目前为止,writeAndFlush 的事件传播流程已经分析完毕,可以看出 Netty 的 Pipeline 设计非常精妙,调用 writeAndFlush 时数据是在 Outbound 类型的 ChannelHandler 节点之间进行传播,那么最终数据是如何写到 Socket 底层的呢?我们一起继续向下分析吧。

写 Buffer 队列

通过上述场景示例分析,我们知道数据将会在 Pipeline 中一直寻找 Outbound 节点并向前传播,直到 Head 节点结束,由 Head 节点完成最后的数据发送。所以 Pipeline 中的 Head 节点在完成 writeAndFlush 过程中扮演着重要的角色。我们直接看下 Head 节点的 write 方法源码:

// HeadContext # write@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {    unsafe.write(msg, promise);}// AbstractChannel # AbstractUnsafe # write@Overridepublic final void write(Object msg, ChannelPromise promise) {    assertEventLoop();    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;    if (outboundBuffer == null) {        safeSetFailure(promise, newClosedChannelException(initialCloseCause));        ReferenceCountUtil.release(msg);        return;    }    int size;    try {        msg = filterOutboundMessage(msg); // 过滤消息        size = pipeline.estimatorHandle().size(msg);        if (size < 0) {            size = 0;        }    } catch (Throwable t) {        safeSetFailure(promise, t);        ReferenceCountUtil.release(msg);        return;    }    outboundBuffer.addMessage(msg, size, promise); // 向 Buffer 中添加数据}

可以看出 Head 节点是通过调用 unsafe 对象完成数据写入的,unsafe 对应的是 NioSocketChannelUnsafe 对象实例,最终调用到 AbstractChannel 中的 write 方法,该方法有两个重要的点需要指出:

filterOutboundMessage 方法会对待写入的 msg 进行过滤,如果 msg 使用的不是 DirectByteBuf,那么它会将 msg 转换成 DirectByteBuf。ChannelOutboundBuffer 可以理解为一个缓存结构,从源码最后一行 outboundBuffer.addMessage 可以看出是在向这个缓存中添加数据,所以 ChannelOutboundBuffer 才是理解数据发送的关键。

writeAndFlush 主要分为两个步骤,write 和 flush。通过上面的分析可以看出只调用 write 方法,数据并不会被真正发送出去,而是存储在 ChannelOutboundBuffer 的缓存内。下面我们重点分析一下 ChannelOutboundBuffer 的内部构造,跟进一下 addMessage 的源码:

public void addMessage(Object msg, int size, ChannelPromise promise) {    Entry entry = Entry.newInstance(msg, size, total(msg), promise);    if (tailEntry == null) {        flushedEntry = null;    } else {        Entry tail = tailEntry;        tail.next = entry;    }    tailEntry = entry;    if (unflushedEntry == null) {        unflushedEntry = entry;    }    incrementPendingOutboundBytes(entry.pendingSize, false);}

ChannelOutboundBuffer 缓存是一个链表结构,每次传入的数据都会被封装成一个 Entry 对象添加到链表中。ChannelOutboundBuffer 包含三个非常重要的指针:第一个被写到缓冲区的节点 flushedEntry、第一个未被写到缓冲区的节点 unflushedEntry和最后一个节点 tailEntry。

在初始状态下这三个指针都指向 NULL,当我们每次调用 write 方法是,都会调用 addMessage 方法改变这三个指针的指向,可以参考下图理解指针的移动过程会更加形象。

第一次调用 write,因为链表里只有一个数据,所以 unflushedEntry 和 tailEntry 指针都指向第一个添加的数据 msg1。flushedEntry 指针在没有触发 flush 动作时会一直指向 NULL。

第二次调用 write,tailEntry 指针会指向新加入的 msg2,unflushedEntry 保持不变。

第 N 次调用 write,tailEntry 指针会不断指向新加入的 msgN,unflushedEntry 依然保持不变,unflushedEntry 和 tailEntry 指针之间的数据都是未写入 Socket 缓冲区的。

以上便是写 Buffer 队列写入数据的实现原理,但是我们不可能一直向缓存中写入数据,所以 addMessage 方法中每次写入数据后都会调用 incrementPendingOutboundBytes 方法判断缓存的水位线,具体源码如下。

private static final int DEFAULT_LOW_WATER_MARK = 32 * 1024;private static final int DEFAULT_HIGH_WATER_MARK = 64 * 1024;private void incrementPendingOutboundBytes(long size, boolean invokeLater) {    if (size == 0) {        return;    }    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);    // 判断缓存大小是否超过高水位线if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {        setUnwritable(invokeLater);    }}

incrementPendingOutboundBytes 的逻辑非常简单,每次添加数据时都会累加数据的字节数,然后判断缓存大小是否超过所设置的高水位线 64KB,如果超过了高水位,那么 Channel 会被设置为不可写状态。直到缓存的数据大小低于低水位线 32KB 以后,Channel 才恢复成可写状态。

有关写数据的逻辑已经分析完了,那么执行 flush 动作缓存又会是什么变化呢?我们接下来一起看下 flush 的工作原理吧。

刷新 Buffer 队列

当执行完 write 写操作之后,invokeFlush0 会触发 flush 动作,与 write 方法类似,flush 方法同样会从 Tail 节点开始传播到 Head 节点,同样我们跟进下 HeadContext 的 flush 源码:

// HeadContext # flush@Overridepublic void flush(ChannelHandlerContext ctx) {    unsafe.flush();}// AbstractChannel # flush@Overridepublic final void flush() {    assertEventLoop();    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;    if (outboundBuffer == null) {        return;    }    outboundBuffer.addFlush();    flush0();}

可以看出 flush 的核心逻辑主要分为两个步骤:addFlush 和 flush0,下面我们逐一对它们进行分析。

首先看下 addFlush 方法的源码:

// ChannelOutboundBuffer # addFlushpublic void addFlush() {    Entry entry = unflushedEntry;    if (entry != null) {        if (flushedEntry == null) {            flushedEntry = entry;        }        do {            flushed ++;            if (!entry.promise.setUncancellable()) {                int pending = entry.cancel();                // 减去待发送的数据,如果总字节数低于低水位,那么 Channel 将变为可写状态                decrementPendingOutboundBytes(pending, false, true);            }            entry = entry.next;        } while (entry != null);        unflushedEntry = null;    }}

addFlush 方法同样也会操作 ChannelOutboundBuffer 缓存数据。在执行 addFlush 方法时,缓存中的指针变化又是如何呢?如下图所示,我们在写入流程的基础上继续进行分析。

此时 flushedEntry 指针有所改变,变更为 unflushedEntry 指针所指向的数据,然后 unflushedEntry 指针指向 NULL,flushedEntry 指针指向的数据才会被真正发送到 Socket 缓冲区。

在 addFlush 源码中 decrementPendingOutboundBytes 与之前 addMessage 源码中的 incrementPendingOutboundBytes 是相对应的。decrementPendingOutboundBytes 主要作用是减去待发送的数据字节,如果缓存的大小已经小于低水位,那么 Channel 会恢复为可写状态。

addFlush 的大体流程我们已经介绍完毕,接下来便是第二步负责发送数据的 flush0 方法。同样我们跟进 flush0 的源码,定位出 flush0 的核心调用链路:

// AbstractNioUnsafe # flush0@Overrideprotected final void flush0() {    if (!isFlushPending()) {        super.flush0();    }}// AbstractNioByteChannel # doWrite@Overrideprotected void doWrite(ChannelOutboundBuffer in) throws Exception {    int writeSpinCount = config().getWriteSpinCount();    do {        Object msg = in.current();        if (msg == null) {            clearOpWrite();            return;        }        writeSpinCount -= doWriteInternal(in, msg);    } while (writeSpinCount > 0);    incompleteWrite(writeSpinCount < 0);}

实际 flush0 的调用层次很深,但其实核心的逻辑在于 AbstractNioByteChannel 的 doWrite 方法,该方法负责将数据真正写入到 Socket 缓冲区。doWrite 方法的处理流程主要分为三步:

第一,根据配置获取自旋锁的次数 writeSpinCount。那么你的疑问就来了,这个自旋锁的次数主要是用来干什么的呢?当我们向 Socket 底层写数据的时候,如果每次要写入的数据量很大,是不可能一次将数据写完的,所以只能分批写入。Netty 在不断调用执行写入逻辑的时候,EventLoop 线程可能一直在等待,这样有可能会阻塞其他事件处理。所以这里自旋锁的次数相当于控制一次写入数据的最大的循环执行次数,如果超过所设置的自旋锁次数,那么写操作将会被暂时中断。

第二,根据自旋锁次数重复调用 doWriteInternal 方法发送数据,每成功发送一次数据,自旋锁的次数 writeSpinCount 减 1,当 writeSpinCount 耗尽,那么 doWrite 操作将会被暂时中断。doWriteInternal 的源码涉及 JDK NIO 底层,在这里我们不再深入展开,它的主要作用在于删除缓存中的链表节点以及调用底层 API 发送数据,有兴趣的同学可以自行研究。

第三,调用 incompleteWrite 方法确保数据能够全部发送出去,因为自旋锁次数的限制,可能数据并没有写完,所以需要继续 OP_WRITE 事件;如果数据已经写完,清除 OP_WRITE 事件即可。

至此,整个 writeAndFlush 的工作原理已经全部分析完了,整个过程的调用层次比较深,我整理了 writeAndFlush 的时序图,如下所示,帮助大家梳理 writeAndFlush 的调用流程,加深对上述知识点的理解。

十、堆外内存

在 Java 中对象都是在堆内分配的,通常我们说的JVM 内存也就指的堆内内存堆内内存完全被JVM 虚拟机所管理,JVM 有自己的垃圾回收算法,对于使用者来说不必关心对象的内存如何回收。

堆外内存与堆内内存相对应,对于整个机器内存而言,除堆内内存以外部分即为堆外内存,如下图所示。堆外内存不受 JVM 虚拟机管理,直接由操作系统管理。

堆外内存和堆内内存各有利弊,这里我针对其中重要的几点进行说明。

堆内内存由 JVM GC 自动回收内存,降低了 Java 用户的使用心智,但是 GC 是需要时间开销成本的,堆外内存由于不受 JVM 管理,所以在一定程度上可以降低 GC 对应用运行时带来的影响。堆外内存需要手动释放,这一点跟 C/C++ 很像,稍有不慎就会造成应用程序内存泄漏,当出现内存泄漏问题时排查起来会相对困难。当进行网络 I/O 操作、文件读写时,堆内内存都需要转换为堆外内存,然后再与底层设备进行交互,这一点在介绍 writeAndFlush 的工作原理中也有提到,所以直接使用堆外内存可以减少一次内存拷贝。堆外内存可以实现进程之间、JVM 多实例之间的数据共享。

由此可以看出,如果你想实现高效的 I/O 操作、缓存常用的对象、降低 JVM GC 压力,堆外内存是一个非常不错的选择。

堆外内存的分配

Java 中堆外内存的分配方式有两种:ByteBuffer#allocateDirectUnsafe#allocateMemory

首先我们介绍下 Java NIO 包中的 ByteBuffer 类的分配方式,使用方式如下:

// 分配 10M 堆外内存ByteBuffer buffer = ByteBuffer.allocateDirect(10 * 1024 * 1024); 

跟进 ByteBuffer.allocateDirect 源码,发现其中直接调用的 DirectByteBuffer 构造函数:

DirectByteBuffer(int cap) {    super(-1, 0, cap, cap);    boolean pa = VM.isDirectMemoryPageAligned();    int ps = Bits.pageSize();    long size = Math.max(1L, (long)cap + (pa ? ps : 0));    Bits.reserveMemory(size, cap);    long base = 0;    try {        base = unsafe.allocateMemory(size);    } catch (OutOfMemoryError x) {        Bits.unreserveMemory(size, cap);        throw x;    }    unsafe.setMemory(base, size, (byte) 0);    if (pa && (base % ps != 0)) {        address = base + ps - (base & (ps - 1));    } else {        address = base;    }    cleaner = Cleaner.create(this, new Deallocator(base, size, cap));    att = null;}

如下图所示,描述了 DirectByteBuffer 的内存引用情况,方便你更好地理解上述源码的初始化过程。在堆内存放的 DirectByteBuffer 对象并不大,仅仅包含堆外内存的地址、大小等属性,同时还会创建对应的 Cleaner 对象,通过 ByteBuffer 分配的堆外内存不需要手动回收,它可以被 JVM 自动回收。当堆内的 DirectByteBuffer 对象被 GC 回收时,Cleaner 就会用于回收对应的堆外内存。

从 DirectByteBuffer 的构造函数中可以看出,真正分配堆外内存的逻辑还是通过 unsafe.allocateMemory(size),接下来我们一起认识下 Unsafe 这个神秘的工具类。

Unsafe 是一个非常不安全的类,它用于执行内存访问、分配、修改等敏感操作,可以越过 JVM 限制的枷锁。Unsafe 最初并不是为开发者设计的,使用它时虽然可以获取对底层资源的控制权,但也失去了安全性的保证,所以使用 Unsafe 一定要慎重。Netty 中依赖了 Unsafe 工具类,是因为 Netty 需要与底层 Socket 进行交互,Unsafe 在提升 Netty 的性能方面起到了一定的帮助。

在 Java 中是不能直接使用 Unsafe 的,但是我们可以通过反射获取 Unsafe 实例,使用方式如下所示。

private static Unsafe unsafe = null;static {    try {        Field getUnsafe = Unsafe.class.getDeclaredField("theUnsafe");        getUnsafe.setAccessible(true);        unsafe = (Unsafe) getUnsafe.get(null);    } catch (NoSuchFieldException | IllegalAccessException e) {        e.printStackTrace();    }}

获得 Unsafe 实例后,我们可以通过 allocateMemory 方法分配堆外内存,allocateMemory 方法返回的是内存地址,使用方法如下所示:

// 分配 10M 堆外内存long address = unsafe.allocateMemory(10 * 1024 * 1024);

与 DirectByteBuffer 不同的是,Unsafe#allocateMemory 所分配的内存必须自己手动释放,否则会造成内存泄漏,这也是 Unsafe 不安全的体现。Unsafe 同样提供了内存释放的操作:

unsafe.freeMemory(address);

到目前为止,我们了解了堆外内存分配的两种方式,对于 Java 开发者而言,常用的是 ByteBuffer.allocateDirect 分配方式,我们平时常说的堆外内存泄漏都与该分配方式有关,接下来我们一起看看使用 ByteBuffer 分配的堆外内存如何被 JVM 回收,这对我们排查堆外内存泄漏问题有较大的帮助。

堆外内存的回收

我们试想这么一种场景,因为 DirectByteBuffer 对象有可能长时间存在于堆内内存,所以它很可能晋升到 JVM 的老年代,所以这时候 DirectByteBuffer 对象的回收需要依赖 Old GC 或者 Full GC 才能触发清理。如果长时间没有 Old GC 或者 Full GC 执行,那么堆外内存即使不再使用,也会一直在占用内存不释放,很容易将机器的物理内存耗尽,这是相当危险的。

那么在使用 DirectByteBuffer 时我们如何避免物理内存被耗尽呢?因为 JVM 并不知道堆外内存是不是已经不足了,所以我们最好通过 JVM 参数 -XX:MaxDirectMemorySize 指定堆外内存的上限大小,当堆外内存的大小超过该阈值时,就会触发一次 Full GC 进行清理回收,如果在 Full GC 之后还是无法满足堆外内存的分配,那么程序将会抛出 OOM 异常。

此外在 ByteBuffer.allocateDirect 分配的过程中,如果没有足够的空间分配堆外内存,在 Bits.reserveMemory 方法中也会主动调用 System.gc() 强制执行 Full GC,但是在生产环境一般都是设置了 -XX:+DisableExplicitGC,System.gc() 是不起作用的,所以依赖 System.gc() 并不是一个好办法。

通过前面堆外内存分配方式的介绍,我们知道 DirectByteBuffer 在初始化时会创建一个 Cleaner 对象,它会负责堆外内存的回收工作,那么 Cleaner 是如何与 GC 关联起来的呢?

Java 对象有四种引用方式:强引用 StrongReference、软引用 SoftReference、弱引用 WeakReference 和虚引用 PhantomReference。其中 PhantomReference 是最不常用的一种引用方式,Cleaner 就属于 PhantomReference 的子类,如以下源码所示,PhantomReference 不能被单独使用,需要与引用队列 ReferenceQueue 联合使用。

public class Cleaner extends java.lang.ref.PhantomReference<java.lang.Object> {    private static final java.lang.ref.ReferenceQueue<java.lang.Object> dummyQueue;    private static sun.misc.Cleaner first;    private sun.misc.Cleaner next;    private sun.misc.Cleaner prev;    private final java.lang.Runnable thunk;    public void clean() {}}

首先我们看下,当初始化堆外内存时,内存中的对象引用情况如下图所示,first 是 Cleaner 类中的静态变量,Cleaner 对象在初始化时会加入 Cleaner 链表中。DirectByteBuffer 对象包含堆外内存的地址、大小以及 Cleaner 对象的引用,ReferenceQueue 用于保存需要回收的 Cleaner 对象。

当发生 GC 时,DirectByteBuffer 对象被回收,内存中的对象引用情况发生了如下变化:

此时 Cleaner 对象不再有任何引用关系,在下一次 GC 时,该 Cleaner 对象将被添加到 ReferenceQueue 中,并执行 clean() 方法。clean() 方法主要做两件事情:

将 Cleaner 对象从 Cleaner 链表中移除;调用 unsafe.freeMemory 方法清理堆外内存。

至此,堆外内存的回收已经介绍完了,下次再排查内存泄漏问题的时候先回顾下这些最基本的知识,做到心中有数。

十一、ByteBuf

我们首先介绍下 JDK NIO 的 ByteBuffer,才能知道 ByteBuffer 有哪些缺陷和痛点。下图展示了 ByteBuffer 的内部结构:

从图中可知,ByteBuffer 包含以下四个基本属性:

mark:为某个读取过的关键位置做标记,方便回退到该位置;position:当前读取的位置;limit:buffer 中有效的数据长度大小;capacity:初始化时的空间容量。

以上四个基本属性的关系是:mark <= position <= limit <= capacity。结合 ByteBuffer 的基本属性,不难理解它在使用上的一些缺陷。

第一,ByteBuffer 分配的长度是固定的,无法动态扩缩容,所以很难控制需要分配多大的容量。如果分配太大容量,容易造成内存浪费;如果分配太小,存放太大的数据会抛出 BufferOverflowException 异常。在使用 ByteBuffer 时,为了避免容量不足问题,你必须每次在存放数据的时候对容量大小做校验,如果超出 ByteBuffer 最大容量,那么需要重新开辟一个更大容量的 ByteBuffer,将已有的数据迁移过去。整个过程相对烦琐,对开发者而言是非常不友好的。

第二,ByteBuffer 只能通过 position 获取当前可操作的位置,因为读写共用的 position 指针,所以需要频繁调用 flip、rewind 方法切换读写状态,开发者必须很小心处理 ByteBuffer 的数据读写,稍不留意就会出错。

ByteBuffer 作为网络通信中高频使用的数据载体,显然不能够满足 Netty 的需求,Netty 重新实现了一个性能更高、易用性更强的 ByteBuf,相比于 ByteBuffer 它提供了很多非常酷的特性:

容量可以按需动态扩展,类似于 StringBuffer;读写采用了不同的指针,读写模式可以随意切换,不需要调用 flip 方法;通过内置的复合缓冲类型可以实现零拷贝;支持引用计数;支持缓存池。

这里我们只是对 ByteBuf 有一个简单的了解,接下来我们就一起看下 ByteBuf 是如何实现的吧。

ByteBuf 内部结构

同样我们看下 ByteBuf 的内部结构,与 ByteBuffer 做一个对比。

从图中可以看出,ByteBuf 包含三个指针:读指针 readerIndex写指针 writeIndex最大容量 maxCapacity,根据指针的位置又可以将 ByteBuf 内部结构可以分为四个部分:

第一部分是废弃字节,表示已经丢弃的无效字节数据。

第二部分是可读字节,表示 ByteBuf 中可以被读取的字节内容,可以通过 writeIndex - readerIndex 计算得出。从 ByteBuf 读取 N 个字节,readerIndex 就会自增 N,readerIndex 不会大于 writeIndex,当 readerIndex == writeIndex 时,表示 ByteBuf 已经不可读。

第三部分是可写字节,向 ByteBuf 中写入数据都会存储到可写字节区域。向 ByteBuf 写入 N 字节数据,writeIndex 就会自增 N,当 writeIndex 超过 capacity,表示 ByteBuf 容量不足,需要扩容。

第四部分是可扩容字节,表示 ByteBuf 最多还可以扩容多少字节,当 writeIndex 超过 capacity 时,会触发 ByteBuf 扩容,最多扩容到 maxCapacity 为止,超过 maxCapacity 再写入就会出错。

由此可见,Netty 重新设计的 ByteBuf 有效地区分了可读、可写以及可扩容数据,解决了 ByteBuffer 无法扩容以及读写模式切换烦琐的缺陷。接下来,我们一起学习下 ByteBuf 的核心 API,你可以把它当作 ByteBuffer 的替代品单独使用。

引用计数

ByteBuf 是基于引用计数设计的,它实现了 ReferenceCounted 接口,ByteBuf 的生命周期是由引用计数所管理。只要引用计数大于 0,表示 ByteBuf 还在被使用;当 ByteBuf 不再被其他对象所引用时,引用计数为 0,那么代表该对象可以被释放。

当新创建一个 ByteBuf 对象时,它的初始引用计数为 1,当 ByteBuf 调用 release() 后,引用计数减 1,所以不要误以为调用了 release() 就会保证 ByteBuf 对象一定会被回收。你可以结合以下的代码示例做验证:

ByteBuf buffer = ctx.alloc().directbuffer();assert buffer.refCnt() == 1;buffer.release();assert buffer.refCnt() == 0;

引用计数对于 Netty 设计缓存池化有非常大的帮助,当引用计数为 0,该 ByteBuf 可以被放入到对象池中,避免每次使用 ByteBuf 都重复创建,对于实现高性能的内存管理有着很大的意义。

此外 Netty 可以利用引用计数的特点实现内存泄漏检测工具。JVM 并不知道 Netty 的引用计数是如何实现的,当 ByteBuf 对象不可达时,一样会被 GC 回收掉,但是如果此时 ByteBuf 的引用计数不为 0,那么该对象就不会释放或者被放入对象池,从而发生了内存泄漏。Netty 会对分配的 ByteBuf 进行抽样分析,检测 ByteBuf 是否已经不可达且引用计数大于 0,判定内存泄漏的位置并输出到日志中,你需要关注日志中 LEAK 关键字。

ByteBuf 分类

ByteBuf 有多种实现类,每种都有不同的特性,下图是 ByteBuf 的家族图谱,可以划分为三个不同的维度:Heap/DirectPooled/UnpooledUnsafe/非 Unsafe,我逐一介绍这三个维度的不同特性。

Heap/Direct 就是堆内和堆外内存。Heap 指的是在 JVM 堆内分配,底层依赖的是字节数据;Direct 则是堆外内存,不受 JVM 限制,分配方式依赖 JDK 底层的 ByteBuffer。

Pooled/Unpooled 表示池化还是非池化内存。Pooled 是从预先分配好的内存中取出,使用完可以放回 ByteBuf 内存池,等待下一次分配。而 Unpooled 是直接调用系统 API 去申请内存,确保能够被 JVM GC 管理回收。

Unsafe/非 Unsafe 的区别在于操作方式是否安全。 Unsafe 表示每次调用 JDK 的 Unsafe 对象操作物理内存,依赖 offset + index 的方式操作数据。非 Unsafe 则不需要依赖 JDK 的 Unsafe 对象,直接通过数组下标的方式操作数据。

ByteBuf 核心 API

我会分为指针操作数据读写内存管理三个方面介绍 ByteBuf 的核心 API。在开始讲解 API 的使用方法之前,先回顾下之前我们实现的自定义解码器,以便于加深对 ByteBuf API 的理解。

public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {    // 判断 ByteBuf 可读取字节    if (in.readableBytes() < 14) {         return;    }    in.markReaderIndex(); // 标记 ByteBuf 读指针位置    in.skipBytes(2); // 跳过魔数    in.skipBytes(1); // 跳过协议版本号    byte serializeType = in.readByte();    in.skipBytes(1); // 跳过报文类型    in.skipBytes(1); // 跳过状态字段    in.skipBytes(4); // 跳过保留字段    int dataLength = in.readInt();    if (in.readableBytes() < dataLength) {        in.resetReaderIndex(); // 重置 ByteBuf 读指针位置        return;    }    byte[] data = new byte[dataLength];    in.readBytes(data);    SerializeService serializeService = getSerializeServiceByType(serializeType);    Object obj = serializeService.deserialize(data);    if (obj != null) {        out.add(obj);    }}
指针操作 APIreaderIndex() & writeIndex()

readerIndex() 返回的是当前的读指针的 readerIndex 位置,writeIndex() 返回的当前写指针 writeIndex 位置。

markReaderIndex() & resetReaderIndex()

markReaderIndex() 用于保存 readerIndex 的位置,resetReaderIndex() 则将当前 readerIndex 重置为之前保存的位置。

这对 API 在实现协议解码时最为常用,例如在上述自定义解码器的源码中,在读取协议内容长度字段之前,先使用 markReaderIndex() 保存了 readerIndex 的位置,如果 ByteBuf 中可读字节数小于长度字段的值,则表示 ByteBuf 还没有一个完整的数据包,此时直接使用 resetReaderIndex() 重置 readerIndex 的位置。

此外对应的写指针操作还有 markWriterIndex() 和 resetWriterIndex(),与读指针的操作类似,我就不再一一赘述了。

数据读写 APIisReadable()

isReadable() 用于判断 ByteBuf 是否可读,如果 writerIndex 大于 readerIndex,那么 ByteBuf 是可读的,否则是不可读状态。

readableBytes()

readableBytes() 可以获取 ByteBuf 当前可读取的字节数,可以通过 writerIndex - readerIndex 计算得到。

readBytes(byte[] dst) & writeBytes(byte[] src)

readBytes() 和 writeBytes() 是两个最为常用的方法。readBytes() 是将 ByteBuf 的数据读取相应的字节到字节数组 dst 中,readBytes() 经常结合 readableBytes() 一起使用,dst 字节数组的大小通常等于 readableBytes() 的大小。

readByte() & writeByte(int value)

readByte() 是从 ByteBuf 中读取一个字节,相应的 readerIndex + 1;同理 writeByte 是向 ByteBuf 写入一个字节,相应的 writerIndex + 1。类似的 Netty 提供了 8 种基础数据类型的读取和写入,例如 readChar()、readShort()、readInt()、readLong()、writeChar()、writeShort()、writeInt()、writeLong() 等,在这里就不详细展开了。

getByte(int index) & setByte(int index, int value)

与 readByte() 和 writeByte() 相对应的还有 getByte() 和 setByte(),get/set 系列方法也提供了 8 种基础类型的读写,那么这两个系列的方法有什么区别呢?read/write 方法在读写时会改变readerIndex 和 writerIndex 指针,而 get/set 方法则不会改变指针位置。

内存管理 APIrelease() & retain()

之前已经介绍了引用计数的基本概念,每调用一次 release() 引用计数减 1,每调用一次 retain() 引用计数加 1。

slice() & duplicate()

slice() 等同于 slice(buffer.readerIndex(), buffer.readableBytes()),默认截取 readerIndex 到 writerIndex 之间的数据,最大容量 maxCapacity 为原始 ByteBuf 的可读取字节数,底层分配的内存、引用计数都与原始的 ByteBuf 共享。

duplicate() 与 slice() 不同的是,duplicate()截取的是整个原始 ByteBuf 信息,底层分配的内存、引用计数也是共享的。如果向 duplicate() 分配出来的 ByteBuf 写入数据,那么都会影响到原始的 ByteBuf 底层数据。

copy()

copy() 会从原始的 ByteBuf 中拷贝所有信息,所有数据都是独立的,向 copy() 分配的 ByteBuf 中写数据不会影响原始的 ByteBuf。

到底为止,ByteBuf 的核心 API 我们基本已经介绍完了,ByteBuf 读写指针分离的小设计,确实带来了很多实用和便利的功能,在开发的过程中不必再去想着 flip、rewind 这种头疼的操作了。

ByteBuf 实战演练

学习完 ByteBuf 的内部构造以及核心 API 之后,我们下面通过一个简单的示例演示一下 ByteBuf 应该如何使用,代码如下所示。

public class ByteBufTest {    public static void main(String[] args) {        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(6, 10);        printByteBufInfo("ByteBufAllocator.buffer(5, 10)", buffer);        buffer.writeBytes(new byte[]{1, 2});        printByteBufInfo("write 2 Bytes", buffer);        buffer.writeInt(100);        printByteBufInfo("write Int 100", buffer);        buffer.writeBytes(new byte[]{3, 4, 5});        printByteBufInfo("write 3 Bytes", buffer);        byte[] read = new byte[buffer.readableBytes()];        buffer.readBytes(read);        printByteBufInfo("readBytes(" + buffer.readableBytes() + ")", buffer);        printByteBufInfo("BeforeGetAndSet", buffer);        System.out.println("getInt(2): " + buffer.getInt(2));        buffer.setByte(1, 0);        System.out.println("getByte(1): " + buffer.getByte(1));        printByteBufInfo("AfterGetAndSet", buffer);    }    private static void printByteBufInfo(String step, ByteBuf buffer) {        System.out.println("------" + step + "-----");        System.out.println("readerIndex(): " + buffer.readerIndex());        System.out.println("writerIndex(): " + buffer.writerIndex());        System.out.println("isReadable(): " + buffer.isReadable());        System.out.println("isWritable(): " + buffer.isWritable());        System.out.println("readableBytes(): " + buffer.readableBytes());        System.out.println("writableBytes(): " + buffer.writableBytes());        System.out.println("maxWritableBytes(): " + buffer.maxWritableBytes());        System.out.println("capacity(): " + buffer.capacity());        System.out.println("maxCapacity(): " + buffer.maxCapacity());    }}

程序的输出结果在此我就不贴出了,建议你可以先尝试思考 readerIndex、writerIndex 是如何改变的,然后再动手跑下上述代码,验证结果是否正确。

结合代码示例,我们总结一下 ByteBuf API 使用时的注意点:

write 系列方法会改变 writerIndex 位置,当 writerIndex 等于 capacity 的时候,Buffer 置为不可写状态;向不可写 Buffer 写入数据时,Buffer 会尝试扩容,但是扩容后 capacity 最大不能超过 maxCapacity,如果写入的数据超过 maxCapacity,程序会直接抛出异常;read 系列方法会改变 readerIndex 位置,get/set 系列方法不会改变 readerIndex/writerIndex 位置。十二、jemalloc内存分配器

jemalloc 是由 Jason Evans 在 FreeBSD 项目中引入的新一代内存分配器。它是一个通用的 malloc 实现,侧重于减少内存碎片和提升高并发场景下内存的分配效率,其目标是能够替代 malloc。jemalloc 应用十分广泛,在 Firefox、Redis、Rust、Netty 等出名的产品或者编程语言中都有大量使用。具体细节可以参考 Jason Evans 发表的论文 [《A Scalable Concurrent malloc Implementation for FreeBSD》]

除了 jemalloc 之外,业界还有一些著名的内存分配器实现,例如 ptmalloc 和 tcmalloc。我们对这三种内存分配器做一个简单的对比:

ptmalloc 是基于 glibc 实现的内存分配器,它是一个标准实现,所以兼容性较好。pt 表示 per thread 的意思。当然 ptmalloc 确实在多线程的性能优化上下了很多功夫。由于过于考虑性能问题,多线程之间内存无法实现共享,只能每个线程都独立使用各自的内存,所以在内存开销上是有很大浪费的。

tcmalloc 出身于 Google,全称是 thread-caching malloc,所以 tcmalloc 最大的特点是带有线程缓存,tcmalloc 非常出名,目前在 Chrome、Safari 等知名产品中都有所应有。tcmalloc 为每个线程分配了一个局部缓存,对于小对象的分配,可以直接由线程局部缓存来完成,对于大对象的分配场景,tcmalloc 尝试采用自旋锁来减少多线程的锁竞争问题。

jemalloc 借鉴了 tcmalloc 优秀的设计思路,所以在架构设计方面两者有很多相似之处,同样都包含 thread cache 的特性。但是 jemalloc 在设计上比 ptmalloc 和 tcmalloc 都要复杂,jemalloc 将内存分配粒度划分为 Small、Large、Huge 三个分类,并记录了很多 meta 数据,所以在空间占用上要略多于 tcmalloc,不过在大内存分配的场景,jemalloc 的内存碎片要少于 tcmalloc。tcmalloc 内部采用红黑树管理内存块和分页,Huge 对象通过红黑树查找索引数据可以控制在指数级时间。

由此可见,虽然几个内存分配器的侧重点不同,但是它们的核心目标是一致的:

高效的内存分配和回收,提升单线程或者多线程场景下的性能。减少内存碎片,包括内部碎片和外部碎片,提高内存的有效利用率。

那么这里又涉及一个概念,什么是内存碎片呢?Linux 中物理内存会被划分成若干个 4K 大小的内存页 Page,物理内存的分配和回收都是基于 Page 完成的,Page 内产生的内存碎片称为内部碎片,Page 之间产生的内存碎片称为外部碎片。

首先讲下内部碎片,因为内存是按 Page 进行分配的,即便我们只需要很小的内存,操作系统至少也会分配 4K 大小的 Page,单个 Page 内只有一部分字节都被使用,剩余的字节形成了内部碎片,如下图所示。

外部碎片与内部碎片相反,是在分配较大内存块时产生的。我们试想一下,当需要分配大内存块的时候,操作系统只能通过分配连续的 Page 才能满足要求,在程序不断运行的过程中,这些 Page 被频繁的回收并重新分配,Page 之间就会出现小的空闲内存块,这样就形成了外部碎片,如下图所示。

上述我们介绍了内存分配器的一些背景知识,它们是操作系统以及高性能组件的必备神器,如果你对内存管理有兴趣,jemalloc 和 tcmalloc 都是非常推荐学习的。

常用内存分配器算法

在学习 jemalloc 的实现原理之前,我们先了解下最常用的内存分配器算法:动态内存分配伙伴算法Slab 算法,这将对于我们理解 jemalloc 大有裨益。

动态内存分配

动态内存分配(Dynamic memory allocation)又称为堆内存分配,后面简称 DMA,操作系统根据程序运行过程中的需求即时分配内存,且分配的内存大小就是程序需求的大小。在大部分场景下,只有在程序运行的时候才知道所需要分配的内存大小,如果提前分配可能会分配的大小无法把控,分配太大会浪费空间,分配太小会无法使用。

DMA 是从一整块内存中按需分配,对于分配出的内存会记录元数据,同时还会使用空闲分区链维护空闲内存,便于在内存分配时查找可用的空闲分区,常用的有三种查找策略:

第一种是⾸次适应算法(first fit),空闲分区链以地址递增的顺序将空闲分区以双向链表的形式连接在一起,从空闲分区链中找到第一个满足分配条件的空闲分区,然后从空闲分区中划分出一块可用内存给请求进程,剩余的空闲分区仍然保留在空闲分区链中。如下图所示,P1 和 P2 的请求可以在内存块 A 中完成分配。该算法每次都从低地址开始查找,造成低地址部分会不断被分配,同时也会产生很多小的空闲分区。

第二种是循环首次适应算法(next fit),该算法是由首次适应算法的变种,循环首次适应算法不再是每次从链表的开始进行查找,而是从上次找到的空闲分区的下⼀个空闲分区开始查找。如下图所示,P1 请求在内存块 A 完成分配,然后再为 P2 分配内存时,是直接继续向下寻找可用分区,最终在 B 内存块中完成分配。该算法相比⾸次适应算法空闲分区的分布更加均匀,而且查找的效率有所提升,但是正因为如此会造成空闲分区链中大的空闲分区会越来越少。

第三种是最佳适应算法(best fit),空闲分区链以空闲分区大小递增的顺序将空闲分区以双向链表的形式连接在一起,每次从空闲分区链的开头进行查找,这样第一个满足分配条件的空间分区就是最优解。如下图所示,在 A 内存块分配完 P1 请求后,空闲分区链重新按分区大小进行排序,再为 P2 请求查找满足条件的空闲分区。该算法的空间利用率更高,但同样也会留下很多较难利用的小空闲分区,由于每次分配完需要重新排序,所以会有造成性能损耗。

伙伴算法

伙伴算法是一种非常经典的内存分配算法,它采用了分离适配的设计思想,将物理内存按照 2 的次幂进行划分,内存分配时也是按照 2 的次幂大小进行按需分配,例如 4KB、 8KB、16KB 等。假设我们请求分配的内存大小为 10KB,那么会按照 16KB 分配。

伙伴算法相对比较复杂,我们结合下面这张图来讲解它的分配原理。

伙伴算法把内存划分为 11 组不同的 2 次幂大小的内存块集合,每组内存块集合都用双向链表连接。链表中每个节点的内存块大小分别为 1、2、4、8、16、32、64、128、256、512 和 1024 个连续的 Page,例如第一组链表的节点为 2^0 个连续 Page,第二组链表的节点为 2^1 个连续 Page,以此类推。

假设我们需要分配 10K 大小的内存块,看下伙伴算法的具体分配过程:

首先需要找到存储 2^4 连续 Page 所对应的链表,即数组下标为 4;查找 2^4 链表中是否有空闲的内存块,如果有则分配成功;如果 2^4 链表不存在空闲的内存块,则继续沿数组向上查找,即定位到数组下标为 5 的链表,链表中每个节点存储 2^5 的连续 Page;如果 2^5 链表中存在空闲的内存块,则取出该内存块并将它分割为 2 个 2^4 大小的内存块,其中一块分配给进程使用,剩余的一块链接到 2^4 链表中。

以上是伙伴算法的分配过程,那么释放内存时候伙伴算法又会发生什么行为呢?当进程使用完内存归还时,需要检查其伙伴块的内存是否释放,所谓伙伴块是不仅大小相同,而且两个块的地址是连续的,其中低地址的内存块起始地址必须为 2 的整数次幂。如果伙伴块是空闲的,那么就会将两个内存块合并成更大的块,然后重复执行上述伙伴块的检查机制。直至伙伴块是非空闲状态,那么就会将该内存块按照实际大小归还到对应的链表中。频繁的合并会造成 CPU 浪费,所以并不是每次释放都会触发合并操作,当链表中的内存块个数小于某个阈值时,并不会触发合并操作。

由此可见,伙伴算法有效地减少了外部碎片,但是有可能会造成非常严重的内部碎片,最严重的情况会带来 50% 的内存碎片。

Slab 算法

因为伙伴算法都是以 Page 为最小管理单位,在小内存的分配场景,伙伴算法并不适用,如果每次都分配一个 Page 岂不是非常浪费内存,因此 Slab 算法应运而生了。Slab 算法在伙伴算法的基础上,对小内存的场景专门做了优化,采用了内存池的方案,解决内部碎片问题。

Linux 内核使用的就是 Slab 算法,因为内核需要频繁地分配小内存,所以 Slab 算法提供了一种高速缓存机制,使用缓存存储内核对象,当内核需要分配内存时,基本上可以通过缓存中获取。此外 Slab 算法还可以支持通用对象的初始化操作,避免对象重复初始化的开销。下图是 Slab 算法的结构图,Slab 算法实现起来非常复杂,本文只做一个简单的了解。

在 Slab 算法中维护着大小不同的 Slab 集合,在最顶层是 cache_chain,cache_chain 中维护着一组 kmem_cache 引用,kmem_cache 负责管理一块固定大小的对象池。通常会提前分配一块内存,然后将这块内存划分为大小相同的 slot,不会对内存块再进行合并,同时使用位图 bitmap 记录每个 slot 的使用情况。

kmem_cache 中包含三个 Slab 链表:完全分配使用 slab_full部分分配使用 slab_partial完全空闲 slabs_empty,这三个链表负责内存的分配和释放。每个链表中维护的 Slab 都是一个或多个连续 Page,每个 Slab 被分配多个对象进行存储。Slab 算法是基于对象进行内存管理的,它把相同类型的对象分为一类。当分配内存时,从 Slab 链表中划分相应的内存单元;当释放内存时,Slab 算法并不会丢弃已经分配的对象,而是将它保存在缓存中,当下次再为对象分配内存时,直接会使用最近释放的内存块。

单个 Slab 可以在不同的链表之间移动,例如当一个 Slab 被分配完,就会从 slab_partial 移动到 slabs_full,当一个 Slab 中有对象被释放后,就会从 slab_full 再次回到 slab_partial,所有对象都被释放完的话,就会从 slab_partial 移动到 slab_empty。

至此,三种最常用的内存分配算法已经介绍完了,优秀的内存分配算法都是在性能和内存利用率之间寻找平衡点,我们今天的主角 jemalloc 就是非常典型的例子。

jemalloc 架构设计

在了解了常用的内存分配算法之后,再理解 jemalloc 的架构设计会相对轻松一些。下图是 jemalloc 的架构图,我们一起学习下它的核心设计理念。

上图中涉及 jemalloc 的几个核心概念,例如 arena、bin、chunk、run、region、tcache 等,我们下面逐一进行介绍。

arena 是 jemalloc 最重要的部分,内存由一定数量的 arenas 负责管理。每个用户线程都会被绑定到一个 arena 上,线程采用 round-robin 轮询的方式选择可用的 arena 进行内存分配,为了减少线程之间的锁竞争,默认每个 CPU 会分配 4 个 arena。

bin 用于管理不同档位的内存单元,每个 bin 管理的内存大小是按分类依次递增。因为 jemalloc 中小内存的分配是基于 Slab 算法完成的,所以会产生不同类别的内存块。

chunk 是负责管理用户内存块的数据结构,chunk 以 Page 为单位管理内存,默认大小是 4M,即 1024 个连续 Page。每个 chunk 可被用于多次小内存的申请,但是在大内存分配的场景下只能分配一次。

run 实际上是 chunk 中的一块内存区域,每个 bin 管理相同类型的 run,最终通过操作 run 完成内存分配。run 结构具体的大小由不同的 bin 决定,例如 8 字节的 bin 对应的 run 只有一个 Page,可以从中选取 8 字节的块进行分配。

region 是每个 run 中的对应的若干个小内存块,每个 run 会将划分为若干个等长的 region,每次内存分配也是按照 region 进行分发。

tcache 是每个线程私有的缓存,用于 small 和 large 场景下的内存分配,每个 tcahe 会对应一个 arena,tcache 本身也会有一个 bin 数组,称为tbin。与 arena 中 bin 不同的是,它不会有 run 的概念。tcache 每次从 arena 申请一批内存,在分配内存时首先在 tcache 查找,从而避免锁竞争,如果分配失败才会通过 run 执行内存分配。

jemalloc 的几个核心的概念介绍完了,我们再重新梳理下它们之间的关系:

内存是由一定数量的 arenas 负责管理,线程均匀分布在 arenas 当中;每个 arena 都包含一个 bin 数组,每个 bin 管理不同档位的内存块;每个 arena 被划分为若干个 chunks,每个 chunk 又包含若干个 runs,每个 run 由连续的 Page 组成,run 才是实际分配内存的操作对象;每个 run 会被划分为一定数量的 regions,在小内存的分配场景,region 相当于用户内存;每个 tcache 对应 一个 arena,tcache 中包含多种类型的 bin。

接下来我们分析下 jemalloc 的整体内存分配和释放流程,主要分为 SamllLargeHuge 三种场景。

首先讲下 Samll 场景,如果请求分配内存的大小小于 arena 中的最小的 bin,那么优先从线程中对应的 tcache 中进行分配。首先确定查找对应的 tbin 中是否存在缓存的内存块,如果存在则分配成功,否则找到 tbin 对应的 arena,从 arena 中对应的 bin 中分配 region 保存在 tbin 的 avail 数组中,最终从 availl 数组中选取一个地址进行内存分配,当内存释放时也会将被回收的内存块进行缓存。

Large 场景的内存分配与 Samll 类似,如果请求分配内存的大小大于 arena 中的最小的 bin,但是不大于 tcache 中能够缓存的最大块,依然会通过 tcache 进行分配,但是不同的是此时会分配 chunk 以及所对应的 run,从 chunk 中找到相应的内存空间进行分配。内存释放时也跟 samll 场景类似,会把释放的内存块缓存在 tacache 的 tbin 中。此外还有一种情况,当请求分配内存的大小大于tcache 中能够缓存的最大块,但是不大于 chunk 的大小,那么将不会采用 tcache 机制,直接在 chunk 中进行内存分配。

Huge 场景,如果请求分配内存的大小大于 chunk 的大小,那么直接通过 mmap 进行分配,调用 munmap 进行回收。

到底为止,jemalloc 的基础知识介绍完毕,你需要花点时间消化它,这对于后面学习 Netty 的内存管理很有帮助。

十三、内存管理

Netty 作为一款高性能的网络框架,需要处理海量的字节数据,而且 Netty 默认提供了池化对象的内存分配,使用完后归还到内存池,所以一套高性能的内存管理机制是 Netty 必不可少的。在上节课中我们介绍了原生 jemalloc 的基本原理,而 Netty 高性能的内存管理也是借鉴 jemalloc 实现的,它同样需要解决两个经典的核心问题:

在单线程或者多线程的场景下,如何高效地进行内存分配和回收?如何减少内存碎片,提高内存的有效利用率?内存规格介绍

Netty 保留了内存规格分类的设计理念,不同大小的内存块采用的分配策略是不同的,具体内存规格的分类情况如下图所示。

上图中 Tiny 代表 0 ~ 512B 之间的内存块,Samll 代表 512B ~ 8K 之间的内存块,Normal 代表 8K ~ 16M 的内存块,Huge 代表大于 16M 的内存块。在 Netty 中定义了一个 SizeClass 类型的枚举,用于描述上图中的内存规格类型,分别为 Tiny、Small 和 Normal。但是图中 Huge 并未在代码中定义,当分配大于 16M 时,可以归类为 Huge 场景,Netty 会直接使用非池化的方式进行内存分配。

Netty 在每个区域内又定义了更细粒度的内存分配单位,分别为 Chunk、Page、Subpage,我们将逐一对其进行介绍。

Chunk 是 Netty 向操作系统申请内存的单位,所有的内存分配操作也是基于 Chunk 完成的,Chunk 可以理解为 Page 的集合,每个 Chunk 默认大小为 16M。

Page 是 Chunk 用于管理内存的单位,Netty 中的 Page 的大小为 8K,不要与 Linux 中的内存页 Page 相混淆了。假如我们需要分配 64K 的内存,需要在 Chunk 中选取 8 个 Page 进行分配。

Subpage 负责 Page 内的内存分配,假如我们分配的内存大小远小于 Page,直接分配一个 Page 会造成严重的内存浪费,所以需要将 Page 划分为多个相同的子块进行分配,这里的子块就相当于 Subpage。按照 Tiny 和 Small 两种内存规格,SubPage 的大小也会分为两种情况。在 Tiny 场景下,最小的划分单位为 16B,按 16B 依次递增,16B、32B、48B ...... 496B;在 Small 场景下,总共可以划分为 512B、1024B、2048B、4096B 四种情况。Subpage 没有固定的大小,需要根据用户分配的缓冲区大小决定,例如分配 1K 的内存时,Netty 会把一个 Page 等分为 8 个 1K 的 Subpage。

了解了 Netty 不同粒度的内存的分配单位后,我们接下来看看 Netty 中的 jemalloc 是如何实现的。

Netty 内存池架构设计

Netty 中的内存池可以看作一个 Java 版本的 jemalloc 实现,并结合 JVM 的诸多特性做了部分优化。如下图所示,我们首先从全局视角看下 Netty 内存池的整体布局,对它有一个宏观的认识。

基于上图的内存池模型,Netty 抽象出一些核心组件,如 PoolArena、PoolChunk、PoolChunkList、PoolSubpage、PoolThreadCache、MemoryRegionCache 等,可以看出与 jemalloc 中的核心概念有些是类似的,接下来我们逐一进行介绍。

PoolArena

Netty 借鉴了 jemalloc 中 Arena 的设计思想,采用固定数量的多个 Arena 进行内存分配,Arena 的默认数量与 CPU 核数有关,通过创建多个 Arena 来缓解资源竞争问题,从而提高内存分配效率。线程在首次申请分配内存时,会通过 round-robin 的方式轮询 Arena 数组,选择一个固定的 Arena,在线程的生命周期内只与该 Arena 打交道,所以每个线程都保存了 Arena 信息,从而提高访问效率。

根据分配内存的类型,ByteBuf 可以分为 Heap 和 Direct,同样 PoolArena 抽象类提供了 HeapArena 和 DirectArena 两个子类。首先看下 PoolArena 的数据结构,如下图所示。

PoolArena 的数据结构包含两个 PoolSubpage 数组和六个 PoolChunkList,两个 PoolSubpage 数组分别存放 Tiny 和 Small 类型的内存块,六个 PoolChunkList 分别存储不同利用率的 Chunk,构成一个双向循环链表。

之前我们介绍了 Netty 内存规格的分类,PoolArena 对应实现了 Subpage 和 Chunk 中的内存分配,其 中 PoolSubpage 用于分配小于 8K 的内存,PoolChunkList 用于分配大于 8K 的内存。

PoolSubpage 也是按照 Tiny 和 Small 两种内存规格,设计了tinySubpagePools 和 smallSubpagePools 两个数组,根据关于 Subpage 的介绍,我们知道 Tiny 场景下,内存单位最小为 16B,按 16B 依次递增,共 32 种情况,Small 场景下共分为 512B、1024B、2048B、4096B 四种情况,分别对应两个数组的长度大小,每种粒度的内存单位都由一个 PoolSubpage 进行管理。假如我们分配 20B 大小的内存空间,也会向上取整找到 32B 的 PoolSubpage 节点进行分配。

PoolChunkList 用于 Chunk 场景下的内存分配,PoolArena 中初始化了六个 PoolChunkList,分别为 qInit、q000、q025、q050、q075、q100,这与 jemalloc 中 run 队列思路是一致的,它们分别代表不同的内存使用率,如下所示:

qInit,内存使用率为 0 ~ 25% 的 Chunk。q000,内存使用率为 1 ~ 50% 的 Chunk。q025,内存使用率为 25% ~ 75% 的 Chunk。q050,内存使用率为 50% ~ 100% 的 Chunk。q075,内存使用率为 75% ~ 100% 的 Chunk。q100,内存使用率为 100% 的 Chunk。

六种类型的 PoolChunkList 除了 qInit,它们之间都形成了双向链表,如下图所示。

随着 Chunk 内存使用率的变化,Netty 会重新检查内存的使用率并放入对应的 PoolChunkList,所以 PoolChunk 会在不同的 PoolChunkList 移动。

我在刚开始学习 PoolChunkList 的时候的一个疑问就是,qInit 和 q000 为什么需要设计成两个,是否可以合并成一个?其实它们各有用处。

qInit 用于存储初始分配的 PoolChunk,因为在第一次内存分配时,PoolChunkList 中并没有可用的 PoolChunk,所以需要新创建一个 PoolChunk 并添加到 qInit 列表中。qInit 中的 PoolChunk 即使内存被完全释放也不会被回收,避免 PoolChunk 的重复初始化工作。

q000 则用于存放内存使用率为 1 ~ 50% 的 PoolChunk,q000 中的 PoolChunk 内存被完全释放后,PoolChunk 从链表中移除,对应分配的内存也会被回收。

还有一点需要注意的是,在分配大于 8K 的内存时,其链表的访问顺序是 q050->q025->q000->qInit->q075,遍历检查 PoolChunkList 中是否有 PoolChunk 可以用于内存分配,源码如下:

private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {    if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||        q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||        q075.allocate(buf, reqCapacity, normCapacity)) {        return;    }    PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);    boolean success = c.allocate(buf, reqCapacity, normCapacity);    assert success;    qInit.add(c);}

这里你或许有了疑问,为什么会优先选择 q050,而不是从 q000 开始呢?

可以说这是一个折中的选择,在频繁分配内存的场景下,如果从 q000 开始,会有大部分的 PoolChunk 面临频繁的创建和销毁,造成内存分配的性能降低。如果从 q050 开始,会使 PoolChunk 的使用率范围保持在中间水平,降低了 PoolChunk 被回收的概率,从而兼顾了性能。

PoolArena 是 Netty 内存分配中非常重要的部分,我们花了较多篇幅进行讲解,对之后理解内存分配的实现原理会有所帮助。

PoolChunkList

PoolChunkList 负责管理多个 PoolChunk 的生命周期,同一个 PoolChunkList 中存放内存使用率相近的 PoolChunk,这些 PoolChunk 同样以双向链表的形式连接在一起,PoolChunkList 的结构如下图所示。因为 PoolChunk 经常要从 PoolChunkList 中删除,并且需要在不同的 PoolChunkList 中移动,所以双向链表是管理 PoolChunk 时间复杂度较低的数据结构。

每个 PoolChunkList 都有内存使用率的上下限:minUsage 和 maxUsage,当 PoolChunk 进行内存分配后,如果使用率超过 maxUsage,那么 PoolChunk 会从当前 PoolChunkList 移除,并移动到下一个 PoolChunkList。同理,PoolChunk 中的内存发生释放后,如果使用率小于 minUsage,那么 PoolChunk 会从当前 PoolChunkList 移除,并移动到前一个 PoolChunkList。

回过头再看下 Netty 初始化的六个 PoolChunkList,每个 PoolChunkList 的上下限都有交叉重叠的部分,如下图所示。因为 PoolChunk 需要在 PoolChunkList 不断移动,如果每个 PoolChunkList 的内存使用率的临界值都是恰好衔接的,例如 1 ~ 50%、50% ~ 75%,那么如果 PoolChunk 的使用率一直处于 50% 的临界值,会导致 PoolChunk 在两个 PoolChunkList 不断移动,造成性能损耗。

PoolChunk

Netty 内存的分配和回收都是基于 PoolChunk 完成的,PoolChunk 是真正存储内存数据的地方,每个 PoolChunk 的默认大小为 16M,首先我们看下 PoolChunk 数据结构的定义:

final class PoolChunk<T> implements PoolChunkMetric {    final PoolArena<T> arena;    final T memory; // 存储的数据    private final byte[] memoryMap; // 满二叉树中的节点是否被分配,数组大小为 4096    private final byte[] depthMap; // 满二叉树中的节点高度,数组大小为 4096    private final PoolSubpage<T>[] subpages; // PoolChunk 中管理的 2048 个 8K 内存块    private int freeBytes; // 剩余的内存大小    PoolChunkList<T> parent;    PoolChunk<T> prev;    PoolChunk<T> next;    // 省略其他代码}

PoolChunk 可以理解为 Page 的集合,Page 只是一种抽象的概念,实际在 Netty 中 Page 所指的是 PoolChunk 所管理的子内存块,每个子内存块采用 PoolSubpage 表示。Netty 会使用伙伴算法将 PoolChunk 分配成 2048 个 Page,最终形成一颗满二叉树,二叉树中所有子节点的内存都属于其父节点管理,如下图所示。

结合 PoolChunk 的结构图,我们介绍一下 PoolChunk 中几个重要的属性:

depthMap 用于存放节点所对应的高度。例如第 2048 个节点 depthMap[1025] = 10。

memoryMap 用于记录二叉树节点的分配信息,memoryMap 初始值与 depthMap 是一样的,随着节点被分配,不仅节点的值会改变,而且会递归遍历更新其父节点的值,父节点的值取两个子节点中最小的值。

subpages 对应上图中 PoolChunk 内部的 Page0、Page1、Page2 ...... Page2047,Netty 中并没有 Page 的定义,直接使用 PoolSubpage 表示。当分配的内存小于 8K 时,PoolChunk 中的每个 Page 节点会被划分成为更小粒度的内存块进行管理,小内存块同样以 PoolSubpage 管理。从图中可以看出,小内存的分配场景下,会首先找到对应的 PoolArena ,然后根据计算出对应的 tinySubpagePools 或者 smallSubpagePools 数组对应的下标,如果对应数组元素所包含的 PoolSubpage 链表不存在任何节点,那么将创建新的 PoolSubpage 加入链表中。

PoolSubpage

目前大家对 PoolSubpage 应该有了一些认识,在小内存分配的场景下,即分配的内存大小小于一个 Page 8K,会使用 PoolSubpage 进行管理。首先看下 PoolSubpage 的定义:

final class PoolSubpage<T> implements PoolSubpageMetric {    final PoolChunk<T> chunk;    private final int memoryMapIdx; // 对应满二叉树节点的下标    private final int runOffset; // PoolSubpage 在 PoolChunk 中 memory 的偏移量    private final long[] bitmap; // 记录每个小内存块的状态// 与 PoolArena 中 tinySubpagePools 或 smallSubpagePools 中元素连接成双向链表    PoolSubpage<T> prev;    PoolSubpage<T> next;    int elemSize; // 每个小内存块的大小    private int maxNumElems; // 最多可以存放多少小内存块:8K/elemSize    private int numAvail; // 可用于分配的内存块个数    // 省略其他代码}

PoolSubpage 中每个属性的含义都比较清晰易懂,我都以注释的形式标出,在这里就不一一赘述了,只指出其中比较重点的两个知识点:

第一个就是 PoolSubpage 是如何记录内存块的使用状态的呢?PoolSubpage 通过位图 bitmap 记录子内存是否已经被使用,bit 的取值为 0 或者 1,如下图所示。

第二个就是 PoolSubpage 和 PoolArena 之间是如何联系起来的?

通过之前的介绍,我们知道 PoolArena 在创建时会初始化 tinySubpagePools 和 smallSubpagePools 两个 PoolSubpage 数组,数组的大小分别为 32 和 4。

假如我们现在需要分配 20B 大小的内存,会向上取整为 32B,从满二叉树的第 11 层找到一个 PoolSubpage 节点,并把它等分为 8KB/32B = 256B 个小内存块,然后找到这个 PoolSubpage 节点对应的 PoolArena,将 PoolSubpage 节点与 tinySubpagePools[1] 对应的 head 节点连接成双向链表,形成下图所示的结构。

下次再有 32B 规格的内存分配时,会直接查找 PoolArena 中 tinySubpagePools[1] 元素的 next 节点是否存在可用的 PoolSubpage,如果存在将直接使用该 PoolSubpage 执行内存分配,从而提高了内存分配效率,其他内存规格的分配原理类似。

PoolThreadCache & MemoryRegionCache

PoolThreadCache 顾名思义,对应的是 jemalloc 中本地线程缓存的意思。那么 PoolThreadCache 是如何被使用的呢?它可以缓存哪些类型的数据呢?

当内存释放时,与 jemalloc 一样,Netty 并没有将缓存归还给 PoolChunk,而是使用 PoolThreadCache 缓存起来,当下次有同样规格的内存分配时,直接从 PoolThreadCache 取出使用即可。PoolThreadCache 缓存 Tiny、Small、Normal 三种类型的数据,而且根据堆内和堆外内存的类型进行了区分,如 PoolThreadCache 的源码定义所示:

final class PoolThreadCache {    final PoolArena<byte[]> heapArena;    final PoolArena<ByteBuffer> directArena;    private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;    private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;    private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;    private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;    private final MemoryRegionCache<byte[]>[] normalHeapCaches;    private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;    // 省略其他代码}

PoolThreadCache 中有一个重要的数据结构:MemoryRegionCache。MemoryRegionCache 有三个重要的属性,分别为 queue,sizeClass 和 size,下图是不同内存规格所对应的 MemoryRegionCache 属性取值范围。

MemoryRegionCache 实际就是一个队列,当内存释放时,将内存块加入队列当中,下次再分配同样规格的内存时,直接从队列中取出空闲的内存块。

PoolThreadCache 将不同规格大小的内存都使用单独的 MemoryRegionCache 维护,如下图所示,图中的每个节点都对应一个 MemoryRegionCache,例如 Tiny 场景下对应的 32 种内存规格会使用 32 个 MemoryRegionCache 维护,所以 PoolThreadCache 源码中 Tiny、Small、Normal 类型的 MemoryRegionCache 数组长度分别为 32、4、3。

到此为止,Netty 中内存管理所涉及的核心组件都介绍完毕,推荐你回头再梳理一遍 jemalloc 的核心概念,与 Netty 做一个简单的对比,思路会更加清晰。

内存分配实现原理

Netty 中负责线程分配的组件有两个:PoolArenaPoolThreadCache。PoolArena 是多个线程共享的,每个线程会固定绑定一个 PoolArena,PoolThreadCache 是每个线程私有的缓存空间,如下图所示。

在上节课中,我们介绍了 PoolChunk、PoolSubpage、PoolChunkList,它们都是 PoolArena 中所用到的概念。PoolArena 中管理的内存单位为 PoolChunk,每个 PoolChunk 会被划分为 2048 个 8K 的 Page。在申请的内存大于 8K 时,PoolChunk 会以 Page 为单位进行内存分配。当申请的内存大小小于 8K 时,会由 PoolSubpage 管理更小粒度的内存分配。

PoolArena 分配的内存被释放后,不会立即会还给 PoolChunk,而且会缓存在本地私有缓存 PoolThreadCache 中,在下一次进行内存分配时,会优先从 PoolThreadCache 中查找匹配的内存块。

由此可见,Netty 中不同的内存规格采用的分配策略是不同的,我们主要分为以下三个场景逐一进行分析。

分配内存大于 8K 时,PoolChunk 中采用的 Page 级别的内存分配策略。分配内存小于 8K 时,由 PoolSubpage 负责管理的内存分配策略。分配内存小于 8K 时,为了提高内存分配效率,由 PoolThreadCache 本地线程缓存提供的内存分配。PoolChunk 中 Page 级别的内存分配

每个 PoolChunk 默认大小为 16M,PoolChunk 是通过伙伴算法管理多个 Page,每个 PoolChunk 被划分为 2048 个 Page,最终通过一颗满二叉树实现,我们再一起回顾下 PoolChunk 的二叉树结构,如下图所示。

假如用户需要依次申请 8K、16K、8K 的内存,通过这里例子我们详细描述下 PoolChunk 如何分配 Page 级别的内存,方便大家理解伙伴算法的原理。

首先看下分配逻辑 allocateRun 的源码,如下所示。PoolChunk 分配 Page 主要分为三步:首先根据分配内存大小计算二叉树所在节点的高度,然后查找对应高度中是否存在可用节点,如果分配成功则减去已分配的内存大小得到剩余可用空间。

private long allocateRun(int normCapacity) {    // 根据分配内存大小计算二叉树对应的节点高度    int d = maxOrder - (log2(normCapacity) - pageShifts);    // 查找对应高度中是否存在可用节点    int id = allocateNode(d);    if (id < 0) {        return id;    }    // 减去已分配的内存大小    freeBytes -= runLength(id);    return id;}

结合 PoolChunk 的二叉树结构以及 allocateRun 源码我们开始分析模拟的示例:

第一次分配 8K 大小的内存时,通过 d = maxOrder - (log2(normCapacity) - pageShifts) 计算得到二叉树所在节点高度为 11,其中 maxOrder 为二叉树的最大高度,normCapacity 为 8K,pageShifts 默认值为 13,因为只有当申请内存大小大于 2^13 = 8K 时才会使用 allocateRun 分配内存。然后从第 11 层查找可用的 Page,下标为 2048 的节点可以被用于分配内存,即 Page[0] 被分配使用,此时赋值 memoryMap[2048] = 12,表示该节点已经不可用,然后递归更新父节点的值,父节点的值取两个子节点的最小值,memoryMap[1024] = 11,memoryMap[512] = 10,以此类推直至 memoryMap[1] = 1,更新后的二叉树分配结果如下图所示。

第二次分配 16K 大小内存时,计算得到所需节点的高度为 10。此时 1024 节点已经分配了一个 8K 内存,不再满足条件,继续寻找到 1025 节点。1025 节点并未使用过,满足分配条件,于是将 1025 节点的两个子节点 2050 和 2051 全部分配出去,并赋值 memoryMap[2050] = 12,memoryMap[2051] = 12,再次递归更新父节点的值,更新后的二叉树分配结果如下图所示。

第三次再次分配 8K 大小的内存时,依然从二叉树第 11 层开始查找,2048 已经被使用,2049 可以被分配,赋值 memoryMap[2049] = 12,并递归更新父节点值,memoryMap[1024] = 12,memoryMap[512] = 12,以此类推直至 memoryMap[1] = 1,最终的二叉树分配结果如下图所示。

至此,PoolChunk 中 Page 级别的内存分配已经介绍完了,可以看出伙伴算法尽可能保证了分配内存地址的连续性,有效地降低了内存碎片。

Subpage 级别的内存分配

为了提高内存分配的利用率,在分配小于 8K 的内存时,PoolChunk 不在分配单独的 Page,而是将 Page 划分为更小的内存块,由 PoolSubpage 进行管理。

首先我们看下 PoolSubpage 的创建过程,由于分配的内存小于 8K,所以走到了 allocateSubpage 源码中:

private long allocateSubpage(int normCapacity) {    // 根据内存大小找到 PoolArena 中 subpage 数组对应的头结点    PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);    int d = maxOrder; // 因为分配内存小于 8K,所以从满二叉树最底层开始查找    synchronized (head) {        int id = allocateNode(d); // 在满二叉树中找到一个可用的节点        if (id < 0) {            return id;        }        final PoolSubpage<T>[] subpages = this.subpages; // 记录哪些 Page 被转化为 Subpage        final int pageSize = this.pageSize;         freeBytes -= pageSize;        int subpageIdx = subpageIdx(id); // pageId 到 subpageId 的转化,例如 pageId=2048 对应的 subpageId=0        PoolSubpage<T> subpage = subpages[subpageIdx];        if (subpage == null) {            // 创建 PoolSubpage,并切分为相同大小的子内存块,然后加入 PoolArena 对应的双向链表中            subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);            subpages[subpageIdx] = subpage;        } else {            subpage.init(head, normCapacity);        }        return subpage.allocate(); // 执行内存分配并返回内存地址    }}

假如我们需要分配 20B 大小的内存,一起分析下上述源码的执行过程:

因为 20B 小于 512B,属于 Tiny 场景,按照内存规格的分类 20B 需要向上取整到 32B。根据内存规格的大小找到 PoolArena 中 tinySubpagePools 数组对应的头结点,32B 对应的 tinySubpagePools[1]。在满二叉树中寻找可用的节点用于内存分配,因为我们分配的内存小于 8K,所以直接从二叉树的最底层开始查找。假如 2049 节点是可用的,那么返回的 id = 2049。找到可用节点后,因为 pageIdx 是从叶子节点 2048 开始记录索引,而 subpageIdx 需要从 0 开始的,所以需要将 pageIdx 转化为 subpageIdx,例如 2048 对应的 subpageIdx = 0,2049 对应的 subpageIdx = 1,以此类推。如果 PoolChunk 中 subpages 数组的 subpageIdx 下标对应的 PoolSubpage 不存在,那么将创建一个新的 PoolSubpage,并将 PoolSubpage 切分为相同大小的子内存块,示例对应的子内存块大小为 32B,最后将新创建的 PoolSubpage 节点与 tinySubpagePools[1] 对应的 head 节点连接成双向链表。最后 PoolSubpage 执行内存分配并返回内存地址。

接下来我们跟进一下 subpage.allocate() 源码,看下 PoolSubpage 是如何执行内存分配的,源码如下:

long allocate() {    if (elemSize == 0) {        return toHandle(0);    }    if (numAvail == 0 || !doNotDestroy) {        return -1;    }    final int bitmapIdx = getNextAvail(); // 在 bitmap 中找到第一个索引段,然后将该 bit 置为 1    int q = bitmapIdx >>> 6; // 定位到 bitmap 的数组下标    int r = bitmapIdx & 63; // 取到节点对应一个 long 类型中的二进制位    assert (bitmap[q] >>> r & 1) == 0;    bitmap[q] |= 1L << r;    if (-- numAvail == 0) {        removeFromPool(); // 如果 PoolSubpage 没有可分配的内存块,从 PoolArena 双向链表中删除    }    return toHandle(bitmapIdx);}

PoolSubpage 通过位图 bitmap 记录每个内存块是否已经被使用。在上述的示例中,8K/32B = 256,因为每个 long 有 64 位,所以需要 256/64 = 4 个 long 类型的即可描述全部的内存块分配状态,因此 bitmap 数组的长度为 4,从 bitmap[0] 开始记录,每分配一个内存块,就会移动到 bitmap[0] 中的下一个二进制位,直至 bitmap[0] 的所有二进制位都赋值为 1,然后继续分配 bitmap[1],以此类推。当我们使用 2049 节点进行内存分配时,bitmap[0] 中的二进制位如下图所示:

当 bitmap 分成成功后,PoolSubpage 会将可用节点的个数 numAvail 减 1,当 numAvail 降为 0 时,表示 PoolSubpage 已经没有可分配的内存块,此时需要从 PoolArena 中 tinySubpagePools[1] 的双向链表中删除。

至此,整个 PoolChunk 中 Subpage 的内存分配过程已经完成了,可见 PoolChunk 的伙伴算法几乎贯穿了整个流程,位图 bitmap 的设计也是非常巧妙的,不仅节省了内存空间,而且加快了定位内存块的速度。

PoolThreadCache 的内存分配

上节课已经介绍了 PoolThreadCache 的基本概念,我们知道 PoolArena 分配的内存被释放时,Netty 并没有将缓存归还给 PoolChunk,而是使用 PoolThreadCache 缓存起来,当下次有同样规格的内存分配时,直接从 PoolThreadCache 取出使用即可。所以下面我们从 PoolArena#allocate() 的源码中看下 PoolThreadCache 是如何使用的。

private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {    final int normCapacity = normalizeCapacity(reqCapacity);    if (isTinyOrSmall(normCapacity)) { // capacity < pageSizeint tableIdx;        PoolSubpage<T>[] table;        boolean tiny = isTiny(normCapacity);        if (tiny) { // < 512if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {                return;            }            tableIdx = tinyIdx(normCapacity);            table = tinySubpagePools;        } else {            if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {                return;            }            tableIdx = smallIdx(normCapacity);            table = smallSubpagePools;        }        // 省略其他代码    }    if (normCapacity <= chunkSize) {        if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {            return;        }        synchronized (this) {            allocateNormal(buf, reqCapacity, normCapacity);            ++allocationsNormal;        }    } else {        allocateHuge(buf, reqCapacity);    }}

从源码中可以看出在分配 Tiny、Small 和 Normal 类型的内存时,都会尝试先从 PoolThreadCache 中进行分配,源码结构比较清晰,我们整体梳理一遍流程:

对申请的内存大小做向上取整,例如 20B 的内存大小会取整为 32B。当申请的内存大小小于 8K 时,分为 Tiny 和 Small 两种情况,分别都会优先尝试从 PoolThreadCache 分配内存,如果 PoolThreadCache 分配失败,才会走 PoolArena 的分配流程。当申请的内存大小大于 8K,但是小于 Chunk 的默认大小 16M,属于 Normal 的内存分配,也会优先尝试从 PoolThreadCache 分配内存,如果 PoolThreadCache 分配失败,才会走 PoolArena 的分配流程。当申请的内存大小大于 Chunk 的 16M,则不会经过 PoolThreadCache,直接进行分配。

PoolThreadCache 具体分配内存的过程使用到了一个重要的数据结构 MemoryRegionCache,关于 MemoryRegionCache 的概念你可以回顾下上节课的内容,在这里我就不再赘述了。假如我们现在需要分配 32B 大小的堆外内存,会从 MemoryRegionCache 数组 tinySubPageDirectCaches[1] 中取出对应的 MemoryRegionCache 节点,尝试从 MemoryRegionCache 的队列中取出可用的内存块。

内存回收实现原理

通过之前的介绍我们知道,当用户线程释放内存时会将内存块缓存到本地线程的私有缓存 PoolThreadCache 中,这样在下次分配内存时会提高分配效率,但是当内存块被用完一次后,再没有分配需求,那么一直驻留在内存中又会造成浪费。接下来我们就看下 Netty 是如何实现内存释放的呢?直接跟进下 PoolThreadCache 的源码。

private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {    if (cache == null) {        return false;    }    // 默认每执行 8192 次 allocate(),就会调用一次 trim() 进行内存整理    boolean allocated = cache.allocate(buf, reqCapacity);    if (++ allocations >= freeSweepAllocationThreshold) {        allocations = 0;        trim();    }    return allocated;}void trim() {    trim(tinySubPageDirectCaches);    trim(smallSubPageDirectCaches);    trim(normalDirectCaches);    trim(tinySubPageHeapCaches);    trim(smallSubPageHeapCaches);    trim(normalHeapCaches);}

从源码中可以看出,Netty 记录了 allocate() 的执行次数,默认每执行 8192 次,就会触发 PoolThreadCache 调用一次 trim() 进行内存整理,会对 PoolThreadCache 中维护的六个 MemoryRegionCache 数组分别进行整理。我们继续跟进 trim 的源码,定位到核心逻辑。

public final void trim() {    int free = size - allocations;    allocations = 0;    // We not even allocated all the number that are    if (free > 0) {        free(free, false);    }}private int free(int max, boolean finalizer) {    int numFreed = 0;    for (; numFreed < max; numFreed++) {        Entry<T> entry = queue.poll();        if (entry != null) {            freeEntry(entry, finalizer);        } else {            // all clearedreturn numFreed;        }    }    return numFreed;}

通过 size - allocations 衡量内存分配执行的频繁程度,其中 size 为该 MemoryRegionCache 对应的内存规格大小,size 为固定值,例如 Tiny 类型默认为 512。allocations 表示 MemoryRegionCache 距离上一次内存整理已经发生了多少次 allocate 调用,当调用次数小于 size 时,表示 MemoryRegionCache 中缓存的内存块并不常用,从队列中取出内存块依次释放。

此外 Netty 在线程退出的时候还会回收该线程的所有内存,PoolThreadCache 重载了 finalize() 方法,在销毁前执行缓存回收的逻辑,对应源码如下:

@Overrideprotected void finalize() throws Throwable {    try {        super.finalize();    } finally {        free(true);    }}void free(boolean finalizer) {    if (freed.compareAndSet(false, true)) {        int numFreed = free(tinySubPageDirectCaches, finalizer) +                free(smallSubPageDirectCaches, finalizer) +                free(normalDirectCaches, finalizer) +                free(tinySubPageHeapCaches, finalizer) +                free(smallSubPageHeapCaches, finalizer) +                free(normalHeapCaches, finalizer);        if (numFreed > 0 && logger.isDebugEnabled()) {            logger.debug("Freed {} thread-local buffer(s) from thread: {}", numFreed,                    Thread.currentThread().getName());        }        if (directArena != null) {            directArena.numThreadCaches.getAndDecrement();        }        if (heapArena != null) {            heapArena.numThreadCaches.getAndDecrement();        }    }}

线程销毁时 PoolThreadCache 会依次释放所有 MemoryRegionCache 中的内存数据,其中 free 方法的核心逻辑与之前内存整理 trim 中释放内存的过程是一致的,有兴趣的同学可以自行翻阅源码。

到此为止,整个 Netty 内存池的分配和释放原理我们已经分析完了,其中巧妙的设计思路以及源码细节的实现,都是非常值得我们学习的宝贵资源。

十四、Recycler

Recycler 对象池。在刚接触到 Netty 对象池这个概念时,你是不是也会有类似的疑问:

对象池和内存池有什么区别?它们有什么联系吗?实现对象池的方法有很多,Netty 也是自己实现的吗?是如何实现的?对象池在实践中我们应该怎么使用?

我们通过一个例子直观感受下 Recycler 如何使用,假设我们有一个 User 类,需要实现 User 对象的复用,具体实现代码如下:

public class UserCache {    private static final Recycler<User> userRecycler = new Recycler<User>() {        @Override        protected User newObject(Handle<User> handle) {            return new User(handle);        }    };    static final class User {        private String name;        private Recycler.Handle<User> handle;        public void setName(String name) {            this.name = name;        }        public String getName() {            return name;        }        public User(Recycler.Handle<User> handle) {            this.handle = handle;        }        public void recycle() {            handle.recycle(this);        }    }    public static void main(String[] args) {        User user1 = userRecycler.get(); // 1、从对象池获取 User 对象        user1.setName("hello"); // 2、设置 User 对象的属性        user1.recycle(); // 3、回收对象到对象池        User user2 = userRecycler.get(); // 4、从对象池获取对象        System.out.println(user2.getName());        System.out.println(user1 == user2);    }}

控制台的输出结果如下:

hellotrue

代码示例中定义了对象池实例 userRecycler,其中实现了 newObject() 方法,如果对象池没有可用的对象,会调用该方法新建对象。此外需要创建 Recycler.Handle 对象与 User 对象进行绑定,这样我们就可以通过 userRecycler.get() 从对象池中获取 User 对象,如果对象不再使用,通过调用 User 类实现的 recycle() 方法即可完成回收对象到对象池。

Recycler 的使用方式是不是特别简单,我们可以单独把它当作工具类在项目中使用。

Recycler 的设计理念

对象池与内存池的都是为了提高 Netty 的并发处理能力,我们知道 Java 中频繁地创建和销毁对象的开销是很大的,所以很多人会将一些通用对象缓存起来,当需要某个对象时,优先从对象池中获取对象实例。通过重用对象,不仅避免频繁地创建和销毁所带来的性能损耗,而且对 JVM GC 是友好的,这就是对象池的作用。

Recycler 是 Netty 提供的自定义实现的轻量级对象回收站,借助 Recycler 可以完成对象的获取和回收。既然 Recycler 是 Netty 自己实现的对象池,那么它是如何设计的呢?首先看下 Recycler 的内部结构,如下图所示:

通过 Recycler 的 UML 图可以看出,一共包含四个核心组件:StackWeakOrderQueueLinkDefaultHandle,接下来我们逐一进行介绍。

首先我们先看下整个 Recycler 的内部结构中各个组件的关系,可以通过下面这幅图进行描述。

第一个核心组件是 Stack,Stack 是整个对象池的顶层数据结构,描述了整个对象池的构造,用于存储当前本线程回收的对象。在多线程的场景下,Netty 为了避免锁竞争问题,每个线程都会持有各自的对象池,内部通过 FastThreadLocal 来实现每个线程的私有化。FastThreadLocal 你可以理解为 Java 里的 ThreadLocal,后续会有专门的课程介绍它。

我们有必要先学习下 Stack 的数据结构,先看下 Stack 的源码定义:

static final class Stack<T> {    final Recycler<T> parent; // 所属的     Recyclerfinal WeakReference<Thread> threadRef; // 所属线程的弱引用    final AtomicInteger availableSharedCapacity; // 异线程回收对象时,其他线程能保存的被回收对象的最大个数    final int maxDelayedQueues; // WeakOrderQueue最大个数    private final int maxCapacity; // 对象池的最大大小,默认最大为 4k    private final int ratioMask; // 控制对象的回收比率,默认只回收 1/8 的对象    private DefaultHandle<?>[] elements; // 存储缓存数据的数组    private int size; // 缓存的 DefaultHandle 对象个数    private int handleRecycleCount = -1;     // WeakOrderQueue 链表的三个重要节点    private WeakOrderQueue cursor, prev;    private volatile WeakOrderQueue head;    // 省略其他代码}

对应上面 Recycler 的内部结构图,Stack 包用于存储缓存数据的 DefaultHandle 数组,以及维护了 WeakOrderQueue 链表中的三个重要节点,关于 WeakOrderQueue 相关概念我们之后再详细介绍。

除此之外,Stack 其他的重要属性我在源码中已经全部以注释的形式标出,大部分已经都非常清楚,其中 availableSharedCapacity 是比较难理解的,每个 Stack 会维护一个 WeakOrderQueue 的链表,每个 WeakOrderQueue 节点会保存非当前线程的其他线程所释放的对象,例如图中 ThreadA 表示当前线程,WeakOrderQueue 的链表存储着 ThreadB、ThreadC 等其他线程释放的对象。

availableSharedCapacity 的初始化方式为 new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY)),默认大小为 16K,其他线程在回收对象时,最多可以回收 ThreadA 创建的对象个数不能超过 availableSharedCapacity。还有一个疑问就是既然 Stack 是每个线程私有的,为什么 availableSharedCapacity 还需要用 AtomicInteger 呢?因为 ThreadB、ThreadC 等多个线程可能都会创建 ThreadA 的 WeakOrderQueue,存在同时操作 availableSharedCapacity 的情况。

第二个要介绍的组件是 WeakOrderQueue,WeakOrderQueue 用于存储其他线程回收到当前线程所分配的对象,并且在合适的时机,Stack 会从异线程的 WeakOrderQueue 中收割对象。如上图所示,ThreadB 回收到 ThreadA 所分配的内存时,就会被放到 ThreadA 的 WeakOrderQueue 当中。

第三个组件是 Link,每个 WeakOrderQueue 中都包含一个 Link 链表,回收对象都会被存在 Link 链表中的节点上,每个 Link 节点默认存储 16 个对象,当每个 Link 节点存储满了会创建新的 Link 节点放入链表尾部。

第四个组件是 DefaultHandle,DefaultHandle 实例中保存了实际回收的对象,Stack 和 WeakOrderQueue 都使用 DefaultHandle 存储回收的对象。在 Stack 中包含一个 elements 数组,该数组保存的是 DefaultHandle 实例。DefaultHandle 中每个 Link 节点所存储的 16 个对象也是使用 DefaultHandle 表示的。

到此为止,我们已经介绍完 Recycler 的内存结构,对 Recycler 有了初步的认识。Recycler 作为一个高性能的对象池,在多线程的场景下,Netty 是如何保证 Recycler 高效地分配和回收对象的呢?接下来我们一起看下 Recycler 对象获取和回收的原理。

从 Recycler 中获取对象

前面我们介绍了 Recycler 如何使用,从代码示例中可以看出,从对象池中获取对象的入口是在 Recycler#get() 方法,直接定位到源码:

public final T get() {    if (maxCapacityPerThread == 0) {        return newObject((Handle<T>) NOOP_HANDLE);    }    Stack<T> stack = threadLocal.get(); // 获取当前线程缓存的 Stack    DefaultHandle<T> handle = stack.pop(); // 从 Stack 中弹出一个 DefaultHandle 对象    if (handle == null) {        handle = stack.newHandle();        handle.value = newObject(handle); // 创建的对象并保存到 DefaultHandle    }    return (T) handle.value;}

Recycler#get() 方法的逻辑非常清晰,首先通过 FastThreadLocal 获取当前线程的唯一栈缓存 Stack,然后尝试从栈顶弹出 DefaultHandle 对象实例,如果 Stack 中没有可用的 DefaultHandle 对象实例,那么会调用 newObject 生成一个新的对象,完成 handle 与用户对象和 Stack 的绑定。

那么 Stack 是如何从 elements 数组中弹出 DefaultHandle 对象实例的呢?只是从 elements 数组中取出一个实例吗?我们一起跟进下 stack.pop() 的源码:

DefaultHandle<T> pop() {    int size = this.size;    if (size == 0) {        // 就尝试从其他线程回收的对象中转移一些到 elements 数组当中        if (!scavenge()) {            return null;        }        size = this.size;    }    size --;    DefaultHandle ret = elements[size]; // 将实例从栈顶弹出    elements[size] = null;    if (ret.lastRecycledId != ret.recycleId) {        throw new IllegalStateException("recycled multiple times");    }    ret.recycleId = 0;    ret.lastRecycledId = 0;    this.size = size;    return ret;}

如果 Stack 的 elements 数组中有可用的对象实例,直接将对象实例弹出;如果 elements 数组中没有可用的对象实例,会调用 scavenge 方法,scavenge 的作用是从其他线程回收的对象实例中转移一些到 elements 数组当中,也就是说,它会想办法从 WeakOrderQueue 链表中迁移部分对象实例。每个 Stack 会有一个 WeakOrderQueue 链表,每个 WeakOrderQueue 节点都维持了相应异线程回收的对象,那么以什么样的策略从 WeakOrderQueue 链表中迁移对象实例呢?继续跟进 scavenge 的源码:

boolean scavenge() {    // 尝试从 WeakOrderQueue 中转移对象实例到 Stack 中    if (scavengeSome()) {        return true;    }    // 如果迁移失败,就会重置 cursor 指针到 head 节点    prev = null;    cursor = head;    return false;}boolean scavengeSome() {    WeakOrderQueue prev;    WeakOrderQueue cursor = this.cursor; // cursor 指针指向当前 WeakorderQueueu 链表的读取位置// 如果 cursor 指针为 null, 则是第一次从 WeakorderQueueu 链表中获取对象    if (cursor == null) {        prev = null;        cursor = head;        if (cursor == null) {            return false;        }    } else {        prev = this.prev;    }    boolean success = false;    // 不断循环从 WeakOrderQueue 链表中找到一个可用的对象实例    do {        // 尝试迁移 WeakOrderQueue 中部分对象实例到 Stack 中        if (cursor.transfer(this)) {            success = true;            break;        }        WeakOrderQueue next = cursor.next;        if (cursor.owner.get() == null) {            // 如果已退出的线程还有数据            if (cursor.hasFinalData()) {                for (;;) {                    if (cursor.transfer(this)) {                        success = true;                    } else {                        break;                    }                }            }            // 将已退出的线程从 WeakOrderQueue 链表中移除            if (prev != null) {                prev.setNext(next);            }        } else {            prev = cursor;        }        // 将 cursor 指针指向下一个 WeakOrderQueue        cursor = next;    } while (cursor != null && !success);    this.prev = prev;    this.cursor = cursor;    return success;}

scavenge 的源码中首先会从 cursor 指针指向的 WeakOrderQueue 节点回收部分对象到 Stack 的 elements 数组中,如果没有回收到数据就会将 cursor 指针移到下一个 WeakOrderQueue,重复执行以上过程直至回到到对象实例为止。具体的流程可以结合下图来理解。

此外,每次移动 cursor 时,都会检查 WeakOrderQueue 对应的线程是否已经退出了,如果线程已经退出,那么线程中的对象实例都会被回收,然后将 WeakOrderQueue 节点从链表中移除。

还有一个问题,每次 Stack 从 WeakOrderQueue 链表会回收多少数据呢?我们依然结合上图讲解,每个 WeakOrderQueue 中都包含一个 Link 链表,Netty 每次会回收其中的一个 Link 节点所存储的对象。从图中可以看出,Link 内部会包含一个读指针 readIndex,每个 Link 节点默认存储 16 个对象,读指针到链表尾部就是可以用于回收的对象实例,每次回收对象时,readIndex 都会从上一次记录的位置开始回收。

在回收对象实例之前,Netty 会计算出可回收对象的数量,加上 Stack 中已有的对象数量后,如果超过 Stack 的当前容量且小于 Stack 的最大容量,会对 Stack 进行扩容。为了防止回收对象太多导致 Stack 的容量激增,在每次回收时 Netty 会调用 dropHandle 方法控制回收频率,具体源码如下:

boolean dropHandle(DefaultHandle<?> handle) {    if (!handle.hasBeenRecycled) {        if ((++handleRecycleCount & ratioMask) != 0) {            // Drop the object.            return true;        }        handle.hasBeenRecycled = true;    }    return false;}

dropHandle 方法中主要靠 hasBeenRecycled 和 handleRecycleCount 两个变量控制回收的频率,会从每 8 个未被收回的对象中选取一个进行回收,其他的都被丢弃掉。

到此为止,从 Recycler 中获取对象的主流程已经讲完了,简单总结为两点:

当 Stack 中 elements 有数据时,直接从栈顶弹出。当 Stack 中 elements 没有数据时,尝试从 WeakOrderQueue 中回收一个 Link 包含的对象实例到 Stack 中,然后从栈顶弹出。Recycler 对象回收原理

理解了如何从 Recycler 获取对象之后,再学习 Recycler 对象回收的原理就会清晰很多了,同样上文代码示例中定位到对象回收的源码入口 DefaultHandle#recycle()。

// DefaultHandle#recyclepublic void recycle(Object object) {    if (object != value) {        throw new IllegalArgumentException("object does not belong to handle");    }    Stack<?> stack = this.stack;    if (lastRecycledId != recycleId || stack == null) {        throw new IllegalStateException("recycled already");    }    stack.push(this);}// Stack#pushpublic void push(DefaultHandle<?> item) {    Thread currentThread = Thread.currentThread();    if (threadRef.get() == currentThread) {        pushNow(item);    } else {        pushLater(item, currentThread);    }}

从源码中可以看出,在回收对象时,会向 Stack 中 push 对象,push 会分为同线程回收和异线程回收两种情况,分别对应 pushNow 和 pushLater 两个方法,我们逐一进行分析。

同线程对象回收

如果是当前线程回收自己分配的对象时,会调用 pushNow 方法:

private void pushNow(DefaultHandle<?> item) {    if ((item.recycleId | item.lastRecycledId) != 0) { // 防止被多次回收    throw new IllegalStateException("recycled already");    }    item.recycleId = item.lastRecycledId = OWN_THREAD_ID;    int size = this.size;    // 1. 超出最大容量 2. 控制回收速率    if (size >= maxCapacity || dropHandle(item)) {        return;    }    if (size == elements.length) {        elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));    }    elements[size] = item;    this.size = size + 1;}

同线程回收对象的逻辑非常简单,就是直接向 Stack 的 elements 数组中添加数据,对象会被存放在栈顶指针指向的位置。如果超过了 Stack 的最大容量,那么对象会被直接丢弃,同样这里使用了 dropHandle 方法控制对象的回收速率,每 8 个对象会有一个被回收到 Stack 中。

异线程对象回收

接下来我们分析异线程对象回收的场景,想必你已经猜到,异线程回收对象时,并不会添加到 Stack 中,而是会与 WeakOrderQueue 直接打交道,先看下 pushLater 的源码:

private void pushLater(DefaultHandle<?> item, Thread thread) {    Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get(); // 当前线程帮助其他线程回收对象的缓存    WeakOrderQueue queue = delayedRecycled.get(this); // 取出对象绑定的 Stack 对应的 WeakOrderQueue    if (queue == null) {        // 最多帮助 2*CPU 核数的线程回收线程        if (delayedRecycled.size() >= maxDelayedQueues) {            delayedRecycled.put(this, WeakOrderQueue.DUMMY); // WeakOrderQueue.DUMMY 表示当前线程无法再帮助该 Stack 回收对象            return;        }        // 新建 WeakOrderQueue        if ((queue = WeakOrderQueue.allocate(this, thread)) == null) {            // drop object            return;        }        delayedRecycled.put(this, queue);    } else if (queue == WeakOrderQueue.DUMMY) {        // drop object        return;    }    queue.add(item); // 添加对象到 WeakOrderQueue 的 Link 链表中}

pushLater 的实现过程可以总结为两个步骤:获取 WeakOrderQueue添加对象到 WeakOrderQueue 中

首先看下如何获取 WeakOrderQueue 对象。通过 FastThreadLocal 取出当前对象的 DELAYED_RECYCLED 缓存,DELAYED_RECYCLED 存放着当前线程帮助其他线程回收对象的映射关系。假如 item 是 ThreadA 分配的对象,当前线程是 ThreadB,此时 ThreadB 帮助 ThreadA 回收 item,那么 DELAYED_RECYCLED 放入的 key 是 StackA。然后从 delayedRecycled 中取出 StackA 对应的 WeakOrderQueue,如果 WeakOrderQueue 不存在,那么为 StackA 新创建一个 WeakOrderQueue,并将其加入 DELAYED_RECYCLED 缓存。WeakOrderQueue.allocate() 会检查帮助 StackA 回收的对象总数是否超过 2K 个,如果没有超过 2K,会将 StackA 的 head 指针指向新创建的 WeakOrderQueue,否则不再为 StackA 回收对象。

当然 ThreadB 不会只帮助 ThreadA 回收对象,它可以帮助其他多个线程回收,所以 DELAYED_RECYCLED 使用的 Map 结构,为了防止 DELAYED_RECYCLED 内存膨胀,Netty 也采取了保护措施,从 delayedRecycled.size() >= maxDelayedQueues 可以看出,每个线程最多帮助 2 倍 CPU 核数的线程回收线程,如果超过了该阈值,假设当前对象绑定的为 StackX,那么将在 Map 中为 StackX 放入一种特殊的 WeakOrderQueue.DUMMY,表示当前线程无法帮助 StackX 回收对象。

接下来我们继续分析对象是如何被添加到 WeakOrderQueue 的,直接跟进 queue.add(item) 的源码:

void add(DefaultHandle<?> handle) {    handle.lastRecycledId = id;    Link tail = this.tail;    int writeIndex;    // 如果链表尾部的 Link 已经写满,那么再新建一个 Link 追加到链表尾部    if ((writeIndex = tail.get()) == LINK_CAPACITY) {        // 检查是否超过对应 Stack 可以存放的其他线程帮助回收的最大对象数        if (!head.reserveSpace(LINK_CAPACITY)) {            // Drop             it.return;        }        this.tail = tail = tail.next = new Link();        writeIndex = tail.get();    }    tail.elements[writeIndex] = handle; // 添加对象到 Link 尾部    handle.stack = null; // handle 的 stack 属性赋值为 null    tail.lazySet(writeIndex + 1);}

在向 WeakOrderQueue 写入对象之前,会先判断 Link 链表的 tail 节点是否还有空间存放对象。如果还有空间,直接向 tail Link 尾部写入数据,否则直接丢弃对象。如果 tail Link 已经没有空间,会新建一个 Link 之后再存放对象,新建 Link 之前会检查异线程帮助回收的对象总数超过了 Stack 设置的阈值,如果超过了阈值,那么对象也会被丢弃掉。

对象被添加到 Link 之后,handle 的 stack 属性被赋值为 null,而在取出对象的时候,handle 的 stack 属性又再次被赋值回来,为什么这么做呢,岂不是很麻烦?如果 Stack 不再使用,期望被 GC 回收,发现 handle 中还持有 Stack 的引用,那么就无法被 GC 回收,从而造成内存泄漏。

到此为止,Recycler 如何回收对象的实现原理就全部分析完了,在多线程的场景下,Netty 考虑的还是非常细致的,Recycler 回收对象时向 WeakOrderQueue 中存放对象,从 Recycler 获取对象时,WeakOrderQueue 中的对象会作为 Stack 的储备,而且有效地解决了跨线程回收的问题,是一个挺新颖别致的设计。

Recycler在 Netty 中的应用

Recycler在 Netty 里面使用也是非常频繁的,我们直接看下Netty 源码中newObject相关的引用,如下图所示。

其中比较常用的有PooledHeapByteBuf和PooledDirectByteBuf,分别对应的堆内存和堆外内存的池化实现。例如我们在使用 PooledDirectByteBuf 的时候,并不是每次都去创建新的对象实例,而是从对象池中获取预先分配好的对象实例,不再使用PooledDirectByteBuf 时,被回收归还到对象池中。

此外,可以看到内存池的MemoryRegionCache 也有使用到对象池,MemoryRegionCache 中保存着一个队列,队列中每个 Entry 节点用于保存内存块,Entry 节点在 Netty 中就是以对象池的形式进行分配和释放,在这里我就不展开了,建议你翻阅下源码,学习下 Entry 节点是何时被分配和释放的,从而加深下对 Recycler 对象池的理解。

十五、零拷贝

零拷贝是一个耳熟能详的词语,在 Linux、Kafka、RocketMQ 等知名的产品中都有使用,通常用于提升 I/O 性能。而且零拷贝也是面试过程中的高频问题,那么你知道零拷贝体现在哪些地方吗?Netty 的零拷贝技术又是如何实现的呢?接下来我们就针对 Netty 零拷贝特性进行详细地分析。

传统 Linux 中的零拷贝技术

在介绍 Netty 零拷贝特性之前,我们有必要学习下传统 Linux 中零拷贝的工作原理。所谓零拷贝,就是在数据操作时,不需要将数据从一个内存位置拷贝到另外一个内存位置,这样可以减少一次内存拷贝的损耗,从而节省了 CPU 时钟周期和内存带宽。

我们模拟一个场景,从文件中读取数据,然后将数据传输到网络上,那么传统的数据拷贝过程会分为哪几个阶段呢?具体如下图所示。

从上图中可以看出,从数据读取到发送一共经历了四次数据拷贝,具体流程如下:

当用户进程发起 read() 调用后,上下文从用户态切换至内核态。DMA 引擎从文件中读取数据,并存储到内核态缓冲区,这里是第一次数据拷贝请求的数据从内核态缓冲区拷贝到用户态缓冲区,然后返回给用户进程。第二次数据拷贝的过程同时,会导致上下文从内核态再次切换到用户态。用户进程调用 send() 方法期望将数据发送到网络中,此时会触发第三次线程切换,用户态会再次切换到内核态,请求的数据从用户态缓冲区被拷贝到 Socket 缓冲区。最终 send() 系统调用结束返回给用户进程,发生了第四次上下文切换。第四次拷贝会异步执行,从 Socket 缓冲区拷贝到协议引擎中。

说明:DMA(Direct Memory Access,直接内存存取)是现代大部分硬盘都支持的特性,DMA 接管了数据读写的工作,不需要 CPU 再参与 I/O 中断的处理,从而减轻了 CPU 的负担。

传统的数据拷贝过程为什么不是将数据直接传输到用户缓冲区呢?其实引入内核缓冲区可以充当缓存的作用,这样就可以实现文件数据的预读,提升 I/O 的性能。但是当请求数据量大于内核缓冲区大小时,在完成一次数据的读取到发送可能要经历数倍次数的数据拷贝,这就造成严重的性能损耗。

接下来我们介绍下使用零拷贝技术之后数据传输的流程。重新回顾一遍传统数据拷贝的过程,可以发现第二次和第三次拷贝是可以去除的,DMA 引擎从文件读取数据后放入到内核缓冲区,然后可以直接从内核缓冲区传输到 Socket 缓冲区,从而减少内存拷贝的次数。

在 Linux 中系统调用 sendfile() 可以实现将数据从一个文件描述符传输到另一个文件描述符,从而实现了零拷贝技术。在 Java 中也使用了零拷贝技术,它就是 NIO FileChannel 类中的 transferTo() 方法,transferTo() 底层就依赖了操作系统零拷贝的机制,它可以将数据从 FileChannel 直接传输到另外一个 Channel。transferTo() 方法的定义如下:

public abstract long transferTo(long position, long count, WritableByteChannel target) throws IOException;

FileChannel#transferTo() 的使用也非常简单,我们直接看如下的代码示例,通过 transferTo() 将 from.data 传输到 to.data(),等于实现了文件拷贝的功能。

public void testTransferTo() throws IOException {    RandomAccessFile fromFile = new RandomAccessFile("from.data", "rw");    FileChannel fromChannel = fromFile.getChannel();    RandomAccessFile toFile = new RandomAccessFile("to.data", "rw");    FileChannel toChannel = toFile.getChannel();    long position = 0;    long count = fromChannel.size();    fromChannel.transferTo(position, count, toChannel);}

在使用了 FileChannel#transferTo() 传输数据之后,我们看下数据拷贝流程发生了哪些变化,如下图所示:

比较大的一个变化是,DMA 引擎从文件中读取数据拷贝到内核态缓冲区之后,由操作系统直接拷贝到 Socket 缓冲区,不再拷贝到用户态缓冲区,所以数据拷贝的次数从之前的 4 次减少到 3 次。

但是上述的优化离达到零拷贝的要求还是有差距的,能否继续减少内核中的数据拷贝次数呢?在 Linux 2.4 版本之后,开发者对 Socket Buffer 追加一些 Descriptor 信息来进一步减少内核数据的复制。如下图所示,DMA 引擎读取文件内容并拷贝到内核缓冲区,然后并没有再拷贝到 Socket 缓冲区,只是将数据的长度以及位置信息被追加到 Socket 缓冲区,然后 DMA 引擎根据这些描述信息,直接从内核缓冲区读取数据并传输到协议引擎中,从而消除最后一次 CPU 拷贝。

通过上述 Linux 零拷贝技术的介绍,你也许还会存在疑问,最终使用零拷贝之后,不是还存在着数据拷贝操作吗?其实从 Linux 操作系统的角度来说,零拷贝就是为了避免用户态和内核态之间的数据拷贝。无论是传统的数据拷贝还是使用零拷贝技术,其中有 2 次 DMA 的数据拷贝必不可少,只是这 2 次 DMA 拷贝都是依赖硬件来完成,不需要 CPU 参与。所以,在这里我们讨论的零拷贝是个广义的概念,只要能够减少不必要的 CPU 拷贝,都可以被称为零拷贝。

Netty的零拷贝技术

介绍完传统 Linux 的零拷贝技术之后,我们再来学习下 Netty 中的零拷贝如何实现。Netty 中的零拷贝和传统 Linux 的零拷贝不太一样。Netty 中的零拷贝技术除了操作系统级别的功能封装,更多的是面向用户态的数据操作优化,主要体现在以下 5 个方面:

堆外内存,避免 JVM 堆内存到堆外内存的数据拷贝。CompositeByteBuf 类,可以组合多个 Buffer 对象合并成一个逻辑上的对象,避免通过传统内存拷贝的方式将几个 Buffer 合并成一个大的 Buffer。通过 Unpooled.wrappedBuffer 可以将 byte 数组包装成 ByteBuf 对象,包装过程中不会产生内存拷贝。ByteBuf.slice 操作与 Unpooled.wrappedBuffer 相反,slice 操作可以将一个 ByteBuf 对象切分成多个 ByteBuf 对象,切分过程中不会产生内存拷贝,底层共享一个 byte 数组的存储空间。Netty 使用 FileRegion 实现文件传输,FileRegion 底层封装了 FileChannel#transferTo() 方法,可以将文件缓冲区的数据直接传输到目标 Channel,避免内核缓冲区和用户态缓冲区之间的数据拷贝,这属于操作系统级别的零拷贝。

下面我们从以上 5 个方面逐一进行介绍。

堆外内存

如果在 JVM 内部执行 I/O 操作时,必须将数据拷贝到堆外内存,才能执行系统调用。这是所有 VM 语言都会存在的问题。那么为什么操作系统不能直接使用 JVM 堆内存进行 I/O 的读写呢?主要有两点原因:第一,操作系统并不感知 JVM 的堆内存,而且 JVM 的内存布局与操作系统所分配的是不一样的,操作系统并不会按照 JVM 的行为来读写数据。第二,同一个对象的内存地址随着 JVM GC 的执行可能会随时发生变化,例如 JVM GC 的过程中会通过压缩来减少内存碎片,这就涉及对象移动的问题了。

Netty 在进行 I/O 操作时都是使用的堆外内存,可以避免数据从 JVM 堆内存到堆外内存的拷贝。

CompositeByteBuf

CompositeByteBuf 是 Netty 中实现零拷贝机制非常重要的一个数据结构,CompositeByteBuf 可以理解为一个虚拟的 Buffer 对象,它是由多个 ByteBuf 组合而成,但是在 CompositeByteBuf 内部保存着每个 ByteBuf 的引用关系,从逻辑上构成一个整体。比较常见的像 HTTP 协议数据可以分为头部信息 header消息体数据 body,分别存在两个不同的 ByteBuf 中,通常我们需要将两个 ByteBuf 合并成一个完整的协议数据进行发送,可以使用如下方式完成:

ByteBuf httpBuf = Unpooled.buffer(header.readableBytes() + body.readableBytes());httpBuf.writeBytes(header);httpBuf.writeBytes(body);

可以看出,如果想实现 header 和 body 这两个 ByteBuf 的合并,需要先初始化一个新的 httpBuf,然后再将 header 和 body 分别拷贝到新的 httpBuf。合并过程中涉及两次 CPU 拷贝,这非常浪费性能。如果使用 CompositeByteBuf 如何实现类似的需求呢?如下所示:

CompositeByteBuf httpBuf = Unpooled.compositeBuffer();httpBuf.addComponents(true, header, body);

CompositeByteBuf 通过调用 addComponents() 方法来添加多个 ByteBuf,但是底层的 byte 数组是复用的,不会发生内存拷贝。但对于用户来说,它可以当作一个整体进行操作。那么 CompositeByteBuf 内部是如何存放这些 ByteBuf,并且如何进行合并的呢?我们先通过一张图看下 CompositeByteBuf 的内部结构:

从图上可以看出,CompositeByteBuf 内部维护了一个 Components 数组。在每个 Component 中存放着不同的 ByteBuf,各个 ByteBuf 独立维护自己的读写索引,而 CompositeByteBuf 自身也会单独维护一个读写索引。由此可见,Component 是实现 CompositeByteBuf 的关键所在,下面看下 Component 结构定义:

private static final class Component {    final ByteBuf srcBuf; // 原始的 ByteBuf    final ByteBuf buf; // srcBuf 去除包装之后的 ByteBuf    int srcAdjustment; // CompositeByteBuf 的起始索引相对于 srcBuf 读索引的偏移    int adjustment; // CompositeByteBuf 的起始索引相对于 buf 的读索引的偏移    int offset; // Component 相对于 CompositeByteBuf 的起始索引位置    int endOffset; // Component 相对于 CompositeByteBuf 的结束索引位置// 省略其他代码}

为了方便理解上述 Component 中的属性含义,我同样以 HTTP 协议中 header 和 body 为示例,通过一张图来描述 CompositeByteBuf 组合后其中 Component 的布局情况,如下所示:

从图中可以看出,header 和 body 分别对应两个 ByteBuf,假设 ByteBuf 的内容分别为 "header" 和 "body",那么 header ByteBuf 中 offset~endOffset 为 0~6,body ByteBuf 对应的 offset~endOffset 为 0~10。由此可见,Component 中的 offset 和 endOffset 可以表示当前 ByteBuf 可以读取的范围,通过 offset 和 endOffset 可以将每一个 Component 所对应的 ByteBuf 连接起来,形成一个逻辑整体。

此外 Component 中 srcAdjustment 和 adjustment 表示 CompositeByteBuf 起始索引相对于 ByteBuf 读索引的偏移。初始 adjustment = readIndex - offset,这样通过 CompositeByteBuf 的起始索引就可以直接定位到 Component 中 ByteBuf 的读索引位置。当 header ByteBuf 读取 1 个字节,body ByteBuf 读取 2 个字节,此时每个 Component 的属性又会发生什么变化呢?如下图所示。

至此,CompositeByteBuf 的基本原理我们已经介绍完了,关于具体 CompositeByteBuf 数据操作的细节在这里就不做展开了,有兴趣的同学可以自己深入研究 CompositeByteBuf 的源码。

Unpooled.wrappedBuffer 操作

介绍完 CompositeByteBuf 之后,再来理解 Unpooled.wrappedBuffer 操作就非常容易了,Unpooled.wrappedBuffer 同时也是创建 CompositeByteBuf 对象的另一种推荐做法。

Unpooled 提供了一系列用于包装数据源的 wrappedBuffer 方法,如下所示:

Unpooled.wrappedBuffer方法可以将不同的数据源的一个或者多个数据包装成一个大的ByteBuf对象,其中数据源的类型包括 byte[]、ByteBuf、ByteBuffer。包装的过程中不会发生数据拷贝操作,包装后生成的ByteBuf对象和原始ByteBuf对象是共享底层的byte 数组。

ByteBuf.slice操作

ByteBuf.slice和Unpooled.wrappedBuffer的逻辑正好相反,ByteBuf.slice是将一个ByteBuf 对象切分成多个共享同一个底层存储的ByteBuf对象。

ByteBuf提供了两个slice切分方法:

public ByteBuf slice();public ByteBuf slice(int index, int length);

假设我们已经有一份完整的HTTP数据,可以通过slice方法切分获得header和body两个ByteBuf 对象,对应的内容分别为"header" 和"body",实现方式如下:

ByteBuf httpBuf = ...ByteBuf header = httpBuf.slice(0, 6);ByteBuf body = httpBuf.slice(6, 4);

通过slice切分后都会返回一个新的ByteBuf对象,而且新的对象有自己独立的readerIndex、writerIndex索引,如下图所示。由于新的ByteBuf对象与原始的ByteBuf对象数据是共享的,所以通过新的ByteBuf对象进行数据操作也会对原始ByteBuf对象生效。

文件传输FileRegion

在Netty源码的example包中,提供了FileRegion的使用示例,以下代码片段摘自FileServerHandler.java。

@Overridepublic void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {    RandomAccessFile raf = null;    long length = -1;    try {        raf = new RandomAccessFile(msg, "r");        length = raf.length();    } catch (Exception e) {        ctx.writeAndFlush("ERR: " + e.getClass().getSimpleName() + ": " + e.getMessage() + '\n');        return;    } finally {        if (length < 0 && raf != null) {            raf.close();        }    }    ctx.write("OK: " + raf.length() + '\n');    if (ctx.pipeline().get(SslHandler.class) == null) {        // SSL not enabled - can use zero-copy file transfer.        ctx.write(new DefaultFileRegion(raf.getChannel(), 0, length));    } else {        // SSL enabled - cannot use zero-copy file transfer.        ctx.write(new ChunkedFile(raf));    }    ctx.writeAndFlush("\n");}

从FileRegion的使用示例可以看出,Netty使用FileRegion实现文件传输的零拷贝。FileRegion的默认实现类是 DefaultFileRegion,通过DefaultFileRegion将文件内容写入到NioSocketChannel。

那么FileRegion是如何实现零拷贝的呢?我们通过源码看看FileRegion到底使用了什么黑科技。

public class DefaultFileRegion extends AbstractReferenceCounted implements FileRegion {    private final File f; // 传输的文件    private final long position; // 文件的起始位置    private final long count; // 传输的字节数    private long transferred; // 已经写入的字节数    private FileChannel file; // 文件对应的 FileChannel    @Override    public long transferTo(WritableByteChannel target, long position) throws IOException {        long count = this.count - position;        if (count < 0 || position < 0) {            throw new IllegalArgumentException(                    "position out of range: " + position +                    " (expected: 0 - " + (this.count - 1) + ')');        }        if (count == 0) {            return 0L;        }        if (refCnt() == 0) {            throw new IllegalReferenceCountException(0);        }        open();        long written = file.transferTo(this.position + position, count, target);        if (written > 0) {            transferred += written;        } else if (written == 0) {            validate(this, position);        }        return written;    }    // 省略其他代码}

从源码可以看出,FileRegion其实就是对 FileChannel的包装,并没有什么特殊操作,底层使用的是JDK下NIO中的 FileChannel#transferTo()方法实现文件传输,所以 FileRegion是操作系统级别的零拷贝,对于传输大文件会很有帮助。

到此为止,Netty相关的零拷贝技术都已经介绍完了,可以看出 Netty对于ByteBuf做了更多精进的设计和优化。

标签: #bytebuf 读取所有数据