龙空技术网

Netty ChannelPipeline 数据管道

java任我行 89

前言:

目前兄弟们对“管道java”大致比较注重,同学们都想要了解一些“管道java”的相关内容。那么小编在网络上网罗了一些有关“管道java””的相关文章,希望朋友们能喜欢,姐妹们一起来学习一下吧!

1. 前言

前面,我们也提到了 ChannelPipeline,它是管道或者说管理 Handler 的集合,很多同学很容易搞混 Channel、ChannelPipeline 和 ChannelHandler 之间关系。

本节内容我们需要理清并且掌握以下知识点:

Channel、ChannelPipeline、ChannelHandler 之间的关系;了解 ChannelPipeline 如何管理 ChannelHandler。2. 三者之间的关系

Channel 是一个连接通道,客户端和服务端连接成功之后,会维持一个 Channel,可以通过 Channel 来发送数据。Channel 有且仅有一个 ChannelPipeline 与之相对应,ChannelPipeline 又维护着一个由多个 ChannelHandlerContext 组成的双向链表,ChannelHandlerContext 又关联着一个 ChannelHandler。

它们之间的关系,大概如下图所示:

3. ChannelPipeline 核心方法

ChannelPipeline 的最常用方法:

方法

描述

addFirst(…)

添加 ChannelHandler 在 ChannelPipeline 的第一个位置

addBefore(…)

在 ChannelPipeline 中指定的 ChannelHandler 名称之前添加 ChannelHandler

addAfter(…)

在 ChannelPipeline 中指定的 ChannelHandler 名称之后添加 ChannelHandler

addLast(…)

在 ChannelPipeline 的末尾添加 ChannelHandler

remove(…)

删除 ChannelPipeline 中指定的 ChannelHandler

replace(…)

替换 ChannelPipeline 中指定的 ChannelHandler

ChannelHandler first()

获取链表当中的第一个节点

ChannelHandler last()

获取链表当中的最后一个节点

实例:

ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap    .group(bossGroup, workerGroup)    .channel(NioServerSocketChannel.class)    .childHandler(new ChannelInitializer<NioSocketChannel>() {        protected void initChannel(NioSocketChannel ch) {            ch.pipeline().addLast(new Handler1());            ch.pipeline().addLast(new Handler2());            ch.pipeline().addLast(new Handler3());            ch.pipeline().addLast(new Handler4());        }    });

总结,ChannelPipeline 的用法比较固定,虽然方法很多,但是一般常用的就是 addLast。

4. ChannelPipeline 管理链表

实例:

//1.创建ChannelPipelineChannelPipeline pipeline = ch.pipeline();//2.创建HandlerFirstHandler firstHandler = new FirstHandler();SecondHandler secondHandler=new SecondHandler();ThirdHandler thirdHandler=new ThirdHandler();FourthHandler fourthHandler=new FourthHandler();//3.操作pipeline.addLast("handler1", firstHandler);pipeline.addFirst("handler2", secondHandler);//在最开始添加pipeline.addLast("handler3", thirdHandler);//在最后添加pipeline.remove("handler3"); //根据名称删除pipeline.remove(firstHandler);//根据对象删除pipeline.replace("handler2", "handler4", fourthHandler);//替换

输出结果:

FourthHandler
5. 入站和出站执行顺序

在真实的项目开发当中,inboundHandler 和 outboundHandler 都是多个的,一般是一个业务处理对应一个 Handler。那么多个的情况下,Pipeline 的执行顺序又是怎么样的呢?

5.1 Inbound 不往下传递

实例:

ch.pipeline().addLast(new InboundHandler1());ch.pipeline().addLast(new InboundHandler2());public class InboundHandler1 extends ChannelInboundHandlerAdapter {    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        System.out.println("inbound1>>>>>>>>>");    }}public class InboundHandler2 extends ChannelInboundHandlerAdapter {    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        System.out.println("inbound2>>>>>>>>>");    }}

执行结果:

inbound1>>>>>>>>>

思考:为什么不执行 InboundHandler2 呢?

原因:InboundHandler1 没有手工往下传递执行。

5.2 Inbound 流转顺序

实例:

ch.pipeline().addLast(new InboundHandler1());ch.pipeline().addLast(new InboundHandler2());public class InboundHandler1 extends ChannelInboundHandlerAdapter {    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        System.out.println("inbound1>>>>>>>>>");                //往下传递        super.channelRead(ctx, msg);    }}public class InboundHandler2 extends ChannelInboundHandlerAdapter {    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        System.out.println("inbound2>>>>>>>>>");    }}

执行结果:

inbound1>>>>>>>>>inbound2>>>>>>>>>

InboundHandler 之间可以通过 super.channelRead(ctx, msg); 往下传递。

5.3 Inbound 执行顺序

实例:

ch.pipeline().addLast(new InboundHandler1());ch.pipeline().addLast(new InboundHandler2());public class InboundHandler1 extends ChannelInboundHandlerAdapter {    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        //1.往下传递        super.channelRead(ctx, msg);        //2.打印信息        System.out.println("inbound1>>>>>>>>>");    }}public class InboundHandler2 extends ChannelInboundHandlerAdapter {    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        System.out.println("inbound2>>>>>>>>>");    }}

执行结果:

inbound2>>>>>>>>>inbound1>>>>>>>>>

InboundHandler1 先往下传递,再执行自己的业务,那么 InboundHandler2 就会比 InboundHandler1 先执行。

总结:Inbound 是按顺序进行传递,但是逻辑的执行并非是按顺序执行,而是由 super.channelRead(ctx, msg); 去决定。

5.4 流转到 Outbound

InboundHandler 往 OutboundHandler 流转,需要手工调用 ctx.channel().writeAndFlush(),否则无法执行 OutboundHandler 的业务逻辑。

实例:

public class InboundHandler2 extends ChannelInboundHandlerAdapter {    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        System.out.println("inbound2>>>>>>>>>");		//传递到OutboundHandler        ctx.channel().writeAndFlush("hello world");    }}
5.5 Outbound 内部流转

跟 InboundHandler 一样,需要手工往下传递,否则无法流转到下一个 OutboundHandler。

实例:

public class OutboundHandler2 extends ChannelOutboundHandlerAdapter {    @Override    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {        System.out.println("outbound2>>>>>>>>>");		//往下流转        super.write(ctx, msg, promise);    }}

总结:OutboundHandler 是按逆向来流转,但是业务逻辑的执行顺序则是由 super.write(ctx, msg, promise); 决定。

5.6 ctx.writeAndFlush 和 ctx.channel ().writeAndFlush 的区别

很多同学很容易遇到以下问题,并且会想不通。

实例:

ch.pipeline().addLast(new InboundHandler1());ch.pipeline().addLast(new InboundHandler2());ch.pipeline().addLast(new OutboundHandler1());ch.pipeline().addLast(new OutboundHandler2());代码块1234

InboundHandler2 流转代码

public class InboundHandler2 extends ChannelInboundHandlerAdapter {    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        System.out.println("inbound2>>>>>>>>>");		//流转到OutboundHandler2        ctx.channel().writeAndFlush("hello world");    }}

执行结果:

inbound1>>>>>>>>>inbound2>>>>>>>>>outbound2>>>>>>>>>outbound1>>>>>>>>>

修改 InboundHandler2 流转代码

public class InboundHandler2 extends ChannelInboundHandlerAdapter {    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        System.out.println("inbound2>>>>>>>>>");		//【注意,调整这里了】        ctx.writeAndFlush("hello world");    }}

执行结果

inbound1>>>>>>>>>inbound2>>>>>>>>>

思考:为什么这里使用 ctx.writeAndFlush 就流程不下去了呢?

ctx.writeAndFlush(); 最终源码

private AbstractChannelHandlerContext findContextOutbound() {    AbstractChannelHandlerContext ctx = this;    do {        ctx = ctx.prev;    } while(!ctx.outbound);    return ctx;}

通过源码,我们发现它是从当前 InboundHandler 开始往前执行。

ctx.channel().writeAndFlush(); 最终源码

public final ChannelFuture writeAndFlush(Object msg) {    return this.tail.writeAndFlush(msg);}

通过源码,我们发现它是从链表的最后一个节点开始往前面执行。

总结,如果是 OutboundHandler 放在 InboundHandler 之后,使用不同的 writeAndFlush 则得到的结果不一样。

5.7 规律总结

Inbound 的顺序

流转顺序: 多个 Inbound 不会自动往下流转,需要手工调用 ctx.fireChannelRead(msg); 才能流转到下一个;执行顺序: 业务逻辑的执行顺序,则根据 ctx.fireChannelRead(msg); 和逻辑的先后顺序所决定;Inbound 往 Outbound 流转,则需要手工 ctx.channel().writeAndFlush()

Outbound 的顺序

流转顺序: 多个 Outbound 不会自动往下流转,需要手工调用 ctx.write(msg, promise); 才能流转到下一个;执行顺序: 业务逻辑的执行顺序,则根据 ctx.write(msg, promise); 和逻辑的先后顺序所决定。6. 小结

本文主要讲解的知识点

Channel、ChannelPipeline、ChannelHandlerContext、ChannelHandler 之间的关系;ChannelPipeline 的核心方法,以及它是如何管理 Handler 的,主要通过 addLast () 去组装 Handler;入站和出站的流转顺序和业务逻辑的执行顺序。

标签: #管道java