前言:
此刻你们对“netty源码剖析”都比较关切,大家都想要了解一些“netty源码剖析”的相关内容。那么小编同时在网摘上汇集了一些关于“netty源码剖析””的相关文章,希望咱们能喜欢,同学们一起来学习一下吧!前言
Netty框架的原理是Reactor模型中基于多个反应器的多线程模式,本篇文章主要介绍Netty较为重要的几个概念,编写思路借鉴了参考资料中的文章
ChannelFuture
我们先来了解了解Netty中几个较为重要的接口
public interface Future<V> extends java.util.concurrent.Future<V> { // I/O操作是否成功,成功返回true boolean isSuccess(); // 是否可取消 boolean isCancellable(); // 抛出I/O操作失败原因 Throwable cause(); // 此处使用了观察者模式,该Future任务完成,这个Listener会立刻收到通知 Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners); // 移除监听器 Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners); // 等待Future任务完成,如果任务失败会抛出异常 Future<V> sync() throws InterruptedException; Future<V> syncUninterruptibly(); // 等待Future任务完成,如果任务失败不会抛出异常 Future<V> await() throws InterruptedException; Future<V> awaitUninterruptibly(); boolean await(long timeout, TimeUnit unit) throws InterruptedException; boolean await(long timeoutMillis) throws InterruptedException; boolean awaitUninterruptibly(long timeout, TimeUnit unit); boolean awaitUninterruptibly(long timeoutMillis); // 立即获取Future任务结果,如果Future任务未完成会返回null,所以在使用这个方法之前最好先判断这个Future任务是否完成 V getNow();}复制代码
Netty的Future接口继承了jdk1.5的Future接口,在本身已经有Future接口的情况下为什么要重复造轮子?
这是因为jdk1.5的Future接口不满足Netty的需求,jdk的Future接口可以获取异步计算的结果,并且提供了多种方法,可查看任务是否取消是否完成,但是使用者无法知道方法什么时候完成,比如某用户提交了一个Future任务,什么时候才能去调用get()方法获取结果,总不能循环调用isDone()方法吧,这样太消耗cpu资源。Netty的Future接口一定程度上弥补了这个缺陷,通过新增监听器,可以得知该任务是否完成以及任务完成后该做的事情,isSuccess可以得知任务是否成功,netty的Future接口更加智能
public interface ChannelFuture extends Future<Void> { // 返回ChannelFuture关联的Channel Channel channel(); @Override ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener); @Override ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners); @Override ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener); @Override ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners); @Override ChannelFuture sync() throws InterruptedException; @Override ChannelFuture syncUninterruptibly(); @Override ChannelFuture await() throws InterruptedException; @Override ChannelFuture awaitUninterruptibly();}复制代码
ChannelFuture继承了Netty的Future接口,由于Netty中所有的I/O操作都是异步了,所以当方法返回时,不代表I/O操作已经完成,所以ChannelFuture封装了异步I/O操作的结果,接口定义的方法与Netty的Future接口相似,并没有什么新鲜的,值得一提的是ChannelFuture关联了Channel
public interface GenericFutureListener<F extends Future<?>> extends EventListener { // 此operation关联的Future任务完成时,这个方法会被调用 void operationComplete(F future) throws Exception;}复制代码
GenericFutureListener接口中定义了Listener的回调方法,当Future任务完成时,会回调此类中的方法
public interface Promise<V> extends Future<V> { // 标记该Future任务成功,并且会通知所有的Listener // 如果该操作失败,会抛出异常(失败是指该Future任务已经有了成功的结果或者失败的结果) Promise<V> setSuccess(V result); // 标记该Future任务成功,并且会通知所有的Listener // 操作失败不抛出异常,返回false boolean trySuccess(V result); // 标记该Future任务失败,并且会通知所有的Listener // 如果该操作失败,会抛出异常(失败是指该Future任务已经有了成功的结果或者失败的结果) Promise<V> setFailure(Throwable cause); // 标记该Future任务失败,并且会通知所有的Listener // 操作失败不抛出异常,返回false boolean tryFailure(Throwable cause); // 标记该Future任务不可取消 boolean setUncancellable(); @Override Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); @Override Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners); @Override Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); @Override Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners); @Override Promise<V> await() throws InterruptedException; @Override Promise<V> awaitUninterruptibly(); @Override Promise<V> sync() throws InterruptedException; @Override Promise<V> syncUninterruptibly();}复制代码
Promise同样也继承了Netty的Future接口,由于Netty的Future接口中没有写操作相关的接口,所以Netty通过Promise进行扩展,用于设置I/O操作的结果,接口中的setSuccess()、setFailure()方法会在任务完成后调用,然后回调Listener中的方法,经过这些操作后,await() 或 sync() 的线程就会从等待中返回。
public interface ChannelPromise extends ChannelFuture, Promise<Void> { @Override Channel channel(); @Override ChannelPromise setSuccess(Void result); ChannelPromise setSuccess(); boolean trySuccess(); @Override ChannelPromise setFailure(Throwable cause); @Override ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener); @Override ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners); @Override ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener); @Override ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners); @Override ChannelPromise sync() throws InterruptedException; @Override ChannelPromise syncUninterruptibly(); @Override ChannelPromise await() throws InterruptedException; @Override ChannelPromise awaitUninterruptibly(); ChannelPromise unvoid();}复制代码
ChannelPromise接口同时继承了ChannelFuture, Promise,拥有双方的特性,接口中的方法同样跟之前的接口非常相似,只是返回值变成了ChannelPromise
看完以上的接口后,我们来看看Netty中对于这些接口的实现
观察这张类图,可以发现,DefaultPromise实现了Promise,DefaultChannelPromise实现了ChannelPromise并且继承了DefaultPromise,DefaultPromise由于没有实现ChannelFuture,所以没有ChannelFuture相关的特性,所以要看Netty中关于以上接口的实现,应该去看DefaultChannelPromise这个类
public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint { private final Channel channel; private long checkpoint; /** * Creates a new instance. * * @param channel * the {@link Channel} associated with this future */ public DefaultChannelPromise(Channel channel, EventExecutor executor) { super(executor); this.channel = checkNotNull(channel, "channel"); } @Override public ChannelPromise setSuccess(Void result) { super.setSuccess(result); return this; } @Override public ChannelPromise setFailure(Throwable cause) { super.setFailure(cause); return this; } @Override public ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener) { super.addListener(listener); return this; }}复制代码
观察DefaultChannelPromise的代码,可以发现,很多方法都调用了DefaultPromise父类中的方法,所以我们转移一下战场,去看DefaultPromise的代码
@Overridepublic Promise<V> setSuccess(V result) { if (setSuccess0(result)) { return this; } throw new IllegalStateException("complete already: " + this);}@Overridepublic boolean trySuccess(V result) { return setSuccess0(result);}private boolean setSuccess0(V result) { return setValue0(result == null ? SUCCESS : result);}private boolean setValue0(Object objResult) { if (RESULT_UPDATER.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { if (checkNotifyWaiters()) { // 唤醒Listeners notifyListeners(); } return true; } return false;}复制代码
可以看到setSuccess的步骤就是设置值,然后唤醒所有的Listeners,如果这个操作失败,会抛出异常,trySuccess也是同样的步骤,但是不会抛出异常
ChannelPipeline
ChannelPipeline本身是一个与Channel关联的容器对象,这个容器中存放了多个ChannelHandlerContext,ChannelHandlerContext中存放的是我们编写的ChannelHandler对象,多个ChannelHandlerContext使用链表串联,I/O事件按照顺序经过ChannelPipeline中的一个个ChannelHandler
如上图
Netty中的事件分为Inbound事件和Outbound事件,Inbound事件通常由I/O线程触发,例如TCP链路建立事件、读事件,Outbound事件通常是用户主动发起的网络I/O事件,例如连接事件、读事件
p.addLast("1", new InboundHandlerA());p.addLast("2", new InboundHandlerB());p.addLast("3", new OutboundHandlerA());p.addLast("4", new OutboundHandlerB());p.addLast("5", new InboundOutboundHandlerX());复制代码
以上是一段添加ChannelHandler对象的代码,以Inbound开头的类意味着它是一个入站Handler,以Outbound开头的类表示它是一个出站Handler,我们猜测一下这些ChannelHandler的执行顺序。
3、4没有实现ChannelnboundHandler,1、2没有实现ChannelOutboundHandler,5既实现了ChannelnboundHandler又实现了ChannelOutboundHandler,按照先执行Inbound事件,再执行Outbound事件的规则的话,执行顺序应该是1->2->5->3->4->5。
实际上不是的,Inbound事件的执行顺序是从前往后,Outbound事件的执行顺序是从后往前,所以执行顺序是1->2->5->5->4->3
ChannelPipeline的创建时机
前面讲过ChannelPipeline和Channel是一一搭配的,所以Channel创建的时候ChannelPipeline也会随之创建
protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); // 此处会调用下面的方法 pipeline = newChannelPipeline();}protected DefaultChannelPipeline newChannelPipeline() { // 创建DefaultChannelPipeline return new DefaultChannelPipeline(this);}复制代码
// 此方法就是创建两个链表节点,并且让头节点和尾节点双向连接protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); // 观察TailContext,可以发现TailContext继承了AbstractChannelHandlerContext // 说明ChannelPipeline中的节点是ChannelHandlerContext,不是ChannelHandler tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head;}复制代码NioEventLoopGroup
可以看到NioEventLoopGroup最顶层继承的接口是Executor,说明NioEventLoopGroup就是一个线程池,NioEventLoop是其创建出来的一个线程
public NioEventLoopGroup() { this(0);}public NioEventLoopGroup(int nThreads) { // 注意,这里executor赋值为null this(nThreads, (Executor) null);}public NioEventLoopGroup(int nThreads, Executor executor) { this(nThreads, executor, SelectorProvider.provider());}public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) { this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);}protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);}// 看到这里,我们可以知道Netty默认的线程数是2 * CPU 个private static final int DEFAULT_EVENT_LOOP_THREADS; static { DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));}protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) { this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);}复制代码
构造器的代码一路追,终于找到干正事的方法
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { checkPositive(nThreads, "nThreads"); // 因为构造器中赋值为null,所以此处executor为ThreadPerTaskExecutor() if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { boolean success = false; try { // 这里很关键,下面细说 children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { // 新建线程失败,将线程优雅关闭 if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } // 选择合适的轮训机制 chooser = chooserFactory.newChooser(children); // 新建一个监听器,用于监听是否所有线程都terminated了 final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; // 给所有的EventExecutor都设置上这个监听器 for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet);}复制代码
// executors的数量是2的幂次方和非2的幂次方使用不同的轮训方式@Overridepublic EventExecutorChooser newChooser(EventExecutor[] executors) { // 判断executors是否是2的幂次方 if (isPowerOfTwo(executors.length)) { return new PowerOfTwoEventExecutorChooser(executors); } else { return new GenericEventExecutorChooser(executors); }}复制代码
上面说到,NioEventLoopGroup是一个线程池,NioEventLoop是其创建出来的一个个线程,上面的newChild()方法便是创建一个个NioEventLoop,我们来看看NioEventLoop的构造方法
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) { super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory), rejectedExecutionHandler); this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider"); this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy"); final SelectorTuple selectorTuple = openSelector(); // 这里可以看到selector跟NioEventLoop进行了绑定 this.selector = selectorTuple.selector; this.unwrappedSelector = selectorTuple.unwrappedSelector;}protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, Queue<Runnable> taskQueue, RejectedExecutionHandler rejectedHandler) { super(parent); this.addTaskWakesUp = addTaskWakesUp; this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS; this.executor = ThreadExecutorMap.apply(executor, this); // 这里是一个关键点,对一个任务队列进行了赋值,这里的任务队列有什么用呢? // NioEventLoop一部分时间会执行I/O任务,一部分时间执行非I/O任务,在执行I/O任务时,如果有任务过来,会先把任务放 // 到任务队列中 this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue"); this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");}复制代码
到这里为止,NioEventLoop便跟selector关联起来了
作者:Xiao镔
链接:
来源:掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
标签: #netty源码剖析