当前位置: 首页 > news >正文

【线程池】Java 线程池 ThreadPoolExecutor 类源码介绍

文章目录

  • 前言
    • 线程池是什么
    • 线程池解决了哪些问题
    • 本文主要讲述什么
    • 感谢读者
  • 线程池 UML 类图
  • ThreadPoolExecutor 内部设计
    • 核心参数
    • 内部类
    • 任务队列
    • 拒绝策略
  • ThreadPoolExecutor 源码
    • 线程池生命周期
    • 线程池构造函数
    • execute() 【提交任务】
    • addWorker() 方法 【添加工作线程并启动】
      • 了解 retry:
      • addWordker() 方法讲述
    • Worker 类
    • runWorker 【执行任务】
    • getTask 【获取任务】
    • shutdown() 【停止所有线程】
    • shutdownNow() 【停止所有任务,中断执行的任务】
    • tryTerminate() 【根据线程池状态进行判断是否结束线程池】
  • 总结
  • 参考与感谢

前言

线程池是什么

随着现在计算机的飞速发展,现在的计算机基本都是多核 CPU 的了。在目前的系统中创建和销毁线程都是十分昂贵的,为了不频繁的创建和销毁,以及能够更好的管理线程,就出现了池化的思想。

线程池解决了哪些问题

线程池的出现,解决了系统中频繁创建和销毁线程的问题。从系统的角度看还解决了一些问题:
1、降低了资源损耗:线程池中会有一些线程,利用这些线程达到重复使用线程执行任务的作用,避免了频繁的线程创建和销毁,这样就大大降低了系统的资源损耗;
2、提高了线程的管理手段:通过监控线程池中线程的数量和状态,来进行调优和控制;
3、提高的响应速度:当一个任务给到线程池后,一旦有空闲的线程会立即执行该任务;

本文主要讲述什么

本文主要讲述线程池主要实现类 ThreadPoolExecutor 类的源码,让大家能够更加清楚明白 ThreadPoolExecutor 线程池内部的核心设计与实现,以及内部机制有哪些。

感谢读者

读者同学们如果发现了哪些不对的地方请评论或私信我,我会及时更正!非常感谢!!!



线程池 UML 类图

在这里插入图片描述

  • Executor 接口:这是线程池顶级的接口,定义了一个 execute(Runnable command) 方法,用于提交任务。
  • ExecutorService 接口:继承自 Executor 接口,提供了更多管理线程生命周期的方法,如 shutdown()、shutdownNow()、submit()、invokeAll() 等。主要的作用是扩充执行任务的能力和提供了管控线程池的方法。
  • AbstractExecutorService 抽象类:这是 ExecutorService 的一个抽象实现类,提供了一些 ExecutorService 接口的默认实现,减少了实现 ExecutorService 接口的类的工作量。让下层实现类只需要关注执行任务的方法即可。
  • ThreadPoolExecutor 类:ThreadPoolExecutor 是线程池实现类。继承 AbstractExecutorService,主要用于维护线程池的生命周期、管理线程和任务。它提供了多种构造方法,允许我们对线程池的行为进行详细配置,包括核心线程数、最大线程数、线程空闲时间、任务队列等。


ThreadPoolExecutor 内部设计

核心参数

参数名称参数解释
corePoolSize(必填)核心线程数,即线程池一直存活的最线程数量。但是将allowCoreThreadTimeOut参数设置为true后,核心线程处于空闲一段时间以上,也会被回收。
maximumPoolSize(必填)线程池中最大线程数,当核心线程都忙起来并且任务队列满了以后,就开始创建新线程,知道创建的数量到达设置的 maximumPoolSize 数。
keepAliveTime(必填)线程空闲时间,即当线程数大于核心线程数时,非核心线程的空闲时间超过这个时间后,就会被回收。将allowCoreThreadTimeOut参数设置为true后,核心线程也会被回收。
unit(必填)keepAliveTime 的时间单位。有:TimeUnit.DAYS(天)、TimeUnit.HOURS(小时)、TimeUnit.MINUTES(分钟)、TimeUnit.SECONDS(秒)、TimeUnit.MILLISECONDS(毫秒)、TimeUnit.MICROSECONDS(微秒)、TimeUnit.NANOSECONDS(纳秒)。
workQueue(必填)任务队列,用于保存等待执行的任务。
threadFactory线程工厂,用于指定创建新线程的方式。
handler拒绝策略,当任务过多而无法处理时,采取的处理策略。

内部类

在 ThreadPoolExecutor 中有五个核心的内部类,分别是 AbortPolicy、CallerRunsPolicy、DiscardOldestPolicy、DiscardPolicy 和 Worker。总的来说其实就两类:拒绝策略(Policy)和工作线程类(Worker)。

拒绝策略内部类(Policy)是当工作队列中的任务已到达最大限制,并且线程池中的线程数量也达到最大限制,这时如果有新任务提交进来,该如何处理呢?这里的拒绝策略,就是解决这个问题的。拒绝策略表示当任务队列满了且线程数也达到最大了,这时候再新加任务,线程池已经无法承受了,这些新来的任务应该按什么逻辑来处理。

工作线程内部类(Worker)是每一个 Worker 类就会绑定一个线程(Worker 类有一个成员属性持有一个线程对象),可以将 Worker 理解为线程池中执行任务的线程。但是实际上它并不仅仅是一个线程,Worker 里面还会有一些统计信息,存储一些相关的数据。

任务队列

  • SynchronousQueue:SynchronousQueue 是一个没有容量的队列(不知道能不能称为一个队列)。它是 BlockingQueue 的一个实现类,与其他实现 BlockingQueue 的实现类对比,它没有容量,更轻量级;入队和出队要配对,也就是说当你入队一个元素后,必须先出队才能入队,否则插入的线程会一直等待。
  • LinkedBlockingQueue:LinkedBlockingQueue 是一个支持并发操作的有界阻塞队列,底层数据结构是由一个单链表维护。当使用它的时候,不设置长度限制,则默认 Integer.MAX_VALUE 做为最大容量。
  • ArrayBlockingQueue:ArrayBlockingQueue 也是一个支持并发操作的有界阻塞队列,与 LinkedBlockingQueue 不同的是,ArrayBlockingQueue 底层数据结构是由一个环形数组数据结构维护的。同样是当使用它的时候,不设置长度限制,则默认 Integer.MAX_VALUE 做为最大容量。

另外,还支持另外 4 种队列:

  • PriorityBlockingQueue:PriorityBlockingQueue 是一个支持优先级处理操作的队列,你可以按照某个规则自定义这个优先级,以保证优先级高的元素放在队列的前面,进行出队的时候能够先出优先级高的元素。
  • DelayQueue:DelayQueue 是一个延迟队列,队列中每一个元素都会有一个自己的过期时间,每当使用出队操作的时候,只有过期的元素才会被出队,没有过期的依然会留在队列中。
  • LinkedBlockingDeque:LinkedBlockingDeque 是一个由链表结构组成的双向阻塞队列,可以头部和尾部进行操作。
  • LinkedTransferQueue:LinkedTransferQueue 由链表结构组成的无界阻塞队列。它的设计比较特别,是一种预约模式的设计。当出队时如果队列中有数据则直接取走,没有的话会在队列中占一个位子,这个位子的元素为 null,当有其他线程入队时,则会通知出队的这个线程,告诉它你可以拿走这个元素了。

拒绝策略

