细说 Java 线程池
一、 Java 线程池使用
1.1、线程池创建

1.2、线程池执行流程
提交任务到线程池时,调用execute 方法如下图所示:

1.3、 线程池关闭
-  
shutdown()拒绝新任务,等待已提交任务(包括队列中的任务)执行完毕,不中断正在运行的任务。

 -  
shutdownNow()强制关闭:中断所有线程(包括运行中的任务),返回未执行的任务列表,慎用。

 
1.3.1、 优雅关闭
ExecutorService pool = Executors.newFixedThreadPool(5);// 1. 启动关闭流程,拒绝新任务
pool.shutdown(); try {// 2. 等待现有任务完成(超时时间根据业务设定)if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {// 3. 超时后强制终止List<Runnable> unfinishedTasks = pool.shutdownNow(); log.warn("强制关闭,未完成任务数: {}", unfinishedTasks.size());// 4. 再次等待线程响应中断if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {log.error("线程池未能完全关闭");}}
} catch (InterruptedException e) {// 5. 处理中断异常,重新尝试强制关闭pool.shutdownNow();Thread.currentThread().interrupt(); 
}
 
1.4、线程池状态
线程有状态(详见Java线程状态详解),线程池也有状态。
1.4.1、状态概览
线程池状态如下所示:
 状态类型为int,RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED

- RUNNING:线程池创建后的状态,接收新任务,以及对已添加到队列中的任务进行处理。
 - SHUTDOWN:调用 
shutdown方法,线程池就会转换成 SHUTDOWN 状态,此时线程池不再接收新任务,继续处理任务队列中剩余的任务。 - STOP:调用 
shutdownNow方法,线程池就会转换成 STOP 状态,不接收新任务,不处理队列中的剩余任务,尝试中断正在处理的任务的线程。 - TIDYING:队列为空,线程已终止; 
- SHUTDOWN 状态下,所有任务(包括队列)均已完成,线程线程池会变为 TIDYING 状态;
 - STOP 状态下,队列任务均已导出,线程池会变为 TIDYING 状态。
 
 - TERMINATED:线程池彻底终止。线程池在 TIDYING 状态执行完 
terminated(详见下文)方法就会转变为 TERMINATED 状态。 
1.4.2、线程池状态存储
    private static final int COUNT_BITS = Integer.SIZE - 3; // 32 - 3 = 29 ,线程数量为2^29, 高3位表示线程池状态,最多可以8种。private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));// Packing and unpacking ctlprivate static int runStateOf(int c)     { return c & ~COUNT_MASK; }private static int workerCountOf(int c)  { return c & COUNT_MASK; }private static int ctlOf(int rs, int wc) { return rs | wc; }
 
线程池状态(runStateOf)和线程池中线程数量(workerCountOf)一起存在了 ctl 成员变量中,通过高位掩码提取状态(runStateOf)、低位掩码提取线程数(workerCountOf)的位运算提高效率(相较于传统条件判断或锁操作)。
-  
原子性:通过单个原子变量(AtomicInteger)的 CAS 操作,确保线程池状态(runState)和线程数(workerCount)的修改是原子的,避免状态与数量不一致导致的逻辑错误。
举例:当线程池进入 SHUTDOWN 状态时,若状态和线程数分开维护,可能出现状态已更新但线程数意外增加的情况,导致错误地接受新任务。
 -  
内存占用优化: 1个
AtomicInteger(4Byte)替代2个变量,减少内存占用。 -  
竞争条件规避:关闭线程池时,需确保状态变为 TERMINATED 的同时线程数为0,通过
ctl的原子性操作可避免中间状态的不一致。 -  
减少锁开销:仅通过 一次 CAS 操作即可同时修改状态和线程数。
 
1.4.3、方法tryTerminate
 

-  
方法
tryTerminate主要是做线程池的一些收尾工作,转换线程池状态为TERMINATED,彻底终止线程池。关闭线程池的方法
shutdown()和shutdownNow()在方法结束前均会调用此方法。其它调用此方法的方法如下图所示:
 
二、 Java 线程池细节
2.1、线程池拒绝任务策略
当线程池中任务队列已满,并且线程数量达到了最大数量后,线程池将采取指定的拒绝策略来处理新提交的任务,具体策略如下。
2.1.1、DiscardPolicy
- 直接丢弃任务
 

2.1.2、AbortPolicy
- 丢弃任务后,抛出运行时异常;

AbortPolicy 是线程池默认拒绝策略

 
2.1.3、DiscardOldestPolicy
- 踢出队列最老的任务;
 - 再次提交当前任务;

 
