NioEventLoopGroup 源码分析 1. 在阅读源码时做了一定的注释,并且做了一些测试分析源码内的执行流程,由于博客篇幅有限。为了方便 IDE 查看、跟踪、调试 代码,所以在 github 上提供 netty 的源码、详细的注释及测试用例。欢迎大家 star、fork ! 2. 由于个人水平有限,对源码的分析理解可能存在偏差或不透彻的地方还请大家在评论区指出,谢谢!
从今天开始,就准备进军 ne tty 了,主要的想法是看看 netty4 中一些比较重要的实现,也就是能经常出现在我们面前的东西。主要是: 线程池、通道、管道、编解码器、以及常用的工具类。
然后现在看源码应该不会像之前的 jdk 那么细致了,主要是看了一个类以后就发现 netty 对代码封装太强了,基本一个功能可能封装了七八个类去实现,很多的抽象类但是这些抽象类中的功能还非常的多。所以说主要看这个流程,以及里面写的比较好的代码或者比较新的思想会仔细的去看看。具体的子字段,每个方法不可能做到那么细致。
好,正式开始 netty 源码征战 !
这里首先讲一下结论,也就是先说我看这个类的源码整理出来的思路,主要就是因为这些类太杂,一个功能在好几个类中才完全实现。
我们在 new 一个 worker/boss 线程的时候一般是采用的直接使用的无参的构造方法,但是无参的构造方法他创建的线程池的大小是我们 CPU 核心的 2 倍。紧接着就需要 new 这么多个线程放到线程池里面,这里的线程池采用的数据结构是一个数组存放的,每一个线程需要设置一个任务队列,显然任务队列使用的是一个阻塞队列,这里实际采用的是 LinkedBlockQueue ,然后回想一下在 jdk 中的线程池是不是还有一个比较重要的参数就是线程工厂,对的!这里也有这个东西,他是需要我们手动传入的,但是如果不传则会使用一个默认的线程工厂,里面有一个 newThread 方法,这个方法实现基本和 jdk 中的实现一模一样,就是创建一个级别为 5 的非 Daemon 线程。对这就是我们在创建一个线程池时候完成的全部工作!
好现在来具体说一下,我们每次创建的是 NioEventLoopGroup 但是他又继承了 n 个类才实现了线程池,也就是线程池的祖先是 ScheduledExecutorService 是 jdk 中的线程池的一个接口,其中里面最重要的数据结构就是一个 children 数组,用来装线程的。
然后具体的线程他也是进行了封装的,也就是我们常看到的 NioEventLoop 。这个类里面有两个比较重要的结构:taskQueue 和 thread 。很明显这个非常类似 jdk 中的线程池。
2. NioEventLoopGroup 线程池分析首先要创建线程池,传入的线程数为 0,他是一直在调用 this() 最后追溯到 super(nThreads,threadFactory,selectorProvider) 也就是使用了 MultithreadEventLoopGroup 的构造方法,在这一步确定了当传入的线程数为 0 时应该设置的线程数为 CPU 核心的两倍。然后再次上调,调用了 MultithreadEventExecutorGroup 的构造方法,在这里才是真正的开始了线程池的初始化。
首先设置了线程池工厂,然后初始化 chooser ,接着创建 n 个线程放到 children 数组中,最后设置线程中断的监听事件。
/** * 这个方法流程: * 1、设置了默认的线程工厂 * 2、初始化 chooser * 3、创建nTreads个NioEventLoop对象保存在children数组中 * 4、添加中断的监听事件 * @param nThreads * @param threadFactory * @param args */ protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) { if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } // 默认使用线程工厂是 DefaultThreadFactory if (threadFactory == null) { threadFactory = newDefaultThreadFactory(); } children = new SingleThreadEventExecutor[nThreads]; // 二的平方的实现是看 n&-n==n //根据线程个数是否为2的幂次方,采用不同策略初始化chooser if (isPowerOfTwo(children.length)) { chooser = new PowerOfTwoEventExecutorChooser(); } else { chooser = new GenericEventExecutorChooser(); } //产生nTreads个NioEventLoop对象保存在children数组中 for (int i = 0; i < nThreads; i ++) { boolean success = false; try { children[i] = newChild(threadFactory, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { // 没成功,把已有的线程优雅关闭 if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } // 没有完全关闭的线程让它一直等待 for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { Thread.currentThread().interrupt(); break; } } } } } // 对每一个 children 添加中断线程时候的监听事件,就是将 terminatedChildren 自增 // 判断是否到达线程总数,是则更新 terminationFuture final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } }