【线程池】Java 线程池 ThreadPoolExecutor 类源码介绍
文章目录
- 前言
- 线程池是什么
- 线程池解决了哪些问题
- 本文主要讲述什么
- 感谢读者
- 线程池 UML 类图
- ThreadPoolExecutor 内部设计
- 核心参数
- 内部类
- 任务队列
- 拒绝策略
- ThreadPoolExecutor 源码
- 线程池生命周期
- 线程池构造函数
- execute() 【提交任务】
- addWorker() 方法 【添加工作线程并启动】
- 了解 retry:
- addWordker() 方法讲述
- Worker 类
- runWorker 【执行任务】
- getTask 【获取任务】
- shutdown() 【停止所有线程】
- shutdownNow() 【停止所有任务,中断执行的任务】
- tryTerminate() 【根据线程池状态进行判断是否结束线程池】
- 总结
- 参考与感谢
前言
线程池是什么
随着现在计算机的飞速发展,现在的计算机基本都是多核 CPU 的了。在目前的系统中创建和销毁线程都是十分昂贵的,为了不频繁的创建和销毁,以及能够更好的管理线程,就出现了池化的思想。
线程池解决了哪些问题
线程池的出现,解决了系统中频繁创建和销毁线程的问题。从系统的角度看还解决了一些问题:
1、降低了资源损耗:线程池中会有一些线程,利用这些线程达到重复使用线程执行任务的作用,避免了频繁的线程创建和销毁,这样就大大降低了系统的资源损耗;
2、提高了线程的管理手段:通过监控线程池中线程的数量和状态,来进行调优和控制;
3、提高的响应速度:当一个任务给到线程池后,一旦有空闲的线程会立即执行该任务;
本文主要讲述什么
本文主要讲述线程池主要实现类 ThreadPoolExecutor 类的源码,让大家能够更加清楚明白 ThreadPoolExecutor 线程池内部的核心设计与实现,以及内部机制有哪些。
感谢读者
读者同学们如果发现了哪些不对的地方请评论或私信我,我会及时更正!非常感谢!!!
线程池 UML 类图
- Executor 接口:这是线程池顶级的接口,定义了一个 execute(Runnable command) 方法,用于提交任务。
- ExecutorService 接口:继承自 Executor 接口,提供了更多管理线程生命周期的方法,如 shutdown()、shutdownNow()、submit()、invokeAll() 等。主要的作用是扩充执行任务的能力和提供了管控线程池的方法。
- AbstractExecutorService 抽象类:这是 ExecutorService 的一个抽象实现类,提供了一些 ExecutorService 接口的默认实现,减少了实现 ExecutorService 接口的类的工作量。让下层实现类只需要关注执行任务的方法即可。
- ThreadPoolExecutor 类:ThreadPoolExecutor 是线程池实现类。继承 AbstractExecutorService,主要用于维护线程池的生命周期、管理线程和任务。它提供了多种构造方法,允许我们对线程池的行为进行详细配置,包括核心线程数、最大线程数、线程空闲时间、任务队列等。
ThreadPoolExecutor 内部设计
核心参数
参数名称 | 参数解释 |
---|---|
corePoolSize | (必填)核心线程数,即线程池一直存活的最线程数量。但是将allowCoreThreadTimeOut参数设置为true后,核心线程处于空闲一段时间以上,也会被回收。 |
maximumPoolSize | (必填)线程池中最大线程数,当核心线程都忙起来并且任务队列满了以后,就开始创建新线程,知道创建的数量到达设置的 maximumPoolSize 数。 |
keepAliveTime | (必填)线程空闲时间,即当线程数大于核心线程数时,非核心线程的空闲时间超过这个时间后,就会被回收。将allowCoreThreadTimeOut参数设置为true后,核心线程也会被回收。 |
unit | (必填)keepAliveTime 的时间单位。有:TimeUnit.DAYS(天)、TimeUnit.HOURS(小时)、TimeUnit.MINUTES(分钟)、TimeUnit.SECONDS(秒)、TimeUnit.MILLISECONDS(毫秒)、TimeUnit.MICROSECONDS(微秒)、TimeUnit.NANOSECONDS(纳秒)。 |
workQueue | (必填)任务队列,用于保存等待执行的任务。 |
threadFactory | 线程工厂,用于指定创建新线程的方式。 |
handler | 拒绝策略,当任务过多而无法处理时,采取的处理策略。 |
内部类
在 ThreadPoolExecutor 中有五个核心的内部类,分别是 AbortPolicy、CallerRunsPolicy、DiscardOldestPolicy、DiscardPolicy 和 Worker。总的来说其实就两类:拒绝策略(Policy)和工作线程类(Worker)。
拒绝策略内部类(Policy)是当工作队列中的任务已到达最大限制,并且线程池中的线程数量也达到最大限制,这时如果有新任务提交进来,该如何处理呢?这里的拒绝策略,就是解决这个问题的。拒绝策略表示当任务队列满了且线程数也达到最大了,这时候再新加任务,线程池已经无法承受了,这些新来的任务应该按什么逻辑来处理。
工作线程内部类(Worker)是每一个 Worker 类就会绑定一个线程(Worker 类有一个成员属性持有一个线程对象),可以将 Worker 理解为线程池中执行任务的线程。但是实际上它并不仅仅是一个线程,Worker 里面还会有一些统计信息,存储一些相关的数据。
任务队列
- SynchronousQueue:SynchronousQueue 是一个没有容量的队列(不知道能不能称为一个队列)。它是 BlockingQueue 的一个实现类,与其他实现 BlockingQueue 的实现类对比,它没有容量,更轻量级;入队和出队要配对,也就是说当你入队一个元素后,必须先出队才能入队,否则插入的线程会一直等待。
- LinkedBlockingQueue:LinkedBlockingQueue 是一个支持并发操作的有界阻塞队列,底层数据结构是由一个单链表维护。当使用它的时候,不设置长度限制,则默认 Integer.MAX_VALUE 做为最大容量。
- ArrayBlockingQueue:ArrayBlockingQueue 也是一个支持并发操作的有界阻塞队列,与 LinkedBlockingQueue 不同的是,ArrayBlockingQueue 底层数据结构是由一个环形数组数据结构维护的。同样是当使用它的时候,不设置长度限制,则默认 Integer.MAX_VALUE 做为最大容量。
另外,还支持另外 4 种队列:
- PriorityBlockingQueue:PriorityBlockingQueue 是一个支持优先级处理操作的队列,你可以按照某个规则自定义这个优先级,以保证优先级高的元素放在队列的前面,进行出队的时候能够先出优先级高的元素。
- DelayQueue:DelayQueue 是一个延迟队列,队列中每一个元素都会有一个自己的过期时间,每当使用出队操作的时候,只有过期的元素才会被出队,没有过期的依然会留在队列中。
- LinkedBlockingDeque:LinkedBlockingDeque 是一个由链表结构组成的双向阻塞队列,可以头部和尾部进行操作。
- LinkedTransferQueue:LinkedTransferQueue 由链表结构组成的无界阻塞队列。它的设计比较特别,是一种预约模式的设计。当出队时如果队列中有数据则直接取走,没有的话会在队列中占一个位子,这个位子的元素为 null,当有其他线程入队时,则会通知出队的这个线程,告诉它你可以拿走这个元素了。
拒绝策略
当添加任务时,如果线程池中的容量满了以后,线程池会做哪些策略?下面是线程池的一些策略:
- AbortPolicy(默认):AbortPolicy 策略会将新的任务丢弃并抛出 RejectedExecutionException 异常。
- CallerRunsPolicy:CallerRunsPolicy 策略是直接运行这个任务的run方法,但并非是由线程池的线程处理,而是交由任务的调用线程处理。
- DiscardPolicy:DiscardPolicy 策略是直接丢弃任务,不抛出任何异常。
- DiscardOldestPolicy:DiscardOldestPolicy 策略是将当前处于等待队列列头的等待任务强行取出,然后再试图将当前被拒绝的任务提交到线程池执行。
ThreadPoolExecutor 源码
线程池生命周期
// 管理线程池的状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 表示低 29 位的值,用于表示线程池中工作线程的数量
private static final int COUNT_BITS = Integer.SIZE - 3;
// 用于表示线程池中工作线程的数量的最大值,等于 2^29-1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;// 线程池的五种状态,高 3 位表示,下面的状态值依次增高,是根据他们状态流程的顺序依次增高的
// RUNNING 状态
private static final int RUNNING = -1 << COUNT_BITS;
// SHUTDOWN 状态
private static final int SHUTDOWN = 0 << COUNT_BITS;
// STOP 状态
private static final int STOP = 1 << COUNT_BITS;
// TIDYING 状态
private static final int TIDYING = 2 << COUNT_BITS;
// TERMINATED 状态
private static final int TERMINATED = 3 << COUNT_BITS;// 通过与运算,计算返回线程池的状态,将高3位的值保留,低 29 位的值置为 0
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 通过与运算,计算返回线程池的状态,将高 3 位的值置为 0,低 29 位的值保留
private static int workerCountOf(int c) { return c & CAPACITY; }
// 通过或运算,计算返回线程池的状态,将高 3 位和低 29 位的值都保留,合并为 ctl 并返回
private static int ctlOf(int rs, int wc) { return rs | wc; }// runStateLessThan 方法用于判断线程池的状态是否小于某个状态
private static boolean runStateLessThan(int c, int s) {return c < s;
}// runStateAtLeast 方法用于判断线程池的状态是否大于等于某个状态
private static boolean runStateAtLeast(int c, int s) {return c >= s;
}// isRunning 方法用于判断线程池的状态是否是 RUNNING 状态
private static boolean isRunning(int c) {return c < SHUTDOWN;
}
上面这些源码主要是写了当前线程池的状态、线程池的五种状态值定义、计算线程池的状态和判断线程池的状态。
这里再补充一下线程池的五种状态:
- RUNNING 状态:线程池创建后,初始化时会将线程池的状态设置为 RUNNING,表示可接受新任务,并且执行队列中的任务;
- SHUTDOWN 状态:当调用了 shutdown() 方法,则线程池处于 SHUTDOWN 状态,表示不接受新任务,但会执行队列中的任务;
- STOP 状态:当调用了 shutdownNow() 方法,则线程池处于 STOP 状态,表示不接受新任务,并且不再执行队列中的任务,并且中断正在执行的任务;
- TIDYING 状态:所有任务已经中止,且工作线程数量为 0,最后变迁到这个状态的线程将要执行 terminated() 钩子方法,只会有一个线程执行这个方法;
- TERMINATED 状态:中止状态,已经执行完 terminated() 钩子方法;
线程池状态执行流程:
线程池构造函数
当我们创建线程池时,不管是通过 Executor 工具类还是手动创建线程池,最终都会调用的下面这个构造方法来实现的。
/*** 构造方法,创建线程池。* * @param corePoolSize 线程池的核心线程数量* @param maximumPoolSize 线程池的最大线程数* @param keepAliveTime 当线程数大于核心线程数时,多余的空闲线程存活的最长时间* @param unit 存活时间的时间单位* @param workQueue 任务队列,用来储存等待执行任务的队列* @param threadFactory 线程工厂,用来创建线程,一般默认即可* @param handler 拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务*/
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory, RejectedExecutionHandler handler) {if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;
}
这里简单介绍一下线程池的内部结构设计:
在线程池中,如果执行的线程数超过了核心线程数,那么这些多余线程的最长存活时间,不会超过 keepAliveTime 参数。
execute() 【提交任务】
execute() 方法的主要目的就是将任务执行起来。
public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();// 1. 如果工作线程数量小于核心数量if (workerCountOf(c) < corePoolSize) {// 添加一个工作线程(核心) 并将该任务作为该工作线程的第一个任务,创建线程完成后直接返回if (addWorker(command, true))return;c = ctl.get();}// 2. 如果达到了核心数量且线程池是运行状态,则将任务加入队列if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();// 再次检查线程池状态,如果不是运行状态,就移除任务并执行拒绝策略if (!isRunning(recheck) && remove(command))reject(command);// 容错检查工作线程数量是否为0,如果为0就创建一个,此时就不用为工作线程绑定任务了,因为任务已经加入到队列中了else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 3. 任务入队列失败(例如队列满了),尝试创建非核心工作线程,将该任务和非核心工作线程绑定在一起else if (!addWorker(command, false))// 尝试创建非核心工作线程失败,执行拒绝策略reject(command);
}
上面这些源码和注释,主要讲述 execute() 方法是怎么做的校验和做了哪些步骤。用流程图表达:
addWorker() 方法 【添加工作线程并启动】
了解 retry:
在了解 addWorker() 方法之前,先了解一下 retry:
下面是一个示例
public static void main(String[] args) {int i = 0;// todo retry: 标记在这里retry:for (;;) {i++;System.out.println("i=" + i);int j = 0;for (;;) {System.out.println("j=" + j);j++;if (j == 3) {break retry;}}}
}
在执行完下面代码后,通过控制台可以看到下面这个输出
i=1
j=0
j=1
j=2
接下来把示例代码改一下
public static void main(String[] args) {int i = 0;for (;;) {i++;System.out.println("i=" + i);int j = 0;// todo retry: 标记在这里retry:for (;;) {System.out.println("j=" + j);j++;if (j == 3) {break retry;}}}
}
输出的内容如下
0
j=1
j=2
i=232781
j=0
j=1
j=2
i=232782
j=0
j=1
j=2
i=232783
...
通过两个示例,能够发现,retry 相当于一个标记,只用在循环里面,break 的时候会到 retry 标记处。如果 retry 没有在循环(for,while)里面,在执行到 retry 时,就会跳出整个循环。如果 retry 在循环里面,可以理解为跳到了关键字处执行,不管几层循环。continue理解也是一样。
需要注意的是: retry: 需要放在for,whlie,do…while的前面声明,变量只跟在 break 和 continue 后面。
addWordker() 方法讲述
addWordker() 方法主要的目的是创建一个线程,然后启动执行,期间也会判断线程池状态和工作线程数量等各种检测。
/*** addWordker() 方法主要做的事情是创建一个线程,然后启动执行,期间也会判断线程池状态和工作线程数量等各种检测。** @param firstTask 创建线程后执行的任务* @param core 该参数决定了线程池容量的约束条件,即当前线程数量以何值为极限值。参数为 true 则使用 corePollSize 作为约束值,否则使用 maximumPoolSize。* @return 是否创建线程并启动成功*/
private boolean addWorker(Runnable firstTask, boolean core) {// 外层循环主要是检查线程池状态retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// 线程池状态检查// 1、线程池是非 RUNNING 状态// 2、线程池是 SHUTDOWN 状态 并且 传入的任务是 null 并且 workQueue 不等于空// 两种情况都会返回 falseif (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))return false;// 内层循环:线程池添加核心线程并返回是否添加成功的结果for (;;) {// 获取当前线程数int wc = workerCountOf(c);// 判断是否饱和容量了,如果已经达到最大线程数则不能再新建线程,直接返回 falseif (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))return false;// 完成了上面的判断后,说明现在线程池可以创建新线程,则通过 CAS 将线程数量加 1,因为后面要创建线程了,并跳出外层循环if (compareAndIncrementWorkerCount(c))break retry;// 如果上面的 compareAndIncrementWorkerCount(c) 方法返回 false,则说明有其他线程在操作线程池的线程数量,所以需要重新获取 ctlc = ctl.get();// 如果当前的运行状态已经和最开始获取的状态不一样了,则重新进入外层循环if (runStateOf(c) != rs)continue retry;}}// 如果上面的条件满足,则会把工作线程数量加1,然后执行下面创建线程的动作// 标记是否启动了工作线程boolean workerStarted = false;// 标记是否成功添加了工作线程boolean workerAdded = false;// 要创建的 Worker 对象Worker w = null;try {// 创建工作线程// 增加一个 worker 这个 firstTask 就是一个线程(任务线程,每一个 Worker 都和一个任务线程绑定)w = new Worker(firstTask);// 这个是绑定在 Worker 上的工作线程,并不是任务线程,工作线程是用来执行任务的final Thread t = w.thread;// 判断 t 是否为 nullif (t != null) {// 获取线程池的锁final ReentrantLock mainLock = this.mainLock;// 在读取线程池状态的时候就应该上锁,防止有并发操作破坏程序的一致性,上下不一致mainLock.lock();try {// 锁定后并重新检查线程池的状态,查看是否存在线程工厂的失败或者锁定前的关闭// 获取线程池运行状态int rs = runStateOf(ctl.get());// 检查线程池状态// rs < SHUTDOWN表示是 RUNNING 状态;// 如果 rs 是 RUNNING 状态或者 rs 是 SHUTDOWN 状态并且 firstTask 为 null,向线程池中添加线程。// 因为在 SHUTDOWN 时不会在添加新的任务,但还是会执行 workQueue 中的任务if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) throw new IllegalThreadStateException();// 添加到工作线程队列。 workers 是线程池中存放工作线程的集合// 增加 work 每一个线程池都持有一个 workers 集合,里面存储着线程池的所有 worker,也就是所有线程workers.add(w);// 还在池子中的线程数量(只能在mainLock中使用)int s = workers.size();// 不能超过线程池的最大线程数量if (s > largestPoolSize)largestPoolSize = s;// 标记线程添加成功workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {// 线程添加成功之后启动线程// 启动绑定在 worker上的线程。启动了之后该工作线程就会开始从任务队列中拿任务去执行了t.start();// 标记工作线程启动成功workerStarted = true;}}} finally {// 线程启动失败,执行失败方法(线程数量减1,执行tryTerminate()方法等)if (! workerStarted)addWorkerFailed(w);}// 返回新创建的工作线程是否启动成功return workerStarted;
}
在 addWorker 方法中,如果添加 Worker 并且启动线程失败,则会做失败后的处理:
private void addWorkerFailed(Worker w) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 移除添加失败的 workerif (w != null)workers.remove(w);// 减少 worker 数量decrementWorkerCount();// 尝试终止线程池tryTerminate();} finally {mainLock.unlock();
}
通过上面的源码,可以知道所有的线程都是放在 Worker 这个类中的,到目前为止,任务还并没有执行, 真正的执行是在 Worker 内部类中执行的,在下面会聊到 Worker 内部类中是怎么执行任务的,注意一下这里的 t.start() 这个语句,启动时会调用 Worker 类中的run方法,Worker 本身实现了 Runnable 接口,所以一个 Worker 类型的对象也是一个线程。现在先来总结创建一个线程执行工作任务 addWorker() 这个方法。
流程图表示:
Worker 类
通过上面的分析,就可以知道每一个任务都会放在 Worker 类中,由 Worker 类中的线程去最终执行任务。
下面就来看一下 Worker 类的源码:
private final class Workerextends AbstractQueuedSynchronizerimplements Runnable
{private static final long serialVersionUID = 6138294804551838833L;// 执行任务的线程final Thread thread;// 要被执行的任务Runnable firstTask;// 记录线程完成的任务数volatile long completedTasks;// worker 类的构造方法// 传入一个需要执行的任务,并且通过 getThreadFactory() 创建一个线程来执行任务Worker(Runnable firstTask) {// 设置 worker 线程状态为-1,表示禁止线程中断// state:锁状态,-1为初始值,0为unlock状态,1为lock状态setState(-1); // 在调用runWorker前,禁止中断this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}// 执行任务public void run() {runWorker(this);}// 是否锁住了任务protected boolean isHeldExclusively() {return getState() != 0;}// 尝试获取锁的方法protected boolean tryAcquire(int unused) {// 尝试修改线程状态if (compareAndSetState(0, 1)) {// 设置 exclusiveOwnerThread 为当前线程setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}// 尝试释放锁protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}// 加锁public void lock() { acquire(1); }// 尝试获取锁public boolean tryLock() { return tryAcquire(1); }// 解锁public void unlock() { release(1); }// 是否锁住public boolean isLocked() { return isHeldExclusively(); }// 线程启动后,进行线程中断时执行方法void interruptIfStarted() {Thread t;// worker中状态 >= 0 且 线程不为空 且 当前线程未中断if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {// 中断线程t.interrupt();} catch (SecurityException ignore) {}}}
}
runWorker 【执行任务】
通过 Worker 类中的源码可以得知,在类中最终是调用了 runWorker 方法,执行的任务,那么接下来就来看一下 runWorker 方法。
final void runWorker(Worker w) {// 获取当前线程Thread wt = Thread.currentThread();// 获取要执行的任务Runnable task = w.firstTask;w.firstTask = null;// 解锁,设置 worker 状态 state 为 0,表示允许线程中断w.unlock(); // allow interrupts// 标识线程退出原因,true 代表线程异常退出,false 代表线程正常退出boolean completedAbruptly = true;try {// 当前任务是为空 或 从任务队列获取的任务为空 则停止循环// getTask() 方法从任务队列获取一个任务while (task != null || (task = getTask()) != null) {// 加锁处理并发问题,防止 shutdown() 时终止正在执行的 workerw.lock();// 如果线程池状态 >= STOP(即 STOP 或 TERMINATED) 并且当前线程未被中断if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())// 中断当前线程wt.interrupt();try {// 调用自定义的任务启动前的方法beforeExecute(wt, task);Throwable thrown = null;try {// 执行任务task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {// 调用自定义的任务启动后的方法afterExecute(task, thrown);}} finally {// 任务对象设置成 nulltask = null;// 完成的工作数 +1w.completedTasks++;// 解锁w.unlock();}}// 标识线程为正常退出completedAbruptly = false;} finally {// 处理 worker 线程退出,主要是用来销毁空闲线程,控制线程池中线程数量的大小变化processWorkerExit(w, completedAbruptly);}
}
通过上面的源码可以得知在 runWorker 方法内部的主要流程主要如下:
- 首先解锁 worker,允许线程中断;
- 通过 while 循环判断当前任务是否为空,若为空则调用 getTask() 方法从阻塞队列中获取待处理的任务;
- 获取到任务后工作线程直接加锁,同时根据线程池状态判断是否中断当前线程;
- 执行任务,在任务执行前后执行自定义扩展方法;
- 重复第 2 步的逻辑,直到阻塞队列中的任务全部执行完毕,最后调用 processWorkerExit() 方法处理 worker 线程退出、销毁空闲线程和控制线程池中线程数量的大小变化;
下面使用比较直观详细的流程图来表示出来:
getTask 【获取任务】
getTask() 方法的主要作用是从任务队列中获取任务。
private Runnable getTask() {// 标记上一次从阻塞队列获取任务是否超时boolean timedOut = false; // Did the last poll() time out?// 自旋for (;;) {int c = ctl.get();// 获取当前线程池状态int rs = runStateOf(c);// 判断是否需获取任务// 若线程池状态 >= STOP 或 (线程池状态为 SHUTDOWN 且 阻塞队列为空),则不需要再处理任务if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {// 工作线程数-1decrementWorkerCount();// 返回return null;}// 获取当前工作线程数int wc = workerCountOf(c);// timed 标记用于判断是否需进行超时控制// allowCoreThreadTimeOut 是否运行核心线程超时销毁(默认为false,核心线程不允许超时销毁)// wc > corePoolSize 当前工作线程数大于核心线程数boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;/*** 当前工作线程数 > maximumPoolSize* 或* timed && timedOut 为 true** 并且** 工作线程数大于1* 或* 阻塞队列为空*/if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {// 使用 CAS 对工作线程数 -1if (compareAndDecrementWorkerCount(c))return null;continue;}// 开始从阻塞队列获取任务try {// timed 为 true 代表核心线程允许超时销毁 或 当前工作线程数大于核心线程数(存在非核心线程)// timed = true:超时等待获取任务 poll(),超时未获取到则返回 null// timed = false:阻塞获取任务 take(),一直等着直到获取到任务Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();// 判断是否获取到任务if (r != null)// 获取到直接返回return r;// 未获取到,代表等待超时也没有拿到任务,设置timedOut值为truetimedOut = true;} catch (InterruptedException retry) {// 即使异常也循环重试timedOut = false;}}
}
通过上面的 getTask() 方法原发分析,大致的主要的过程就是:
- 校验线程池状态和工作线程数是否合规;
- 根据条件选择从任务队列中超时等待获取,还是从任务队列中阻塞获取;
下面用一个直观的流程图来表示:
shutdown() 【停止所有线程】
shutdown() 方法会把状态修改成 SHUTDOWN。并且这里肯定会成功,因为修改时是通过 自旋 的方式,不成功不退出。
public void shutdown() {// 获取线程池锁final ReentrantLock mainLock = this.mainLock;// 加锁,修改线程池状态mainLock.lock();try {// 检查是否有权限关闭线程池checkShutdownAccess();// 修改状态为SHUTDOWN,自旋操作,只有状态修改成功才会返回advanceRunState(SHUTDOWN);// 标记空闲线程为中断状态interruptIdleWorkers();onShutdown();} finally {mainLock.unlock();}tryTerminate();
}private void advanceRunState(int targetState) {// 自旋修改线程池状态for (;;) {// 获取ctlint c = ctl.get();// 如果状态大于 SHUTDOWN(因为只有 RUNNING 状态才能转化为 SHUTDOWN 状态,而只有 RUNNING 状态是小于 SHUTDOWN 状态的),// 或者修改为 SHUTDOWN 成功了,才会 break 跳出自旋if (runStateAtLeast(c, targetState) ||ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))break;}
}
shutdownNow() 【停止所有任务,中断执行的任务】
shutdownNow() 方法会把线程池的状态改成 STOP,线程池不接受新任务,且不再执行队列中的任务,且中断正在执行的任务,同时标记所有线程为中断状态。
public List<Runnable> shutdownNow() {List<Runnable> tasks;// 获取线程池的锁final ReentrantLock mainLock = this.mainLock;// 加锁,进行状态修改操作mainLock.lock();try {// 检查是否有权限关闭线程池checkShutdownAccess();// 修改为 STOP 状态advanceRunState(STOP);// 标记所有线程为中断状态interruptWorkers();// 将队列中的任务全部移除,并返回tasks = drainQueue();} finally {mainLock.unlock();}// 尝试终止线程池tryTerminate();// 返回队列中的任务return tasks;
}private List<Runnable> drainQueue() {BlockingQueue<Runnable> q = workQueue;ArrayList<Runnable> taskList = new ArrayList<Runnable>();// 将队列中的元素添加到一个集合中去,但是如果队列是 DelayQueue 或者 其他类型的 Queue 时使用 drainTo 就会失败,所以就需要逐个删除q.drainTo(taskList);if (!q.isEmpty()) {for (Runnable r : q.toArray(new Runnable[0])) {if (q.remove(r))taskList.add(r);}}return taskList;
}
tryTerminate() 【根据线程池状态进行判断是否结束线程池】
所有任务已经中止,且工作线程数量为0,就会进入这个状态。最后变迁到这个状态的线程将要执行 terminated() 钩子方法,只会有一个线程执行这个方法。
final void tryTerminate() {// 自旋修改状态for (;;) {int c = ctl.get();// 下面几种情况不会执行后续代码// 1. 运行中// 2. 状态的值比 TIDYING 还大,也就是 TERMINATED// 3. SHUTDOWN 状态且任务队列不为空if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;// 工作线程数量不为 0,也不会执行后续代码if (workerCountOf(c) != 0) {// 尝试中断空闲的线程interruptIdleWorkers(ONLY_ONE);return;}// 获取线程池的锁final ReentrantLock mainLock = this.mainLock;// 加锁mainLock.lock();try {// CAS 修改状态为 TIDYING 状态if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {// 更新成功,执行 terminated 钩子方法terminated();} finally {// 确保 terminated 钩子方法执行完毕后,再次修改状态为 TERMINATED(最终的终止状态),线程池彻底关闭// 强制更新状态为 TERMINATED,这里不需要 CAS 了ctl.set(ctlOf(TERMINATED, 0));// 通知所有等待线程termination.signalAll();}return;}} finally {// 解锁mainLock.unlock();}}
}
更新成 TIDYING 或者 TERMINATED 状态的代码都在 tryTerminate() 方法中,而 tryTerminate() 方法在很多个地方被调用,比如 shutdown()、shutdownNow()、线程退出时,所以说几乎每个线程最后消亡的时候都会调用 tryTerminate() 方法,但最后只会有一个线程真正执行到修改状态为 TIDYING 的地方。
修改状态为 TIDYING 后执行 terminated() 方法,最后修改状态为 TERMINATED,标志着线程池真正消亡了。
总结
最后,使用流程图来呈现一下 ThreadPoolExecutor 运行机制的图:
参考与感谢
- 美团线程池文章
- Agodival 博主的线程池文章
- 云深i不知处 博主的线程池文章
非常感谢以上的博主!!!