龙空技术网

Netty实战 IM即时通讯系统(十一)pipeline与channelHandler

大数据小菜鸡 461

前言:

此刻各位老铁们对“nettypipeline执行完”大体比较珍视,各位老铁们都需要知道一些“nettypipeline执行完”的相关内容。那么小编同时在网上收集了一些对于“nettypipeline执行完””的相关内容,希望各位老铁们能喜欢,我们快快来学习一下吧!

Netty实战 IM即时通讯系统(十一)pipeline与channelHandler

零、 目录

IM系统简介Netty 简介Netty 环境配置服务端启动流程客户端启动流程实战: 客户端和服务端双向通信数据传输载体ByteBuf介绍客户端与服务端通信协议编解码实现客户端登录实现客户端与服务端收发消息pipeline与channelHandler构建客户端与服务端pipeline拆包粘包理论与解决方案channelHandler的生命周期使用channelHandler的热插拔实现客户端身份校验客户端互聊原理与实现群聊的发起与通知群聊的成员管理(加入与退出,获取成员列表)群聊消息的收发及Netty性能优化心跳与空闲检测总结扩展

一、 简介

这一小节中 , 我们来学习Netty 中一大核心组件: pipeline 与 channelHandler上一小节最后 , 我们提出: 如何避免switch-case 泛滥? , 我们注意到, 不管是服务端还是客户端 , 处理流程大致分为以下步骤

我们把这三类逻辑都写在一个类里面, 客户端写在 ClientHandler , 服务端写在 ServerHandler , 如果要做功能的扩展 (比如 , 我们要校验魔数 , 或者其他特殊逻辑) , 只能在一个类里面去修改 , 这个类就会变得越来越臃肿。另外 , 我们注意到, 每次发指令数据包都要是都调用编码器编码成byteBuf , 对于这类场景的编码优化, 我们能想到的办法自然是模块化处理 , 不同的逻辑放置到单独的类中来处理 , 最后将这些逻辑串联起来 , 形成一个完整的逻辑处理链 。Netty中的pipeline 与channelHandler 正是来解决这个问题的: 他通过责任链设计模式来组织代码逻辑 , 并且能够支持逻辑的动态添加和删除 , Netty能够支持各类协议的支持和扩展 , 比如: HTTP , WebSocket ,Redis 靠的就是pipeline与channelHanler。

二、 pipeline 与channelHandler 的构成

无论是从服务端来看 , 还是从客户端来看 , 在Netty整个框架中, 一条连接对应着一个channel ,这个channel 所有的处理逻辑都在一个叫做ChannelPipeline的对象里面 , ChannelPipeline 是一个双向链表结构 , 他和Channel之间是一对一的关系ChannelPipeline 里面每个节点都是一个ChannelHandlerContext 对象 , 这个对象能够拿到和Channel相关的上下文信息 , 然后这个对象抱着一个重要的对象 , 那就是逻辑处理器 ChannelHandler接下啦我们来看一下ChannelHandler有哪些分类

三、 ChannelHandler 的分类

可以考到ChannelHandler 有两大子接口第一个子接口是ChannelInboundHandler , 从字面意思也可以猜到,他是处理读数据的逻辑 , 比如: 我们在一段读到一段数据 , 首先要解析这段数据 , 然后对这些数据做一些逻辑处理 , 最终把响应写到对端 , 在开始组装响应之前的逻辑 , 都可以放在ChannelInboundHandler 中处理 , 他的一个最重要的方法就是 channelRead , 读者可以将ChannelInboundHandler中的逻辑处理过程与TCP的七层协议的解析连接起来 , 收到的数据一层一层从物理层传送到我们的应用层 。第二个子接口 ChannelOutboundHandler 是处理写数据的逻辑 , 他是定义我们一段在组装完响应之后吧数据写到对端的逻辑 ,比如我们封装好一个response对象 , 接下来我们可能对这个response做一些其他的特殊的逻辑, 然后 , 在编码成byteBuf , 最终写到对端 , 它里面最核心的一个方法就是write() , 读者可以将ChannelOutboundHandler的逻辑处理过程与TCP 的七层协议的封装过程联系起来 , 我们在应用层组装响应之后 , 通过层层协议的封装 , 直到最底层的物理层。这两个子接口分别有对应的默认的实现 , ChannelInboundHandlerAdapter 和ChannelOutboundHandlerAdapter , 他们分别实现了两大接口的所有功能 , 默认情况下回吧读写事件传播到下一个handler。说了这么多理论, 其实还是比较抽象的 , 下面我们就用一个具体的demo 来学习一下这两个handler的事件传播机制。

四、 ChannelInboundHanndler 的事件传播

关于ChannelInboundHandler , 我们拿 ChannelRead() 为例子 , 来体验一下inbound时间的传播我们在服务端的pipeline 添加三个ChannelInboundHandler