当添加任务时,如果线程池中的容量满了以后,线程池会做哪些策略?下面是线程池的一些策略:

  • AbortPolicy(默认):AbortPolicy 策略会将新的任务丢弃并抛出 RejectedExecutionException 异常。
  • CallerRunsPolicy:CallerRunsPolicy 策略是直接运行这个任务的run方法,但并非是由线程池的线程处理,而是交由任务的调用线程处理。
  • DiscardPolicy:DiscardPolicy 策略是直接丢弃任务,不抛出任何异常。
  • DiscardOldestPolicy:DiscardOldestPolicy 策略是将当前处于等待队列列头的等待任务强行取出,然后再试图将当前被拒绝的任务提交到线程池执行。


ThreadPoolExecutor 源码

线程池生命周期

// 管理线程池的状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 表示低 29 位的值,用于表示线程池中工作线程的数量
private static final int COUNT_BITS = Integer.SIZE - 3;
// 用于表示线程池中工作线程的数量的最大值,等于 2^29-1
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// 线程池的五种状态,高 3 位表示,下面的状态值依次增高,是根据他们状态流程的顺序依次增高的
// RUNNING 状态
private static final int RUNNING    = -1 << COUNT_BITS;
// SHUTDOWN 状态
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// STOP 状态
private static final int STOP       =  1 << COUNT_BITS;
// TIDYING 状态
private static final int TIDYING    =  2 << COUNT_BITS;
// TERMINATED 状态
private static final int TERMINATED =  3 << COUNT_BITS;// 通过与运算,计算返回线程池的状态,将高3位的值保留,低 29 位的值置为 0
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 通过与运算,计算返回线程池的状态,将高 3 位的值置为 0,低 29 位的值保留
private static int workerCountOf(int c)  { return c & CAPACITY; }
// 通过或运算,计算返回线程池的状态,将高 3 位和低 29 位的值都保留,合并为 ctl 并返回
private static int ctlOf(int rs, int wc) { return rs | wc; }// runStateLessThan 方法用于判断线程池的状态是否小于某个状态
private static boolean runStateLessThan(int c, int s) {return c < s;
}// runStateAtLeast 方法用于判断线程池的状态是否大于等于某个状态
private static boolean runStateAtLeast(int c, int s) {return c >= s;
}// isRunning 方法用于判断线程池的状态是否是 RUNNING 状态
private static boolean isRunning(int c) {return c < SHUTDOWN;
}

上面这些源码主要是写了当前线程池的状态、线程池的五种状态值定义、计算线程池的状态和判断线程池的状态。
这里再补充一下线程池的五种状态:

  • RUNNING 状态:线程池创建后,初始化时会将线程池的状态设置为 RUNNING,表示可接受新任务,并且执行队列中的任务;
  • SHUTDOWN 状态:当调用了 shutdown() 方法,则线程池处于 SHUTDOWN 状态,表示不接受新任务,但会执行队列中的任务;
  • STOP 状态:当调用了 shutdownNow() 方法,则线程池处于 STOP 状态,表示不接受新任务,并且不再执行队列中的任务,并且中断正在执行的任务;
  • TIDYING 状态:所有任务已经中止,且工作线程数量为 0,最后变迁到这个状态的线程将要执行 terminated() 钩子方法,只会有一个线程执行这个方法;
  • TERMINATED 状态:中止状态,已经执行完 terminated() 钩子方法;

线程池状态执行流程:
在这里插入图片描述


线程池构造函数

当我们创建线程池时,不管是通过 Executor 工具类还是手动创建线程池,最终都会调用的下面这个构造方法来实现的。

/*** 构造方法,创建线程池。* * @param corePoolSize    线程池的核心线程数量* @param maximumPoolSize 线程池的最大线程数* @param keepAliveTime   当线程数大于核心线程数时,多余的空闲线程存活的最长时间* @param unit            存活时间的时间单位* @param workQueue       任务队列,用来储存等待执行任务的队列* @param threadFactory   线程工厂,用来创建线程,一般默认即可* @param handler         拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务*/
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory, RejectedExecutionHandler handler) {if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;
}

这里简单介绍一下线程池的内部结构设计:
在这里插入图片描述

