龙空技术网

浅谈Netty源码

Java老油条 263

前言:

此刻你们对“netty源码剖析”都比较关切,大家都想要了解一些“netty源码剖析”的相关内容。那么小编同时在网摘上汇集了一些关于“netty源码剖析””的相关文章,希望咱们能喜欢,同学们一起来学习一下吧!

前言

Netty框架的原理是Reactor模型中基于多个反应器的多线程模式,本篇文章主要介绍Netty较为重要的几个概念,编写思路借鉴了参考资料中的文章

ChannelFuture

我们先来了解了解Netty中几个较为重要的接口

public interface Future<V> extends java.util.concurrent.Future<V> {    // I/O操作是否成功,成功返回true    boolean isSuccess();​    // 是否可取消    boolean isCancellable();​    // 抛出I/O操作失败原因    Throwable cause();​    // 此处使用了观察者模式,该Future任务完成,这个Listener会立刻收到通知    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);​    // 移除监听器    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);​    // 等待Future任务完成,如果任务失败会抛出异常    Future<V> sync() throws InterruptedException;    Future<V> syncUninterruptibly();​    // 等待Future任务完成,如果任务失败不会抛出异常    Future<V> await() throws InterruptedException;    Future<V> awaitUninterruptibly();    boolean await(long timeout, TimeUnit unit) throws InterruptedException;    boolean await(long timeoutMillis) throws InterruptedException;    boolean awaitUninterruptibly(long timeout, TimeUnit unit);    boolean awaitUninterruptibly(long timeoutMillis);​    // 立即获取Future任务结果,如果Future任务未完成会返回null,所以在使用这个方法之前最好先判断这个Future任务是否完成    V getNow();}复制代码

Netty的Future接口继承了jdk1.5的Future接口,在本身已经有Future接口的情况下为什么要重复造轮子?

这是因为jdk1.5的Future接口不满足Netty的需求,jdk的Future接口可以获取异步计算的结果,并且提供了多种方法,可查看任务是否取消是否完成,但是使用者无法知道方法什么时候完成,比如某用户提交了一个Future任务,什么时候才能去调用get()方法获取结果,总不能循环调用isDone()方法吧,这样太消耗cpu资源。Netty的Future接口一定程度上弥补了这个缺陷,通过新增监听器,可以得知该任务是否完成以及任务完成后该做的事情,isSuccess可以得知任务是否成功,netty的Future接口更加智能

public interface ChannelFuture extends Future<Void> {​    // 返回ChannelFuture关联的Channel    Channel channel();​    @Override    ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);​    @Override    ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);​    @Override    ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);​    @Override    ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);​    @Override    ChannelFuture sync() throws InterruptedException;​    @Override    ChannelFuture syncUninterruptibly();​    @Override    ChannelFuture await() throws InterruptedException;​    @Override    ChannelFuture awaitUninterruptibly();}复制代码

ChannelFuture继承了Netty的Future接口,由于Netty中所有的I/O操作都是异步了,所以当方法返回时,不代表I/O操作已经完成,所以ChannelFuture封装了异步I/O操作的结果,接口定义的方法与Netty的Future接口相似,并没有什么新鲜的,值得一提的是ChannelFuture关联了Channel

public interface GenericFutureListener<F extends Future<?>> extends EventListener {​    // 此operation关联的Future任务完成时,这个方法会被调用    void operationComplete(F future) throws Exception;}复制代码

GenericFutureListener接口中定义了Listener的回调方法,当Future任务完成时,会回调此类中的方法

public interface Promise<V> extends Future<V> {​    // 标记该Future任务成功,并且会通知所有的Listener    // 如果该操作失败,会抛出异常(失败是指该Future任务已经有了成功的结果或者失败的结果)    Promise<V> setSuccess(V result);​    // 标记该Future任务成功,并且会通知所有的Listener    // 操作失败不抛出异常,返回false    boolean trySuccess(V result);​    // 标记该Future任务失败,并且会通知所有的Listener    // 如果该操作失败,会抛出异常(失败是指该Future任务已经有了成功的结果或者失败的结果)    Promise<V> setFailure(Throwable cause);​    // 标记该Future任务失败,并且会通知所有的Listener    // 操作失败不抛出异常,返回false    boolean tryFailure(Throwable cause);​    // 标记该Future任务不可取消    boolean setUncancellable();​    @Override    Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);​    @Override    Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);​    @Override    Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);​    @Override    Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);​    @Override    Promise<V> await() throws InterruptedException;​    @Override    Promise<V> awaitUninterruptibly();​    @Override    Promise<V> sync() throws InterruptedException;​    @Override    Promise<V> syncUninterruptibly();}复制代码

Promise同样也继承了Netty的Future接口,由于Netty的Future接口中没有写操作相关的接口,所以Netty通过Promise进行扩展,用于设置I/O操作的结果,接口中的setSuccess()、setFailure()方法会在任务完成后调用,然后回调Listener中的方法,经过这些操作后,await() 或 sync() 的线程就会从等待中返回。

public interface ChannelPromise extends ChannelFuture, Promise<Void> {​    @Override    Channel channel();​    @Override    ChannelPromise setSuccess(Void result);​    ChannelPromise setSuccess();​    boolean trySuccess();​    @Override    ChannelPromise setFailure(Throwable cause);​    @Override    ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);​    @Override    ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);​    @Override    ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);​    @Override    ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);​    @Override    ChannelPromise sync() throws InterruptedException;​    @Override    ChannelPromise syncUninterruptibly();​    @Override    ChannelPromise await() throws InterruptedException;​    @Override    ChannelPromise awaitUninterruptibly();​    ChannelPromise unvoid();}​复制代码