Test_12_Server.javaserverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new Test_11_InboundHandlerA()); ch.pipeline().addLast(new Test_11_InboundHandlerB()); ch.pipeline().addLast(new Test_11_InboundHandlerC()); } });
每个inBoundHandler 继承自ChannelInboundHandlerAdapter , 然后实现了 channelRead() 方法
/** * 2019年1月29日 * @author outman * 服务端处理 A */class Test_11_InboundHandlerA extends ChannelInboundHandlerAdapter{ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buffer = (ByteBuf)msg; System.out.println("Test_11_InboundHandlerA --> " + buffer.toString(Charset.forName("UTF-8"))); super.channelRead(ctx, msg); }}/** * 2019年1月29日 * @author outman * 服务端处理 B */class Test_11_InboundHandlerB extends ChannelInboundHandlerAdapter{ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buffer = (ByteBuf)msg; System.out.println("Test_11_InboundHandlerB --> " + buffer.toString(Charset.forName("UTF-8"))); super.channelRead(ctx, msg); }}/** * 2019年1月29日 * @author outman * 服务端处理 C */class Test_11_InboundHandlerC extends ChannelInboundHandlerAdapter{ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buffer = (ByteBuf)msg; System.out.println("Test_11_InboundHandlerC --> " + buffer.toString(Charset.forName("UTF-8"))); super.channelRead(ctx, msg); }}
在channelRead() 方法里面 , 我们当前handler 的信息 , 然后调用父类的channelRead() 方法 , 而这里父类的channelRead()方法会自定调用下一个inboundHandler的channelRead() , 并且会把处理完毕的对象传递给下一个inboundHandler , 我们例子中传递的对象都是同一个msg 。我们通过 addLast()方法来为pipeline添加 inboundHandler , 当然 , 除了这个方法还有其他的方法 , 感兴趣的同学可以去浏览一下pipeline的api ,这里我们添加的顺序为 A --> B --> c , 然后我们来看一下控制台输出

五、ChannelOutboundHandler 的时间传播

关于ChannelOutboundHandler , 我们拿write()为例子,来体验一下outbound事件的传播。我们继续在服务端的pipeline添加三个ChannelOutboundHandler

Test_12_Server.javaserverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { // inbound 服务端读数据逻辑 ch.pipeline().addLast(new Test_11_InboundHandlerA()); ch.pipeline().addLast(new Test_11_InboundHandlerB()); ch.pipeline().addLast(new Test_11_InboundHandlerC()); // outbound 服务端系数据逻辑 ch.pipeline().addLast(new Test_11_OutboundHandlerA()); ch.pipeline().addLast(new Test_11_OutboundHandlerB()); ch.pipeline().addLast(new Test_11_OutboundHandlerC()); } });
每个outboundHandler都继承自ChannelOutboundHandlerAdapter , 然后实现了write()方法
/** * 2019年2月14日 * @author outman * * 服务端写数据逻辑 A */class Test_12_OutboundHandlerA extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("Test_12_OutboundHandlerA --> " + msg); super.write(ctx, msg, promise); }}/** * 2019年2月14日 * @author outman * * 服务端写数据逻辑 B */class Test_12_OutboundHandlerB extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("Test_12_OutboundHandlerB --> " + msg); super.write(ctx, msg, promise); }}/** * 2019年2月14日 * @author outman * * 服务端写数据逻辑 C */class Test_12_OutboundHandlerC extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("Test_12_OutboundHandlerC --> " + msg); super.write(ctx, msg, promise); }}
在write()方法里面 , 我们打印当前handler的信息,然后调用父类write()方法,而这里父类的write()方法会自动调用到下一个outboundHandler的write()方法,并且把当前outboundHandler里处理完毕的对象传递到下一个outboundHandler.我们通过addLast()方法添加outboundHandler的顺序为 A -> B -> C , 最后我们来看一下控制台的输出

可以看到outboundHandler的执行顺序与我们添加的顺序相反 , 最后,我们在来看一下pipeline的结构和执行顺序。pipeline 的结构不管我们定义的是那种类型的handler, 最终他们都以双向链表的形式连接,这里实际链表的节点是ChannelHandlerContext , 这里为了让结构清晰突出,可以直接把节点看做ChannelHandlerContextpipeline 的执行顺序 虽然两种类型的handler在一个双向链表里, 但是这两类handler 的分工是不一样的,inboundHandler的事件通常只会传播到下一个channelHandler,outboundHandler的事件通常通常只会传播到下一个outboundHandler , 两者互不干扰。

六 、 总结

通过我们前面编写客户端、服务端处理逻辑引出了pipeline和channelHandler的概念channelHandler分为inbound和outbound两种类型的接口 , 分别是处理数据读和数据写的逻辑两种类型的handler均有相应的默认实现,默认会把事件传递到下一个 , 这里的传递事件其实说白了就是把本handler的处理结果传递给下一个handler继续处理inboundHandler的执行顺序与我们实际的添加顺序相同 ,而outboundHandler 相反。

七 、 思考

参考本文中的例子 , 如果我们往pipeline里面添加handler的顺序不变,要在控制台打印出inboundA -> inboundB -> outboundB -> outboundA , 该如何实现?如何在每个handler里面打印上一个handler 的处理结束的时间点呢?答: 可以将上一个handler处理结束的时间放在channel的attr中如何让outboundHandler按照添加顺序执行?答: pipeline有andLast() 和andFrist()方法 ,inboundHandler在执行过程中正向遍历链表 使用andLast()方法先进先出可以顺序执行 , outboundHandler在执行过程中逆向遍历链表, 使用andFrist()方法 先进后出可以顺序执行 。OutBoundHandler一直没有被执行到,pipeline也已添加,有可能是什么原因呢?答: ctx.channel().writeAndFlush(msg); 进行写事件才会触发out调用链 。 所以需要将inboundHandlerC 中执行写逻辑 才会执行outboundHandler.

标签: #nettypipeline执行完