在线程池中,如果执行的线程数超过了核心线程数,那么这些多余线程的最长存活时间,不会超过 keepAliveTime 参数。


execute() 【提交任务】

execute() 方法的主要目的就是将任务执行起来。

public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();// 1. 如果工作线程数量小于核心数量if (workerCountOf(c) < corePoolSize) {// 添加一个工作线程(核心)  并将该任务作为该工作线程的第一个任务,创建线程完成后直接返回if (addWorker(command, true))return;c = ctl.get();}// 2. 如果达到了核心数量且线程池是运行状态,则将任务加入队列if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();// 再次检查线程池状态,如果不是运行状态,就移除任务并执行拒绝策略if (!isRunning(recheck) && remove(command))reject(command);// 容错检查工作线程数量是否为0,如果为0就创建一个,此时就不用为工作线程绑定任务了,因为任务已经加入到队列中了else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 3. 任务入队列失败(例如队列满了),尝试创建非核心工作线程,将该任务和非核心工作线程绑定在一起else if (!addWorker(command, false))// 尝试创建非核心工作线程失败,执行拒绝策略reject(command);
}

上面这些源码和注释,主要讲述 execute() 方法是怎么做的校验和做了哪些步骤。用流程图表达:
在这里插入图片描述


addWorker() 方法 【添加工作线程并启动】

了解 retry:

在了解 addWorker() 方法之前,先了解一下 retry:
下面是一个示例

public static void main(String[] args) {int i = 0;// todo retry: 标记在这里retry:for (;;) {i++;System.out.println("i=" + i);int j = 0;for (;;) {System.out.println("j=" + j);j++;if (j == 3) {break retry;}}}
}

在执行完下面代码后,通过控制台可以看到下面这个输出

i=1
j=0
j=1
j=2

接下来把示例代码改一下

