当前位置: 首页 > news >正文

细说 Java 线程池

一、 Java 线程池使用

1.1、线程池创建

在这里插入图片描述

1.2、线程池执行流程

提交任务到线程池时,调用execute 方法如下图所示:

在这里插入图片描述

1.3、 线程池关闭

  • shutdown()

    拒绝新任务,等待已提交任务(包括队列中的任务)执行完毕,不中断正在运行的任务。

    在这里插入图片描述

  • shutdownNow()

    强制关闭:中断所有线程(包括运行中的任务),返回未执行的任务列表,慎用。

    在这里插入图片描述

1.3.1、 优雅关闭

ExecutorService pool = Executors.newFixedThreadPool(5);// 1. 启动关闭流程,拒绝新任务
pool.shutdown(); try {// 2. 等待现有任务完成(超时时间根据业务设定)if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {// 3. 超时后强制终止List<Runnable> unfinishedTasks = pool.shutdownNow(); log.warn("强制关闭,未完成任务数: {}", unfinishedTasks.size());// 4. 再次等待线程响应中断if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {log.error("线程池未能完全关闭");}}
} catch (InterruptedException e) {// 5. 处理中断异常,重新尝试强制关闭pool.shutdownNow();Thread.currentThread().interrupt(); 
}

1.4、线程池状态

线程有状态(详见Java线程状态详解),线程池也有状态。

1.4.1、状态概览

线程池状态如下所示:
状态类型为intRUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED

在这里插入图片描述

  • RUNNING:线程池创建后的状态,接收新任务,以及对已添加到队列中的任务进行处理。
  • SHUTDOWN:调用 shutdown 方法,线程池就会转换成 SHUTDOWN 状态,此时线程池不再接收新任务,继续处理任务队列中剩余的任务。
  • STOP:调用 shutdownNow 方法,线程池就会转换成 STOP 状态,不接收新任务,不处理队列中的剩余任务,尝试中断正在处理的任务的线程。
  • TIDYING:队列为空,线程已终止;
    • SHUTDOWN 状态下,所有任务(包括队列)均已完成,线程线程池会变为 TIDYING 状态;
    • STOP 状态下,队列任务均已导出,线程池会变为 TIDYING 状态。
  • TERMINATED:线程池彻底终止。线程池在 TIDYING 状态执行完 terminated详见下文)方法就会转变为 TERMINATED 状态。

1.4.2、线程池状态存储

    private static final int COUNT_BITS = Integer.SIZE - 3; // 32 - 3 = 29 ,线程数量为2^29, 高3位表示线程池状态,最多可以8种。private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));// Packing and unpacking ctlprivate static int runStateOf(int c)     { return c & ~COUNT_MASK; }private static int workerCountOf(int c)  { return c & COUNT_MASK; }private static int ctlOf(int rs, int wc) { return rs | wc; }

线程池状态(runStateOf)和线程池中线程数量(workerCountOf)一起存在了 ctl 成员变量中,通过高位掩码提取状态(runStateOf)、低位掩码提取线程数(workerCountOf)的位运算提高效率(相较于传统条件判断或锁操作)。

  • 原子性:通过单个原子变量(AtomicInteger)的 CAS 操作,确保线程池状态(runState)和线程数(workerCount)的修改是原子的,避免状态与数量不一致导致的逻辑错误。

    举例:当线程池进入 SHUTDOWN 状态时,若状态和线程数分开维护,可能出现状态已更新但线程数意外增加的情况,导致错误地接受新任务。

  • 内存占用优化: 1个 AtomicInteger(4Byte)替代2个变量,减少内存占用。

  • 竞争条件规避:关闭线程池时,需确保状态变为 TERMINATED 的同时线程数为0,通过 ctl 的原子性操作可避免中间状态的不一致。

  • 减少锁开销:仅通过 一次 CAS 操作即可同时修改状态和线程数。

1.4.3、方法tryTerminate

在这里插入图片描述

  • 方法tryTerminate主要是做线程池的一些收尾工作,转换线程池状态为TERMINATED,彻底终止线程池。

    关闭线程池的方法 shutdown()shutdownNow() 在方法结束前均会调用此方法。其它调用此方法的方法如下图所示:

    在这里插入图片描述

二、 Java 线程池细节

2.1、线程池拒绝任务策略

当线程池中任务队列已满,并且线程数量达到了最大数量后,线程池将采取指定的拒绝策略来处理新提交的任务,具体策略如下。

2.1.1、DiscardPolicy

  • 直接丢弃任务

在这里插入图片描述

2.1.2、AbortPolicy

  • 丢弃任务后,抛出运行时异常;
    在这里插入图片描述
    AbortPolicy 是线程池默认拒绝策略
    在这里插入图片描述

2.1.3、DiscardOldestPolicy

  • 踢出队列最老的任务;
  • 再次提交当前任务;
    在这里插入图片描述

2.1.4、CallerRunsPolicy

  • 由提交任务的线程来执行任务;
    在这里插入图片描述

    该策略使用需注意,若 caller 为主线程,且该任务耗时,则会阻塞主线程,后续通过主线程提交到线程池的任务也会相应被阻塞。系统可能直接卡死。

2.2、线程池 Worker

  • 线程池内部使用HashSet来存储所有Worker;
  • 注意:注释说,需要配合锁(mainLock)来访问,这里了解即可,详见后文2.4小节。

在这里插入图片描述

2.2.1、线程池 Worker 继承了 AQS

如下图所示,Worker 通过继承 AQS(更多细节查阅自旋、CLH、AQS浅析),来实现一个非重入的独占锁,来确保执行任务期间不会被线程池的其它方法中断。