ChannelPromise接口同时继承了ChannelFuture, Promise,拥有双方的特性,接口中的方法同样跟之前的接口非常相似,只是返回值变成了ChannelPromise

看完以上的接口后,我们来看看Netty中对于这些接口的实现

观察这张类图,可以发现,DefaultPromise实现了Promise,DefaultChannelPromise实现了ChannelPromise并且继承了DefaultPromise,DefaultPromise由于没有实现ChannelFuture,所以没有ChannelFuture相关的特性,所以要看Netty中关于以上接口的实现,应该去看DefaultChannelPromise这个类

public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint {​    private final Channel channel;    private long checkpoint;​    /**     * Creates a new instance.     *     * @param channel     *        the {@link Channel} associated with this future     */    public DefaultChannelPromise(Channel channel, EventExecutor executor) {        super(executor);        this.channel = checkNotNull(channel, "channel");    }​​    @Override    public ChannelPromise setSuccess(Void result) {        super.setSuccess(result);        return this;    }​    @Override    public ChannelPromise setFailure(Throwable cause) {        super.setFailure(cause);        return this;    }​    @Override    public ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener) {        super.addListener(listener);        return this;    }}复制代码

观察DefaultChannelPromise的代码,可以发现,很多方法都调用了DefaultPromise父类中的方法,所以我们转移一下战场,去看DefaultPromise的代码