public static void main(String[] args) {int i = 0;for (;;) {i++;System.out.println("i=" + i);int j = 0;// todo retry: 标记在这里retry:for (;;) {System.out.println("j=" + j);j++;if (j == 3) {break retry;}}}
}

输出的内容如下

0
j=1
j=2
i=232781
j=0
j=1
j=2
i=232782
j=0
j=1
j=2
i=232783
...

通过两个示例,能够发现,retry 相当于一个标记,只用在循环里面,break 的时候会到 retry 标记处。如果 retry 没有在循环(for,while)里面,在执行到 retry 时,就会跳出整个循环。如果 retry 在循环里面,可以理解为跳到了关键字处执行,不管几层循环。continue理解也是一样。
需要注意的是: retry: 需要放在for,whlie,do…while的前面声明,变量只跟在 break 和 continue 后面。


addWordker() 方法讲述

addWordker() 方法主要的目的是创建一个线程,然后启动执行,期间也会判断线程池状态和工作线程数量等各种检测。

/*** addWordker() 方法主要做的事情是创建一个线程,然后启动执行,期间也会判断线程池状态和工作线程数量等各种检测。** @param firstTask 创建线程后执行的任务* @param core      该参数决定了线程池容量的约束条件,即当前线程数量以何值为极限值。参数为 true 则使用 corePollSize 作为约束值,否则使用 maximumPoolSize。* @return 是否创建线程并启动成功*/
private boolean addWorker(Runnable firstTask, boolean core) {// 外层循环主要是检查线程池状态retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// 线程池状态检查// 1、线程池是非 RUNNING 状态// 2、线程池是 SHUTDOWN 状态 并且 传入的任务是 null 并且 workQueue 不等于空// 两种情况都会返回 falseif (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))return false;// 内层循环:线程池添加核心线程并返回是否添加成功的结果for (;;) {// 获取当前线程数int wc = workerCountOf(c);// 判断是否饱和容量了,如果已经达到最大线程数则不能再新建线程,直接返回 falseif (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))return false;// 完成了上面的判断后,说明现在线程池可以创建新线程,则通过 CAS 将线程数量加 1,因为后面要创建线程了,并跳出外层循环if (compareAndIncrementWorkerCount(c))break retry;// 如果上面的 compareAndIncrementWorkerCount(c) 方法返回 false,则说明有其他线程在操作线程池的线程数量,所以需要重新获取 ctlc = ctl.get();// 如果当前的运行状态已经和最开始获取的状态不一样了,则重新进入外层循环if (runStateOf(c) != rs)continue retry;}}// 如果上面的条件满足,则会把工作线程数量加1,然后执行下面创建线程的动作// 标记是否启动了工作线程boolean workerStarted = false;// 标记是否成功添加了工作线程boolean workerAdded = false;// 要创建的 Worker 对象Worker w = null;try {// 创建工作线程// 增加一个 worker   这个 firstTask 就是一个线程(任务线程,每一个 Worker 都和一个任务线程绑定)w = new Worker(firstTask);// 这个是绑定在 Worker 上的工作线程,并不是任务线程,工作线程是用来执行任务的final Thread t = w.thread;// 判断 t 是否为 nullif (t != null) {// 获取线程池的锁final ReentrantLock mainLock = this.mainLock;// 在读取线程池状态的时候就应该上锁,防止有并发操作破坏程序的一致性,上下不一致mainLock.lock();try {// 锁定后并重新检查线程池的状态,查看是否存在线程工厂的失败或者锁定前的关闭// 获取线程池运行状态int rs = runStateOf(ctl.get());// 检查线程池状态// rs < SHUTDOWN表示是 RUNNING 状态;// 如果 rs 是 RUNNING 状态或者 rs 是 SHUTDOWN 状态并且 firstTask 为 null,向线程池中添加线程。// 因为在 SHUTDOWN 时不会在添加新的任务,但还是会执行 workQueue 中的任务if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) throw new IllegalThreadStateException();// 添加到工作线程队列。 workers 是线程池中存放工作线程的集合// 增加 work  每一个线程池都持有一个 workers 集合,里面存储着线程池的所有 worker,也就是所有线程workers.add(w);// 还在池子中的线程数量(只能在mainLock中使用)int s = workers.size();// 不能超过线程池的最大线程数量if (s > largestPoolSize)largestPoolSize = s;// 标记线程添加成功workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {// 线程添加成功之后启动线程// 启动绑定在 worker上的线程。启动了之后该工作线程就会开始从任务队列中拿任务去执行了t.start();// 标记工作线程启动成功workerStarted = true;}}} finally {// 线程启动失败,执行失败方法(线程数量减1,执行tryTerminate()方法等)if (! workerStarted)addWorkerFailed(w);}// 返回新创建的工作线程是否启动成功return workerStarted;
}

在 addWorker 方法中,如果添加 Worker 并且启动线程失败,则会做失败后的处理:

private void addWorkerFailed(Worker w) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 移除添加失败的 workerif (w != null)workers.remove(w);// 减少 worker 数量decrementWorkerCount();// 尝试终止线程池tryTerminate();} finally {mainLock.unlock();
}

通过上面的源码,可以知道所有的线程都是放在 Worker 这个类中的,到目前为止,任务还并没有执行, 真正的执行是在 Worker 内部类中执行的,在下面会聊到 Worker 内部类中是怎么执行任务的,注意一下这里的 t.start() 这个语句,启动时会调用 Worker 类中的run方法,Worker 本身实现了 Runnable 接口,所以一个 Worker 类型的对象也是一个线程。现在先来总结创建一个线程执行工作任务 addWorker() 这个方法。

流程图表示:
在这里插入图片描述


Worker 类

通过上面的分析,就可以知道每一个任务都会放在 Worker 类中,由 Worker 类中的线程去最终执行任务。
下面就来看一下 Worker 类的源码:

