龙空技术网

Netty源码-ChannelPipeline的剖析

架构修炼者 120

前言:

现在你们对“nettypipeline怎么用”大约比较关切,姐妹们都需要了解一些“nettypipeline怎么用”的相关资讯。那么小编同时在网上汇集了一些关于“nettypipeline怎么用””的相关资讯,希望同学们能喜欢,朋友们一起来学习一下吧!

ChannelPipeline的剖析学习目标ChannelPipeline概念ChannelPipelinel创建和初始化过程ChannelPipelinel实战学习目标ChannelPipeline责任链实现ChannelPipeline初始化过程和传递过程ChannelPipeline概念

ChannelPipeline:由多个ChannelHandler组成的责任链,专门负责拦截处理Channel的Inbound(入站)和Outbound(出站)事件的。例如:我们会定义一个ChannelHandler,添加到ChannelPipeline中,从socket中读取数据,进行相应的业务逻辑处理。

ChannelHandler:分为ChannelInboundHandler和ChannelOutboundHandler两个接口,与之对应的还有两个适配器类:ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter,开发时只需继承适配器类重写相应方法就可以了,别的方法适配器类已经给处理了。还有个SimpleChannelInboundHandler只是在channelRead后进行了msg的自动释放。

ChannelPipeline实现原理数据结构

双向链表,固定头尾(head和tail),如图所示:

事件传播方向Inbound事件处理顺序:head--->tailOutbound事件处理顺序:tail---->head注意:配置In/out的handler的顺序注意:所有hanlder都无法处理的消息,将丢弃掉,不会抛异常;在处理In事件时,会过滤掉所有的OutboundHandler,只由InboundHandler来处理;Out事件同理。事件传播方法Inbound事件传播方法

ChannelHandlerContext#fireChannelRegistered()ChannelHandlerContext#fireChannelActive()ChannelHandlerContext#fireChannelRead(Object)ChannelHandlerContext#fireChannelReadComplete()ChannelHandlerContext#fireExceptionCaught(Throwable)ChannelHandlerContext#fireUserEventTriggered(Object)ChannelHandlerContext#fireChannelWritabilityChanged()ChannelHandlerContext#fireChannelInactive()ChannelHandlerContext#fireChannelUnregistered()
Outbound事件传播方法
ChannelHandlerContext#bind(SocketAddress, ChannelPromise)ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise)ChannelHandlerContext#write(Object, ChannelPromise)ChannelHandlerContext#flush()ChannelHandlerContext#read()ChannelHandlerContext#disconnect(ChannelPromise)ChannelHandlerContext#close(ChannelPromise)ChannelHandlerContext#deregister(ChannelPromise)
ChannelPipelinel创建和初始化过程创建过程

无论client,还是server,都会在创建Channel时,还会创建了一个默认DefaultChannelPipeline。

AbstractChannel 类protected AbstractChannel(Channel parent) {    this.parent = parent;    id = newId();    unsafe = newUnsafe();    pipeline = newChannelPipeline();}protected DefaultChannelPipeline newChannelPipeline() {    return new DefaultChannelPipeline(this);}

server端的channel初始化时,会在pipeline中添加一个ChannelInitializer。

ServerBootstrap 类void init(Channel channel) {  ......    p.addLast(new ChannelInitializer<Channel>() {        @Override        public void initChannel(final Channel ch) {            final ChannelPipeline pipeline = ch.pipeline();            ChannelHandler handler = config.handler();            if (handler != null) {                pipeline.addLast(handler);            }            ch.eventLoop().execute(new Runnable() {                @Override                public void run() {                    pipeline.addLast(new ServerBootstrapAcceptor(                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));                }            });        }    });}

当前pipeline是这样的,如下图所示,ChannelInitializer是临时存在的,它的使命是在channel注册到EvenLoop时,把配置handler初始到pipeline链上,同时会在链上移除自己

初始化过程

从channel注册到EvenLoop的方法: AbstractChannel#register0(...)------>pipeline.invokeHandlerAddedIfNeeded(); ------> ChannelInitializer#handlerAdded(...) ---ChannelInitializer#initChannel(...)

