深入 ThreadPoolExecutor 源码 类结构
这里主要要说的是 ThreadPoolExecutor
类
线程池状态 打开源码映入眼帘的就是这几个字段和方法,对应的就是线程池的一些运行状态和相关方法
private final AtomicInteger ctl = new AtomicInteger (ctlOf(RUNNING, 0 ));private static final int COUNT_BITS = Integer.SIZE - 3 ;private static final int CAPACITY = (1 << COUNT_BITS) - 1 ;private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;private static int runStateOf (int c) { return c & ~CAPACITY; }private static int workerCountOf (int c) { return c & CAPACITY; }private static int ctlOf (int rs, int wc) { return rs | wc; }
状态转换过程
💡 RUNNING :能接受新提交的任务,并且也能处理阻塞队列中的任务;
💡 SHUTDOWN :关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。在线程池处于 RUNNING
状态时,调用 shutdown()
方法会使线程池进入到该状态。(finalize()
方法在执行过程中也会调用 shutdown() 方法进入该状态)
💡 STOP :不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING
或 SHUTDOWN
状态时,调用 shutdownNow()
方法会使线程池进入到该状态
💡 TIDYING :如果所有的任务都已终止了,workerCount (有效线程数) 为 0,线程池进入该状态后会调用 terminated() 方法进入 TERMINATED 状态
💡 TERMINATED :在 terminated() 方法执行完后进入该状态,默认 terminated() 方法中什么也没有做。
进入TERMINATED
的条件如下:
线程池不是 RUNNING 状态;
线程池状态不是 TIDYING 状态或 TERMINATED 状态;
如果线程池状态是 SHUTDOWN 并且 workerQueue 为空;
workerCount 为 0;
设置 TIDYING 状态成功。
成员变量 再往下,就会看见一些很重要的成员变量
private final BlockingQueue<Runnable> workQueue; private final ReentrantLock mainLock = new ReentrantLock ();private final HashSet<Worker> workers = new HashSet <Worker>();private final Condition termination = mainLock.newCondition(); private int largestPoolSize;private long completedTaskCount; private volatile ThreadFactory threadFactory; private volatile RejectedExecutionHandler handler; private volatile long keepAliveTime; private volatile boolean allowCoreThreadTimeOut; private volatile int corePoolSize; private volatile int maximumPoolSize; private static final RejectedExecutionHandler defaultHandler = new AbortPolicy ();
execute() 源码分析
public void execute (Runnable command) { if (command == null ) throw new NullPointerException (); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command); }
这里要理解 addWorker 的第二个参数,true 则代表当前线程池的上界 仍然是 corePoolSize,后面的 addWorker 会根据上界来判断是否增加线程,false 则代表上界 是 maximumPoolSize,这一点在后面的分析中会看到。
Executor 大致执行流程
addWorker() addWorker()
的作用就是创建线程 (Worker) 并且添加到 Workers 集合中,然后启动线程
源码分析
private boolean addWorker (Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null &&!workQueue.isEmpty())) return false ; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false ; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false ; boolean workerAdded = false ; Worker w = null ; try { w = new Worker (firstTask); final Thread t = w.thread; if (t != null ) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null )) { if (t.isAlive()) throw new IllegalThreadStateException (); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true ; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true ; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
🔸 首先判断当前线程池的状态是否适合继续 addWorker,分析这里的 if 条件,RUNNING 不会 false ,STOP,TIDYING,TERMINATED 直接 false,SHUTDOWN 状态如果 firstTask 为空 阻塞队列中还有任务的时候不会 false,其他情况都 false。
🔸 获取当前的 workerCount 判断是否超过了当前的上界,这里就用到了第二个参数
🔸 然后利用 CAS 自旋增加 workerCount
🔸 创建 Worker 对象,获取 mainLock 并加锁,因为 workers 是 HashSet 并不是线程安全的
🔸 再次获取线程池转台并判断是否合法,合法就添加到 workers 中,然后在 finally 块中释放锁
🔸 根据前面的 workerAdded 判断是否启动线程
🔸 在最终的 finally 块中根据是否启动成功来决定是否回滚
addWorkerFailed() 启动失败,回滚之前的添加操作
private void addWorkerFailed (Worker w) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { if (w != null ) workers.remove(w); decrementWorkerCount(); tryTerminate(); } finally { mainLock.unlock(); } }
Worker 类 封装了线程对象,线程池维护的就是这些 worker
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L ; final Thread thread; Runnable firstTask; volatile long completedTasks; Worker(Runnable firstTask) { setState(-1 ); this .firstTask = firstTask; this .thread = getThreadFactory().newThread(this ); } public void run () { runWorker(this ); } protected boolean isHeldExclusively () { return getState() != 0 ; } protected boolean tryAcquire (int unused) { if (compareAndSetState(0 , 1 )) { setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; } protected boolean tryRelease (int unused) { setExclusiveOwnerThread(null ); setState(0 ); return true ; } public void lock () { acquire(1 ); } public boolean tryLock () { return tryAcquire(1 ); } public void unlock () { release(1 ); } public boolean isLocked () { return isHeldExclusively(); } void interruptIfStarted () { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
这里可以看到 Worker 继承了 AQS 并且实现了 Runnable 接口,然后借助 AQS 实现了一个独占的不可重入的锁
,其实这也是很巧妙的一点(这里我和博客上的理解有点出入)。
到这里大家肯定会有疑问,为什么这里要实现 AQS 然后实现一个锁?既然要又为什么要实现一个不可重入的,而不直接使用ReentrantLock
那不是更加方便么??除此之外还有一个小细节就是构造器里面为什么setState(-1)
这样不就获取不到锁了么??
其实这是为了后面shutdown
的时候interruptIdleWorkers
能判断出线程是否在工作,从而打断那些空闲的线程。如果使用可重入锁的话就无法通过tryLock()
来判断线程是否在工作。而setState(-1)
则是为了防止在任务没有开始前被打断
runWorker() 在上面AddWorker()
最后添加成功后会启动 Worker 线程,而在 worker 线程中 run 方法又会调用一个runWorker()
方法,这里就是具体执行任务的地方
final void runWorker (Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null ; w.unlock(); boolean completedAbruptly = true ; try { while (task != null || (task = getTask()) != null ) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null ; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error (x); } finally { afterExecute(task, thrown); } } finally { task = null ; w.completedTasks++; w.unlock(); } } completedAbruptly = false ; } finally { processWorkerExit(w, completedAbruptly); } }
🔸 首先获取了参数传递进来的 worker 携带的任务
🔸 然后执行了 w.unlock(),实际上这里就对应了上面 Worker 类构造器中的 setState(-1),正因为前面设置了 state 为 -1 所以在 unlock() 之前,获得锁的 CAS 操作肯定都会失败,当然这也是为了在任务启动前不会被打断,所以在这里 unlock() 就又将 state 设置为了 0 也就表示可以通过 CAS 获得锁了,也可以被 shutdown 打断了。
🔸 然后这个线程就会执行 worker 中的任务,如果 worker 中任务为空就会从阻塞队列中获取任务
🔸 获取到任务后进入循环先进行 lock() 操作,这就代表已经开始执行任务了,这个时候 shutdown 就无法发送中断信号中断这个线程执行 (注意这个 lock 并不会和其他的 worker 互斥,因为每个 Worker 都是新 new 出来的,完全不相关的,他们的 state 状态都是独立的)
🔸 runStateAtLeast(ctl.get(), STOP)
返回ctl.get() >= STOP
,判断线程池是否正在关闭如果是就打断该线程,如果不是需要确保线程不是 Interrupt 状态
If pool is stopping, ensure thread is interrupted; if not, ensure thread is not interrupted.
🔸beforeExecute()
和afterExecute()
都是空方法交给子类去实现的。
🔸 到 finally 块里面就代表这个工作线程已经快要结束了,processWorkerExit()
就是处理一些”后事”的
getTask() 这个方法就是用来从阻塞队列中获取任务的,如果 getTask()
返回 null,在runWorker()
里面接收到null
后就会跳出循环 进而执行finally
块里面的processWorkerExit()
。而这也就意味着这个线程执行结束
了,不会在执行任务了,剩下的就是等待 JVM 回收这个线程。
private Runnable getTask () { boolean timedOut = false ; for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null ; } int wc = workerCountOf(c); boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null ; continue ; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null ) return r; timedOut = true ; } catch (InterruptedException retry) { timedOut = false ; } } }
🔸 获取线程池状态,如果线程池状态为 SHUTDOWN 并且任务队列没任务 或者 线程状态>=STOP 就通过 CAS 自旋减少线程数,然后返回 null
🔸 判断是否允许当前线程获取任务超时,如果允许核心线程超时就代表所有线程都会超时限制,又或者是当前线程数超过了核心线程数,也就是经过了扩容,所以核心线程之外的线程都是有超时限制的。
🔸 如果 ① wc 超过最大线程数 ②没超过最大线程数,但是超时了并且此时 wc>1(留一个处理任务)③没超过最大线程数,但是超时了并且阻塞队列为空,此时需要回收多余的线程
🔸 根据 timed 选取从阻塞队列中获取任务的方式,要么是限时获取的 poll 或者一直阻塞的 take,获取到了之后返回
processWorkerExit() 看名字就知道是一个收尾的方法,执行线程结束后的一些必要收尾工作。
private void processWorkerExit (Worker w, boolean completedAbruptly) { if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1 ; if (workerCountOf(c) >= min) return ; } addWorker(null , false ); } }
runStateLessThan
c < STOP 返回 true 就说明是SHUTDOWN
或者RUNNING
,到这里该工作线程(Worker)的生命周期就结束了。
工作线程的生命周期
tryTerminate() 在上面的处理收尾工作的processWorkerExit()
的时候中间调用了一个 tryTerminate()
方法
final void tryTerminate () { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return ; if (workerCountOf(c) != 0 ) { interruptIdleWorkers(ONLY_ONE); return ; } final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0 ))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0 )); termination.signalAll(); } return ; } } finally { mainLock.unlock(); } } }
这里最后的termination.signalAll()
实际上是唤醒的awaitTermination()
方法阻塞的线程
public boolean awaitTermination (long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { for (;;) { if (runStateAtLeast(ctl.get(), TERMINATED)) return true ; if (nanos <= 0 ) return false ; nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); } }
shutdown() public void shutdown () { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); } finally { mainLock.unlock(); } tryTerminate(); }
这里关键的就是这个打断空闲线程的操作。
interruptIdleWorkers() 打断空闲的 worker,这里的空闲线程其实指的就是在阻塞队列上获取不到任务而阻塞的线程
经过上面的分析我们知道 Worker 会不断的从阻塞队列中去拿任务也就是 getTask()
方法,如果阻塞队列为空就会阻塞住 直到有任务提交到阻塞队列中,或者执行线程被中断。
这里我们是要 SHUTDOWN 那阻塞队列中肯定是不会再有任务提交,所以take()
会阻塞住,所以我们就只能通过打断执行线程的方式来打断take()
操作,否则会一直阻塞,线程池无法关闭。
private void interruptIdleWorkers (boolean onlyOne) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break ; } } finally { mainLock.unlock(); } }
🔸首先获取了 mainLock 保证了 workers 的线程安全避免产生并发修改异常。
🔸然后遍历 workers,打断那些没有被打断并且没有工作的线程,那这里怎么知道线程是不是在工作呢?
别忘了前面提到的 Worker 类借助 AQS 实现了一个不可重入
的lock
方法,而在 worker 执行任务的时候会执行 lock
加锁,所以在这里tryLock()
如果返回true
则说明 并没有在工作可以打断,反之如果正在工作tryLock()
不可重入,无法获取到自己持有的锁返回 false,所以线程肯定是在工作状态所以不应该打断,这些线程会在执行完任务后自行了断,因为线程池状态已经设置为SHUTDOWN
当然前提是这些任务是可终止的
🔸打断后释放tryLock()
获取到的锁
🔸如果只打断一个就直接 break,否则就继续下一轮循环
🔸释放 mianLock
瞎猜:这里可以通过 getState 判断线程状态但是有可能在执行任务的过中阻塞
shutdownNow() 和上面的 shutdown() 一样都是用来关闭线程池的但是看方法名就知道这个比较粗暴,shutdownNow 立刻马上关闭,一点情面都不给😂,虽然说是打断所有的线程但是毕竟使用的是Interrupt
,也许别人正在执行的线程根本就不会理你😂,所以在提交任务的时候要对任务进行正确的 interrupt 响应,或者确保线程不会一直阻塞否则线程池就无法正常关闭
public List<Runnable> shutdownNow () { List<Runnable> tasks; final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
interruptWorkers() private void interruptWorkers () { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } }
打断了所有的启动的线程,即使他们可能不会响应这个 Interrupted 信号,但是由于线程池状态已经变为了 STOP,所以他们也活不长了(当然前提是执行的任务是可以结束的),在下一次获取任务的时候就会直接 return 。
细节是魔鬼 在群里面看见的问题,为什么线程池要用 线程不安全的 HashSet
然后设置了一个 mainLock
控制并发,而不是直接使用线程安全的并发集合?
这一点其实在源码的注释中已经说的很清楚了,之前一直没有注意到
private final ReentrantLock mainLock = new ReentrantLock ();
一开始还挺抗拒的,感觉有的地方有的看不懂(英语渣渣留下眼泪)然后去 stackoverflower 上看了一下找到了一个答案 Using ReentrantLock in ThreadPoolExecutor to ensure thread-safe workers
大概总结一下就是两点
避免 “中断风暴” ,如果是用的显式锁那么如果有 10 个线程同时去执行 shutdown 方法,那么 10 个线程会排队去执行interruptIdleWorkers
,如果使用并发安全的队列的话,在 10 个线程同时去执行 shutdown 方法,那么肯定会同时去 interruptIdleWorkers
也就是所谓的中断风暴
中断风暴(维基百科) 中文几乎搜不到相关资料,其实就是字面意思,而太多的中断请求会的影响系统的整体性能,不得不说大师就是大师,细节之处则更能体现水平,我等也只能膜拜了
另一点就是为了方便统计线程池的一些信息比如 largestPoolSize
等,这点也好理解,使用并发的 Set 只能保这个 set 的并发安全,而对于其他的一些线程池的相关的信息统计起来就比较麻烦,可能又需要另外的加锁,所以索性就直接搞一个全局的锁,一举两得
ThreadPoolExecutor 使用 构造方法 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0 ) throw new IllegalArgumentException (); if (workQueue == null || threadFactory == null || handler == null ) throw new NullPointerException (); this .acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this .corePoolSize = corePoolSize; this .maximumPoolSize = maximumPoolSize; this .workQueue = workQueue; this .keepAliveTime = unit.toNanos(keepAliveTime); this .threadFactory = threadFactory; this .handler = handler; }
其实从中也可以看出对各个参数的一些限制。
corePoolSize: 核心池的大小,这个参数与后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()
或者prestartCoreThread()
方法,从这 2 个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建 corePoolSize 个线程或者一个线程。默认情况下
,在创建了线程池后,线程池中的线程数为 0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到 corePoolSize 后,就会把到达的任务放到缓存队列当中
maximumPoolSize: 线程池最大线程数,它表示在线程池中最多允许创建多少个线程
keepAliveTime :表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于 corePoolSize 时
,keepAliveTime 才会起作用,直到线程池中的线程数不大于 corePoolSize:即当线程池中的线程数大于corePoolSize
时,如果一个线程空闲的时间达到 keepAliveTime,则会终止,直到线程池中的线程数不超过 corePoolSize,但是如果调用了allowCoreThreadTimeOut(boolean)
方法,在线程池中的线程数不大于 corePoolSize 时,keepAliveTime 参数也会起作用,直到线程池中的线程数为 0,这些内容其实在上面的深入源码中都有过分析。
unit: 参数 keepAliveTime 的时间单位
workQueue :一个阻塞队列,用来存储等待执行的任务,这个参数的选择会对线程池的运行过程产生重大影响
threadFactory :线程工厂,主要用来创建线程(根据传进来的 Runnable/Callable)
handler :表示当拒绝处理任务时的策略,有以下四种取值
ThreadPoolExecutor.AbortPolicy 丢弃任务并抛出 RejectedExecutionException 异常。
ThreadPoolExecutor.DiscardPolicy 也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy 丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy 由调用线程处理该任务
线程池的关闭 shutdown()
shutdownNow()
,上面已经分析过了,就不再过多介绍了。
工厂方法 上面的构造器中一共有 7 个参数,可见要构造一个线程池并非那么容易,所以 jdk 在Executors
类中为我们提供了一些工厂方法,可以直接构造一些特定的线程池。
newCachedThreadPool()
public static ExecutorService newCachedThreadPool () { return new ThreadPoolExecutor (0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue <Runnable>()); }
可以看到core
为 0,最大值为Integer.MAX_VALUE
,任务队列使用的SynchronousQueue
,这个队列是一个很奇葩的阻塞队列,实际上它不是一个真正的队列,每个删除操作都要等待插入操作,反之每个插入操作也都要等待删除动作,只有两个都准备好的时候才不会阻塞,所以它内部不会为队列元素维护空间,也就是说并不会缓存任务,一旦提交了 (put) 任务,要么就由空闲线程去执行 (take),要么创建一条新线程去执行 (take)。
newFixedThreadPool()
public static ExecutorService newFixedThreadPool (int nThreads) { return new ThreadPoolExecutor (nThreads, nThreads, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>()); }
可以看到,最大值和 core 都是 nThread
,也就是最多nThread
个线程,阻塞队列采用 LinkedBlockQueue
newSingleThreadExecutor()
public static ExecutorService newSingleThreadExecutor () { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor (1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>())); }
等价于newFixedThreadPool(1)
,但是这里的返回值经过了一层包装 返回的不再是ThreadPoolExecutor
也就是不会再有那些扩展的 monitor 方法
A wrapper class that exposes only the ExecutorService methods of an ExecutorService implementation.
类似的方法其实还有一些,像newWorkStealingPool
等,感兴趣可以自己去查一查。
其实这里阿里巴巴 Java 开发规范并不建议使用工厂方法创建线程
所以建议还是通过构造器的方式去创建线程,这样也更加灵活更加可控。
赞赏
感谢鼓励