private final class Workerextends AbstractQueuedSynchronizerimplements Runnable
{private static final long serialVersionUID = 6138294804551838833L;// 执行任务的线程final Thread thread;// 要被执行的任务Runnable firstTask;// 记录线程完成的任务数volatile long completedTasks;// worker 类的构造方法// 传入一个需要执行的任务,并且通过 getThreadFactory() 创建一个线程来执行任务Worker(Runnable firstTask) {// 设置 worker 线程状态为-1,表示禁止线程中断// state:锁状态,-1为初始值,0为unlock状态,1为lock状态setState(-1); // 在调用runWorker前,禁止中断this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}// 执行任务public void run() {runWorker(this);}// 是否锁住了任务protected boolean isHeldExclusively() {return getState() != 0;}// 尝试获取锁的方法protected boolean tryAcquire(int unused) {// 尝试修改线程状态if (compareAndSetState(0, 1)) {// 设置 exclusiveOwnerThread 为当前线程setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}// 尝试释放锁protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}// 加锁public void lock()        { acquire(1); }// 尝试获取锁public boolean tryLock()  { return tryAcquire(1); }// 解锁public void unlock()      { release(1); }// 是否锁住public boolean isLocked() { return isHeldExclusively(); }// 线程启动后,进行线程中断时执行方法void interruptIfStarted() {Thread t;// worker中状态 >= 0 且 线程不为空 且 当前线程未中断if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {// 中断线程t.interrupt();} catch (SecurityException ignore) {}}}
}

runWorker 【执行任务】

通过 Worker 类中的源码可以得知,在类中最终是调用了 runWorker 方法,执行的任务,那么接下来就来看一下 runWorker 方法。

final void runWorker(Worker w) {// 获取当前线程Thread wt = Thread.currentThread();// 获取要执行的任务Runnable task = w.firstTask;w.firstTask = null;// 解锁,设置 worker 状态 state 为 0,表示允许线程中断w.unlock(); // allow interrupts// 标识线程退出原因,true 代表线程异常退出,false 代表线程正常退出boolean completedAbruptly = true;try {// 当前任务是为空 或 从任务队列获取的任务为空 则停止循环// getTask() 方法从任务队列获取一个任务while (task != null || (task = getTask()) != null) {// 加锁处理并发问题,防止 shutdown() 时终止正在执行的 workerw.lock();// 如果线程池状态 >= STOP(即 STOP 或 TERMINATED) 并且当前线程未被中断if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())// 中断当前线程wt.interrupt();try {// 调用自定义的任务启动前的方法beforeExecute(wt, task);Throwable thrown = null;try {// 执行任务task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {// 调用自定义的任务启动后的方法afterExecute(task, thrown);}} finally {// 任务对象设置成 nulltask = null;// 完成的工作数 +1w.completedTasks++;// 解锁w.unlock();}}// 标识线程为正常退出completedAbruptly = false;} finally {// 处理 worker 线程退出,主要是用来销毁空闲线程,控制线程池中线程数量的大小变化processWorkerExit(w, completedAbruptly);}
}

通过上面的源码可以得知在 runWorker 方法内部的主要流程主要如下:

  1. 首先解锁 worker,允许线程中断;
  2. 通过 while 循环判断当前任务是否为空,若为空则调用 getTask() 方法从阻塞队列中获取待处理的任务;
  3. 获取到任务后工作线程直接加锁,同时根据线程池状态判断是否中断当前线程;
  4. 执行任务,在任务执行前后执行自定义扩展方法;
  5. 重复第 2 步的逻辑,直到阻塞队列中的任务全部执行完毕,最后调用 processWorkerExit() 方法处理 worker 线程退出、销毁空闲线程和控制线程池中线程数量的大小变化;
    下面使用比较直观详细的流程图来表示出来:

在这里插入图片描述


getTask 【获取任务】

getTask() 方法的主要作用是从任务队列中获取任务。