@Overridepublic Promise<V> setSuccess(V result) {    if (setSuccess0(result)) {      return this;    }    throw new IllegalStateException("complete already: " + this);}​@Overridepublic boolean trySuccess(V result) {    return setSuccess0(result);}​private boolean setSuccess0(V result) {    return setValue0(result == null ? SUCCESS : result);}​private boolean setValue0(Object objResult) {    if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||        RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {      if (checkNotifyWaiters()) {          // 唤醒Listeners          notifyListeners();      }      return true;    }    return false;}复制代码

可以看到setSuccess的步骤就是设置值,然后唤醒所有的Listeners,如果这个操作失败,会抛出异常,trySuccess也是同样的步骤,但是不会抛出异常

ChannelPipeline

ChannelPipeline本身是一个与Channel关联的容器对象,这个容器中存放了多个ChannelHandlerContext,ChannelHandlerContext中存放的是我们编写的ChannelHandler对象,多个ChannelHandlerContext使用链表串联,I/O事件按照顺序经过ChannelPipeline中的一个个ChannelHandler

如上图

Netty中的事件分为Inbound事件和Outbound事件,Inbound事件通常由I/O线程触发,例如TCP链路建立事件、读事件,Outbound事件通常是用户主动发起的网络I/O事件,例如连接事件、读事件

p.addLast("1", new InboundHandlerA());p.addLast("2", new InboundHandlerB());p.addLast("3", new OutboundHandlerA());p.addLast("4", new OutboundHandlerB());p.addLast("5", new InboundOutboundHandlerX());复制代码

以上是一段添加ChannelHandler对象的代码,以Inbound开头的类意味着它是一个入站Handler,以Outbound开头的类表示它是一个出站Handler,我们猜测一下这些ChannelHandler的执行顺序。

3、4没有实现ChannelnboundHandler,1、2没有实现ChannelOutboundHandler,5既实现了ChannelnboundHandler又实现了ChannelOutboundHandler,按照先执行Inbound事件,再执行Outbound事件的规则的话,执行顺序应该是1->2->5->3->4->5。

实际上不是的,Inbound事件的执行顺序是从前往后,Outbound事件的执行顺序是从后往前,所以执行顺序是1->2->5->5->4->3

ChannelPipeline的创建时机

前面讲过ChannelPipeline和Channel是一一搭配的,所以Channel创建的时候ChannelPipeline也会随之创建

protected AbstractChannel(Channel parent) {    this.parent = parent;    id = newId();    unsafe = newUnsafe();    // 此处会调用下面的方法    pipeline = newChannelPipeline();}​protected DefaultChannelPipeline newChannelPipeline() {    // 创建DefaultChannelPipeline    return new DefaultChannelPipeline(this);}复制代码
// 此方法就是创建两个链表节点,并且让头节点和尾节点双向连接protected DefaultChannelPipeline(Channel channel) {    this.channel = ObjectUtil.checkNotNull(channel, "channel");    succeededFuture = new SucceededChannelFuture(channel, null);    voidPromise =  new VoidChannelPromise(channel, true);    // 观察TailContext,可以发现TailContext继承了AbstractChannelHandlerContext    // 说明ChannelPipeline中的节点是ChannelHandlerContext,不是ChannelHandler    tail = new TailContext(this);    head = new HeadContext(this);​    head.next = tail;    tail.prev = head;}复制代码
NioEventLoopGroup

可以看到NioEventLoopGroup最顶层继承的接口是Executor,说明NioEventLoopGroup就是一个线程池,NioEventLoop是其创建出来的一个线程

public NioEventLoopGroup() {    this(0);}​public NioEventLoopGroup(int nThreads) {    // 注意,这里executor赋值为null    this(nThreads, (Executor) null);}​public NioEventLoopGroup(int nThreads, Executor executor) {    this(nThreads, executor, SelectorProvider.provider());}​public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {    this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);}​protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);}​// 看到这里,我们可以知道Netty默认的线程数是2 * CPU 个private static final int DEFAULT_EVENT_LOOP_THREADS;​    static {        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));}​​​protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);}复制代码

构造器的代码一路追,终于找到干正事的方法

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,                                            EventExecutorChooserFactory chooserFactory, Object... args) {    checkPositive(nThreads, "nThreads");    // 因为构造器中赋值为null,所以此处executor为ThreadPerTaskExecutor()    if (executor == null) {      executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());    }​    children = new EventExecutor[nThreads];​    for (int i = 0; i < nThreads; i ++) {      boolean success = false;      try {        // 这里很关键,下面细说        children[i] = newChild(executor, args);        success = true;      } catch (Exception e) {        // TODO: Think about if this is a good exception type        throw new IllegalStateException("failed to create a child event loop", e);      } finally {        // 新建线程失败,将线程优雅关闭        if (!success) {          for (int j = 0; j < i; j ++) {            children[j].shutdownGracefully();          }​          for (int j = 0; j < i; j ++) {            EventExecutor e = children[j];            try {              while (!e.isTerminated()) {                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);              }            } catch (InterruptedException interrupted) {              // Let the caller handle the interruption.              Thread.currentThread().interrupt();              break;            }          }        }      }    }    // 选择合适的轮训机制    chooser = chooserFactory.newChooser(children);​    // 新建一个监听器,用于监听是否所有线程都terminated了    final FutureListener<Object> terminationListener = new FutureListener<Object>() {      @Override      public void operationComplete(Future<Object> future) throws Exception {        if (terminatedChildren.incrementAndGet() == children.length) {          terminationFuture.setSuccess(null);        }      }    };    // 给所有的EventExecutor都设置上这个监听器    for (EventExecutor e: children) {      e.terminationFuture().addListener(terminationListener);    }​    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);    Collections.addAll(childrenSet, children);    readonlyChildren = Collections.unmodifiableSet(childrenSet);}复制代码
// executors的数量是2的幂次方和非2的幂次方使用不同的轮训方式@Overridepublic EventExecutorChooser newChooser(EventExecutor[] executors) {    // 判断executors是否是2的幂次方    if (isPowerOfTwo(executors.length)) {      return new PowerOfTwoEventExecutorChooser(executors);    } else {      return new GenericEventExecutorChooser(executors);    }}复制代码

上面说到,NioEventLoopGroup是一个线程池,NioEventLoop是其创建出来的一个个线程,上面的newChild()方法便是创建一个个NioEventLoop,我们来看看NioEventLoop的构造方法

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,                 EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {    super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),          rejectedExecutionHandler);    this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");    this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");    final SelectorTuple selectorTuple = openSelector();    // 这里可以看到selector跟NioEventLoop进行了绑定    this.selector = selectorTuple.selector;    this.unwrappedSelector = selectorTuple.unwrappedSelector;}​​protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,                                        boolean addTaskWakesUp, Queue<Runnable> taskQueue,                                        RejectedExecutionHandler rejectedHandler) {    super(parent);    this.addTaskWakesUp = addTaskWakesUp;    this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;    this.executor = ThreadExecutorMap.apply(executor, this);    // 这里是一个关键点,对一个任务队列进行了赋值,这里的任务队列有什么用呢?    // NioEventLoop一部分时间会执行I/O任务,一部分时间执行非I/O任务,在执行I/O任务时,如果有任务过来,会先把任务放    // 到任务队列中    this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");    this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");}复制代码

到这里为止,NioEventLoop便跟selector关联起来了

作者:Xiao镔

链接:

来源:掘金

著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

标签: #netty源码剖析