前言:
现在你们对“nettypipeline怎么用”大约比较关切,姐妹们都需要了解一些“nettypipeline怎么用”的相关资讯。那么小编同时在网上汇集了一些关于“nettypipeline怎么用””的相关资讯,希望同学们能喜欢,朋友们一起来学习一下吧!ChannelPipeline的剖析学习目标ChannelPipeline概念ChannelPipelinel创建和初始化过程ChannelPipelinel实战学习目标ChannelPipeline责任链实现ChannelPipeline初始化过程和传递过程ChannelPipeline概念
ChannelPipeline:由多个ChannelHandler组成的责任链,专门负责拦截处理Channel的Inbound(入站)和Outbound(出站)事件的。例如:我们会定义一个ChannelHandler,添加到ChannelPipeline中,从socket中读取数据,进行相应的业务逻辑处理。
ChannelHandler:分为ChannelInboundHandler和ChannelOutboundHandler两个接口,与之对应的还有两个适配器类:ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter,开发时只需继承适配器类重写相应方法就可以了,别的方法适配器类已经给处理了。还有个SimpleChannelInboundHandler只是在channelRead后进行了msg的自动释放。
ChannelPipeline实现原理数据结构
双向链表,固定头尾(head和tail),如图所示:
事件传播方向Inbound事件处理顺序:head--->tailOutbound事件处理顺序:tail---->head注意:配置In/out的handler的顺序注意:所有hanlder都无法处理的消息,将丢弃掉,不会抛异常;在处理In事件时,会过滤掉所有的OutboundHandler,只由InboundHandler来处理;Out事件同理。事件传播方法Inbound事件传播方法
ChannelHandlerContext#fireChannelRegistered()ChannelHandlerContext#fireChannelActive()ChannelHandlerContext#fireChannelRead(Object)ChannelHandlerContext#fireChannelReadComplete()ChannelHandlerContext#fireExceptionCaught(Throwable)ChannelHandlerContext#fireUserEventTriggered(Object)ChannelHandlerContext#fireChannelWritabilityChanged()ChannelHandlerContext#fireChannelInactive()ChannelHandlerContext#fireChannelUnregistered()Outbound事件传播方法
ChannelHandlerContext#bind(SocketAddress, ChannelPromise)ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise)ChannelHandlerContext#write(Object, ChannelPromise)ChannelHandlerContext#flush()ChannelHandlerContext#read()ChannelHandlerContext#disconnect(ChannelPromise)ChannelHandlerContext#close(ChannelPromise)ChannelHandlerContext#deregister(ChannelPromise)ChannelPipelinel创建和初始化过程创建过程
无论client,还是server,都会在创建Channel时,还会创建了一个默认DefaultChannelPipeline。
AbstractChannel 类protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline();}protected DefaultChannelPipeline newChannelPipeline() { return new DefaultChannelPipeline(this);}
server端的channel初始化时,会在pipeline中添加一个ChannelInitializer。
ServerBootstrap 类void init(Channel channel) { ...... p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } });}
当前pipeline是这样的,如下图所示,ChannelInitializer是临时存在的,它的使命是在channel注册到EvenLoop时,把配置handler初始到pipeline链上,同时会在链上移除自己。
初始化过程
从channel注册到EvenLoop的方法: AbstractChannel#register0(...)------>pipeline.invokeHandlerAddedIfNeeded(); ------> ChannelInitializer#handlerAdded(...) ---ChannelInitializer#initChannel(...)
AbstractChannel#register0方法private void register0(ChannelPromise promise) { ...... // 此处会把配置handler初始到pipeline链上 pipeline.invokeHandlerAddedIfNeeded(); ......}ChannelInitializer类handlerAdded和initChannelprivate boolean initChannel(ChannelHandlerContext ctx) throws Exception { if (initMap.add(ctx)) { // Guard against re-entrance. try { // 调用我们重写的initChannel方法,把handler添加到pipeline链上 initChannel((C) ctx.channel()); } catch (Throwable cause) { exceptionCaught(ctx, cause); } finally { ChannelPipeline pipeline = ctx.pipeline(); if (pipeline.context(this) != null) { // 把自己从pipeline中移除 pipeline.remove(this); } } return true; } return false;}
注册完后,所有的handler都初始化到pipeline链上了,如下图所示:
client端的pipeline初始化过程差不多。聊到这了,咱们在来看看server端接收到一个新连接后,pipeline是怎么创建的呢?整个过程和上面一样的。
感兴趣的话,可以按照下面的步骤跟踪一下:
NioEventLoop#run--->NioEventLoop#processSelectedKeys--->NioEventLoop#processSelectedKeysOptimized--->NioEventLoop#processSelectedKey--->NioMessageUnsafe#read()--->NioServerSocketChannel#doReadMessages--->new NioSocketChannel(this, ch) 默认创建一个DefaultChannelPipeline。
初始化过程:ServerBootstrap的ServerBootstrapAcceptor类#channelRead()中childGroup.register(child),初始化的流程和上面的一样了。
ChannelPipelinel实战
public class PipeLineOrderServer extends AbstractNettyServer { @Override public void initChannelInitializer(SocketChannel ch) { ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new OutBoundHandler1()); ch.pipeline().addLast(new OutBoundHandler2()); ch.pipeline().addLast(new InBoundHandler1()); ch.pipeline().addLast(new InBoundHandler2()); } public static void main(String[] args) { Server server = new PipeLineOrderServer(); server.start(); } @ChannelHandler.Sharable class InBoundHandler1 extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("InBoundHandler1-channelRead:" + msg.toString()); ctx.fireUserEventTriggered(msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } } @ChannelHandler.Sharable class InBoundHandler2 extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("InBoundHandler2-channelRead:" + msg.toString()); ctx.writeAndFlush(Unpooled.wrappedBuffer("hello world!\n".getBytes())); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { System.out.println("InBoundHandler2-userEventTriggered:" + evt.toString()); ctx.writeAndFlush(Unpooled.wrappedBuffer("hello world!\n".getBytes())); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } } class OutBoundHandler1 extends ChannelOutboundHandlerAdapter{ @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("OutBoundHandler1====="); ctx.write(msg, promise); } } class OutBoundHandler2 extends ChannelOutboundHandlerAdapter{ @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("OutBoundHandler2====="); ctx.write(msg, promise); } }}
执行结果:InBoundHandler1-channelRead:abc23InBoundHandler2-userEventTriggered:abc23OutBoundHandler2=====OutBoundHandler1=====InBoundHandler1-channelRead:asdInBoundHandler2-userEventTriggered:asdOutBoundHandler2=====OutBoundHandler1=====
标签: #nettypipeline怎么用