private Runnable getTask() {// 标记上一次从阻塞队列获取任务是否超时boolean timedOut = false; // Did the last poll() time out?// 自旋for (;;) {int c = ctl.get();// 获取当前线程池状态int rs = runStateOf(c);// 判断是否需获取任务// 若线程池状态 >= STOP 或 (线程池状态为 SHUTDOWN 且 阻塞队列为空),则不需要再处理任务if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {// 工作线程数-1decrementWorkerCount();// 返回return null;}// 获取当前工作线程数int wc = workerCountOf(c);// timed 标记用于判断是否需进行超时控制// allowCoreThreadTimeOut 是否运行核心线程超时销毁(默认为false,核心线程不允许超时销毁)// wc > corePoolSize 当前工作线程数大于核心线程数boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;/*** 当前工作线程数 > maximumPoolSize* 或* timed && timedOut 为 true** 并且** 工作线程数大于1* 或* 阻塞队列为空*/if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {// 使用 CAS 对工作线程数 -1if (compareAndDecrementWorkerCount(c))return null;continue;}// 开始从阻塞队列获取任务try {// timed 为 true 代表核心线程允许超时销毁 或 当前工作线程数大于核心线程数(存在非核心线程)// timed = true:超时等待获取任务 poll(),超时未获取到则返回 null// timed = false:阻塞获取任务 take(),一直等着直到获取到任务Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();// 判断是否获取到任务if (r != null)// 获取到直接返回return r;// 未获取到,代表等待超时也没有拿到任务,设置timedOut值为truetimedOut = true;} catch (InterruptedException retry) {// 即使异常也循环重试timedOut = false;}}
}

通过上面的 getTask() 方法原发分析,大致的主要的过程就是:

  1. 校验线程池状态和工作线程数是否合规;
  2. 根据条件选择从任务队列中超时等待获取,还是从任务队列中阻塞获取;

下面用一个直观的流程图来表示:

在这里插入图片描述


shutdown() 【停止所有线程】

shutdown() 方法会把状态修改成 SHUTDOWN。并且这里肯定会成功,因为修改时是通过 自旋 的方式,不成功不退出。

public void shutdown() {// 获取线程池锁final ReentrantLock mainLock = this.mainLock;// 加锁,修改线程池状态mainLock.lock();try {// 检查是否有权限关闭线程池checkShutdownAccess();// 修改状态为SHUTDOWN,自旋操作,只有状态修改成功才会返回advanceRunState(SHUTDOWN);// 标记空闲线程为中断状态interruptIdleWorkers();onShutdown();} finally {mainLock.unlock();}tryTerminate();
}private void advanceRunState(int targetState) {// 自旋修改线程池状态for (;;) {// 获取ctlint c = ctl.get();// 如果状态大于 SHUTDOWN(因为只有 RUNNING 状态才能转化为 SHUTDOWN 状态,而只有 RUNNING 状态是小于 SHUTDOWN 状态的),// 或者修改为 SHUTDOWN 成功了,才会 break 跳出自旋if (runStateAtLeast(c, targetState) ||ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))break;}
}

shutdownNow() 【停止所有任务,中断执行的任务】

shutdownNow() 方法会把线程池的状态改成 STOP,线程池不接受新任务,且不再执行队列中的任务,且中断正在执行的任务,同时标记所有线程为中断状态。

public List<Runnable> shutdownNow() {List<Runnable> tasks;// 获取线程池的锁final ReentrantLock mainLock = this.mainLock;// 加锁,进行状态修改操作mainLock.lock();try {// 检查是否有权限关闭线程池checkShutdownAccess();// 修改为 STOP 状态advanceRunState(STOP);// 标记所有线程为中断状态interruptWorkers();// 将队列中的任务全部移除,并返回tasks = drainQueue();} finally {mainLock.unlock();}// 尝试终止线程池tryTerminate();// 返回队列中的任务return tasks;
}private List<Runnable> drainQueue() {BlockingQueue<Runnable> q = workQueue;ArrayList<Runnable> taskList = new ArrayList<Runnable>();// 将队列中的元素添加到一个集合中去,但是如果队列是 DelayQueue 或者 其他类型的 Queue 时使用 drainTo 就会失败,所以就需要逐个删除q.drainTo(taskList);if (!q.isEmpty()) {for (Runnable r : q.toArray(new Runnable[0])) {if (q.remove(r))taskList.add(r);}}return taskList;
}

