Java并发编程:Java线程池核心ThreadPoolExecutor的使用和原理分析 (3)

3、如果任务成功加入workQueue,判断线程池是否是运行状态,不是的话先把任务移出工作队列,并调用reject方法,使用拒绝策略拒绝该任务。线程如果是非运行中,调用addWorker创建一个新线程。

4、如果放入workQueue失败 (队列已满),则调用addWorker创建线程执行任务,如果这时创建线程失败 (addWorker传进去的第二个参数值是false,说明这种情况是当前线程数不小于maximumPoolSize),就会调用reject(内部调用handler)拒绝接受任务。

整个执行流程用一张图片表示大致如下:

Java并发编程:Java线程池核心ThreadPoolExecutor的使用和原理分析

以上就是execute方法的大概逻辑,接下来看看addWorker的方法实现。

addWorker方法

源码如下:

private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); /**线程池状态不为SHUTDOWN时 * 判断队列或者任务是否为空,是的话返回false */. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); /* 这里可以看出core参数决定着活动线程数的大小比较对象 * core为true表示与 corePoolSize大小进行比较 * core为false表示与 maximumPoolSize大小进行比较 * 当前活动线程数大于比较对象就返回false */ if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 尝试增加workerCount,如果成功,则跳出第一个for循环 if (compareAndIncrementWorkerCount(c)) break retry; // 如果增加workerCount失败,则重新获取ctl的值 c = ctl.get(); // Re-read ctl // 如果当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //创建一个worker对象w w = new Worker(firstTask); //实例化w的线程t final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // workers是一个HashSet,保存着任务的worker对象 workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //启动线程 t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }

从代码中可以看出,addWorker方法的主要工作是在线程池中创建一个新的线程并执行,其中firstTask参数指定的是新线程需要执行的第一个任务,core参数决定于活动线程数的比较对象是corePoolSize还是maximumPoolSize。根据传进来的参数首先对线程池和队列的状态进行判断,满足条件就新建一个Worker对象,并实例化该对象的线程,最后启动线程。

Worker类

根据addWorker源码中的逻辑,我们可以发现,线程池中的每一个线程其实都是对应的Worker对象在维护的,所以我们有必要对Worker类一探究竟,先看一下类的源码:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpxdgz.html