AbstractChannel#register0方法private void register0(ChannelPromise promise) {    ......    // 此处会把配置handler初始到pipeline链上    pipeline.invokeHandlerAddedIfNeeded();    ......}ChannelInitializer类handlerAdded和initChannelprivate boolean initChannel(ChannelHandlerContext ctx) throws Exception {    if (initMap.add(ctx)) { // Guard against re-entrance.        try {            // 调用我们重写的initChannel方法,把handler添加到pipeline链上            initChannel((C) ctx.channel());        } catch (Throwable cause) {            exceptionCaught(ctx, cause);        } finally {            ChannelPipeline pipeline = ctx.pipeline();            if (pipeline.context(this) != null) {                // 把自己从pipeline中移除                pipeline.remove(this);            }        }        return true;    }    return false;}

注册完后,所有的handler都初始化到pipeline链上了,如下图所示:

client端的pipeline初始化过程差不多。聊到这了,咱们在来看看server端接收到一个新连接后,pipeline是怎么创建的呢?整个过程和上面一样的

感兴趣的话,可以按照下面的步骤跟踪一下:

NioEventLoop#run--->NioEventLoop#processSelectedKeys--->NioEventLoop#processSelectedKeysOptimized--->NioEventLoop#processSelectedKey--->NioMessageUnsafe#read()--->NioServerSocketChannel#doReadMessages--->new NioSocketChannel(this, ch) 默认创建一个DefaultChannelPipeline。

初始化过程:ServerBootstrap的ServerBootstrapAcceptor类#channelRead()中childGroup.register(child),初始化的流程和上面的一样了。

ChannelPipelinel实战

public class PipeLineOrderServer extends AbstractNettyServer {    @Override    public void initChannelInitializer(SocketChannel ch) {        ch.pipeline().addLast(new LineBasedFrameDecoder(1024));        ch.pipeline().addLast(new StringDecoder());        ch.pipeline().addLast(new OutBoundHandler1());        ch.pipeline().addLast(new OutBoundHandler2());        ch.pipeline().addLast(new InBoundHandler1());        ch.pipeline().addLast(new InBoundHandler2());    }    public static void main(String[] args) {        Server server = new PipeLineOrderServer();        server.start();    }    @ChannelHandler.Sharable    class InBoundHandler1 extends ChannelInboundHandlerAdapter {        @Override        public void channelActive(ChannelHandlerContext ctx) throws Exception {        }        @Override        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {            System.out.println("InBoundHandler1-channelRead:" + msg.toString());            ctx.fireUserEventTriggered(msg);        }        @Override        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {            cause.printStackTrace();            ctx.close();        }    }    @ChannelHandler.Sharable    class InBoundHandler2 extends ChannelInboundHandlerAdapter {        @Override        public void channelActive(ChannelHandlerContext ctx) throws Exception {        }        @Override        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {            System.out.println("InBoundHandler2-channelRead:" + msg.toString());            ctx.writeAndFlush(Unpooled.wrappedBuffer("hello world!\n".getBytes()));        }        @Override        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {            System.out.println("InBoundHandler2-userEventTriggered:" + evt.toString());            ctx.writeAndFlush(Unpooled.wrappedBuffer("hello world!\n".getBytes()));        }        @Override        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {            cause.printStackTrace();            ctx.close();        }    }    class OutBoundHandler1 extends ChannelOutboundHandlerAdapter{        @Override        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {            System.out.println("OutBoundHandler1=====");            ctx.write(msg, promise);        }    }    class OutBoundHandler2 extends ChannelOutboundHandlerAdapter{        @Override        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {            System.out.println("OutBoundHandler2=====");            ctx.write(msg, promise);        }    }}
执行结果:InBoundHandler1-channelRead:abc23InBoundHandler2-userEventTriggered:abc23OutBoundHandler2=====OutBoundHandler1=====InBoundHandler1-channelRead:asdInBoundHandler2-userEventTriggered:asdOutBoundHandler2=====OutBoundHandler1=====

标签: #nettypipeline怎么用