tryTerminate() 【根据线程池状态进行判断是否结束线程池】

所有任务已经中止,且工作线程数量为0,就会进入这个状态。最后变迁到这个状态的线程将要执行 terminated() 钩子方法,只会有一个线程执行这个方法。

final void tryTerminate() {// 自旋修改状态for (;;) {int c = ctl.get();// 下面几种情况不会执行后续代码// 1. 运行中// 2. 状态的值比 TIDYING 还大,也就是 TERMINATED// 3. SHUTDOWN 状态且任务队列不为空if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;// 工作线程数量不为 0,也不会执行后续代码if (workerCountOf(c) != 0) {// 尝试中断空闲的线程interruptIdleWorkers(ONLY_ONE);return;}// 获取线程池的锁final ReentrantLock mainLock = this.mainLock;// 加锁mainLock.lock();try {// CAS 修改状态为 TIDYING 状态if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {// 更新成功,执行 terminated 钩子方法terminated();} finally {// 确保 terminated 钩子方法执行完毕后,再次修改状态为 TERMINATED(最终的终止状态),线程池彻底关闭// 强制更新状态为 TERMINATED,这里不需要 CAS 了ctl.set(ctlOf(TERMINATED, 0));// 通知所有等待线程termination.signalAll();}return;}} finally {// 解锁mainLock.unlock();}}
}

更新成 TIDYING 或者 TERMINATED 状态的代码都在 tryTerminate() 方法中,而 tryTerminate() 方法在很多个地方被调用,比如 shutdown()、shutdownNow()、线程退出时,所以说几乎每个线程最后消亡的时候都会调用 tryTerminate() 方法,但最后只会有一个线程真正执行到修改状态为 TIDYING 的地方。
修改状态为 TIDYING 后执行 terminated() 方法,最后修改状态为 TERMINATED,标志着线程池真正消亡了。



总结

最后,使用流程图来呈现一下 ThreadPoolExecutor 运行机制的图:
在这里插入图片描述



参考与感谢

  • 美团线程池文章
  • Agodival 博主的线程池文章
  • 云深i不知处 博主的线程池文章

非常感谢以上的博主!!!





End



http://www.mrgr.cn/news/24596.html

相关文章:

  • 深入理解 JavaScript:进阶概念与实战技巧
  • Linux用户和组群账户管理
  • 使用libmpeg解码mp3格式文件
  • ORCAD位号,BOOM表
  • 【Linux】【Vim】Vim 基础
  • 【网络安全】-文件下载漏洞-pikachu
  • 什么是谷歌留痕?
  • windows下 MySQL8.4.2 LTS 解压版的安装使用
  • Python 资源管理的得力助手
  • 动态规划(算法)---03.斐波那契数列模型_最小花费爬楼梯
  • 记录一次NGINX和Java后端造成的CORS跨域BUG
  • (一)springboot2.6.13+mybatis-plus3.5.3.1+shardingsphere4.0.0-RC2
  • 如何通过海外云手机提升运营效率
  • 漏洞挖掘 | 产出如此简单?BigF5内网ip泄漏
  • k8s环境配置
  • Leetcode 701-二叉搜索树中的插入操作
  • 287. 寻找重复数(stl法)
  • 即插即用篇 | YOLOv8 引入并行的分块注意力 | 北京大学 2024 | 微小目标
  • QT设置闹钟超时播报
  • 1.简述语言建模LM、统计语言建模SLM、神经语言模型NLM、预训练语言模型PLM、大语言模型LLM