注释中给了我们一个可能会中断线程的方法setCorePoolSize

在这里插入图片描述

Worker 覆写 AQS 的方法如下图所示:

在这里插入图片描述

2.2.1.1、方法 setCorePoolSize

源码如下,该方法用于设置线程池的核心线程数量,当传入值小于当前线程池的核心线程数时,会通过方法interruptIdleWorkers()销毁空闲线程。

在这里插入图片描述

2.2.1.2、方法 interruptIdleWorkers()

在这里插入图片描述

在这里插入图片描述
如上图所示,方法 interruptIdleWorkers() 就是负责中断线程的。

中断线程前会尝试加锁,正是Worker对AQS实现的锁。

接下来看下Worker这把锁发挥威力的地方:方法runWorker() 是用来执行任务的。

如下图所示,每次执行任务前加锁,执行完任务后释放锁。

在这里插入图片描述

2.2.2、线程池 Worker 实现了 Runnable

覆写了Runnable中的run()方法如下图示,正是调用了我们前文提到的runWorker方法。

在这里插入图片描述

2.2.2.1 Worker 在哪里被启动的

谁创建Worker ,谁负责启动Worker。
是谁,到底是谁创建的,当然是我们在提交任务(execute)时通过方法addWorker创建的。

在这里插入图片描述

在这里插入图片描述
至此,我们的Woker成功创建并启动起来干活了。

2.3、线程池队列

2.3.1、线程池队列获取任务

线程获取任务是通过getTask()方法,如下图所示。

在这里插入图片描述

  • 若队列有任务就获取任务后返回;

  • 若队列为空,就阻塞线程;一直到线程被中断后,继续下次循环;

  • 线程被谁中断呢?

    • 前文提到的方法 interruptIdleWorkers()中会设置线程中断。
      在这里插入图片描述
    • 方法interruptWorkers()也会设置线程中断。
      在这里插入图片描述
      在这里插入图片描述
  • 线程被中断(比如线程池调用shutdownNow() 或者 shutdown() )后,进入下一次循环,发现线程池需要关闭,所以就减少线程数量,返回null

    • runWorker方法中while (task != null || (task = getTask()) != null) 获取到的任务为空,结束循环,线程结束。
      在这里插入图片描述

2.4、线程池锁

在这里插入图片描述

在2.2小节一开始,我们提到过Worker的集合HashSet需要配合mainLock使用,接下来,看下为什么线程池内部要使用锁?

  • 避免中断风暴

    interruptIdleWorkers方法内部是串行化(循环遍历)中断Worker的。

  • 维持线程池内部状态一致性,如成员largestPoolSize用于统计线程池的内Worker曾经达到的最大数量,如果不加锁,该值的准确性无法保证 。

    • 更新largestPoolSize
      在这里插入图片描述
    • 读取largestPoolSize
      在这里插入图片描述
  • 确保线程池的Workers集合稳定。

    • 方法addWorker中添加worker
      在这里插入图片描述
    • 方法addWorkerFailed中移除worker
      在这里插入图片描述
      如上所示,这两个方法需要加锁来实现,因为Workers集合HashSet不是线程安全的。

    当然,这里可以使用线程安全的集合替代。但是Workers集合还要配合之前提到的largestPoolSize使用,不如使用一把锁直接解决来的更方便,更高效;

    这里的锁使用ReentrantLock而非内置锁synchronized是因为:

    • 灵活性ReentrantLock支持可中断的锁获取、公平性策略等,更适合复杂的线程控制。
    • 代码结构:显式锁的使用(配合try-finally块)使得加锁/解锁的边界更清晰,便于维护。

2.5、线程池钩子方法

每个任务执行前,我们可以定制两个方法:

  • beforeExecute
  • afterExecute
    在这里插入图片描述

定制举例:

  • 重新初始化ThreadLocal;
  • 数据统计;
  • 日志处理;

在这里插入图片描述

三、线程池工具类Executors

Executors 中常用的获取线程池的方法如下:

这些方法均没有对资源(线程数量或者队列长度)做合理限制,使用时会增加OOM的风险。

所以使用时需注意明确定义线程池的 7 个参数。

在这里插入图片描述


http://www.mrgr.cn/news/92717.html

相关文章:

  • AI 数据集生成和模型微调框架 Distilabel 高级指南:深度功能与最佳实践
  • Apache Spark中的依赖关系与任务调度机制解析
  • 数据库MySQL,在终端输入后,提示不是内部命令等
  • 《机器学习数学基础》补充资料:矩阵的LU分解
  • 硬编码(三)经典变长指令一
  • 千峰React:函数组件使用(3)
  • STL 算法库中的 min_element 和 max_element
  • React 中 useState 的 基础使用
  • LeetCode 热题 100_有效的括号(69_20_简单_C++)(栈;栈+哈希表(建立左右括号的对应关系))
  • Token相关设计
  • Hive配置
  • MQ 笔记
  • 初阶数据结构习题【3】(1时间和空间复杂度)——203移除链表元素
  • Android APK组成编译打包流程详解
  • CMU15445(2023fall) Project #2 - Extendible Hash Index 匠心分析
  • 学术小助手智能体
  • Python面试(八股)
  • 爬虫和逆向教程-专栏介绍和目录
  • 常见深度学习算法图解笔记
  • 【戒抖音系列】短视频戒除-4-为什么刷短视频停不下来?从多巴胺陷阱到沉浸式交互拆解上瘾机制的底层逻辑