2.1.4、CallerRunsPolicy
- 由提交任务的线程来执行任务;
 该策略使用需注意,若 caller 为主线程,且该任务耗时,则会阻塞主线程,后续通过主线程提交到线程池的任务也会相应被阻塞。系统可能直接卡死。
 
2.2、线程池 Worker
- 线程池内部使用HashSet来存储所有Worker;
 - 注意:注释说,需要配合锁(mainLock)来访问,这里了解即可,详见后文2.4小节。
 

2.2.1、线程池 Worker 继承了 AQS
如下图所示,Worker 通过继承 AQS(更多细节查阅自旋、CLH、AQS浅析),来实现一个非重入的独占锁,来确保执行任务期间不会被线程池的其它方法中断。
注释中给了我们一个可能会中断线程的方法
setCorePoolSize。

Worker 覆写 AQS 的方法如下图所示:

2.2.1.1、方法 setCorePoolSize
 
源码如下,该方法用于设置线程池的核心线程数量,当传入值小于当前线程池的核心线程数时,会通过方法interruptIdleWorkers()销毁空闲线程。

2.2.1.2、方法 interruptIdleWorkers()
 


 如上图所示,方法 interruptIdleWorkers() 就是负责中断线程的。
中断线程前会尝试加锁,正是Worker对AQS实现的锁。
接下来看下Worker这把锁发挥威力的地方:方法runWorker() 是用来执行任务的。
如下图所示,每次执行任务前加锁,执行完任务后释放锁。

2.2.2、线程池 Worker 实现了 Runnable
覆写了Runnable中的run()方法如下图示,正是调用了我们前文提到的runWorker方法。

2.2.2.1 Worker 在哪里被启动的
谁创建Worker ,谁负责启动Worker。
 是谁,到底是谁创建的,当然是我们在提交任务(execute)时通过方法addWorker创建的。


 至此,我们的Woker成功创建并启动起来干活了。
2.3、线程池队列
2.3.1、线程池队列获取任务
线程获取任务是通过getTask()方法,如下图所示。

-  
若队列有任务就获取任务后返回;
 -  
若队列为空,就阻塞线程;一直到线程被中断后,继续下次循环;
 -  
线程被谁中断呢?
- 前文提到的方法 
interruptIdleWorkers()中会设置线程中断。

 
- 方法
interruptWorkers()也会设置线程中断。


 
 - 前文提到的方法 
 -  
线程被中断(比如线程池调用
shutdownNow()或者shutdown())后,进入下一次循环,发现线程池需要关闭,所以就减少线程数量,返回null。runWorker方法中while (task != null || (task = getTask()) != null)获取到的任务为空,结束循环,线程结束。

 
2.4、线程池锁

在2.2小节一开始,我们提到过Worker的集合HashSet需要配合mainLock使用,接下来,看下为什么线程池内部要使用锁?
-  
避免中断风暴
interruptIdleWorkers方法内部是串行化(循环遍历)中断Worker的。 -  
维持线程池内部状态一致性,如成员
largestPoolSize用于统计线程池的内Worker曾经达到的最大数量,如果不加锁,该值的准确性无法保证 。- 更新
largestPoolSize

 - 读取
largestPoolSize

 
 - 更新
 -  
确保线程池的Workers集合稳定。
- 方法
addWorker中添加worker

 - 方法
addWorkerFailed中移除worker

如上所示,这两个方法需要加锁来实现,因为Workers集合HashSet不是线程安全的。 
当然,这里可以使用线程安全的集合替代。但是Workers集合还要配合之前提到的
largestPoolSize使用,不如使用一把锁直接解决来的更方便,更高效;这里的锁使用
ReentrantLock而非内置锁synchronized是因为:- 灵活性:
ReentrantLock支持可中断的锁获取、公平性策略等,更适合复杂的线程控制。 - 代码结构:显式锁的使用(配合
try-finally块)使得加锁/解锁的边界更清晰,便于维护。 
 - 方法
 
2.5、线程池钩子方法
每个任务执行前,我们可以定制两个方法:
beforeExecuteafterExecute

定制举例:
- 重新初始化ThreadLocal;
 - 数据统计;
 - 日志处理;
 

三、线程池工具类Executors
 
Executors 中常用的获取线程池的方法如下:
这些方法均没有对资源(线程数量或者队列长度)做合理限制,使用时会增加OOM的风险。
所以使用时需注意明确定义线程池的 7 个参数。

