JUC并发工具---线程池
为什么使用线程池
每个任务都创建一个线程带来的问题:
- 反复创建线程系统开销比较大,每个线程创建和销毁都需要时间,如果任务比较简单,那么就有可能导致创建和销毁线程消耗的资源比线程执行任务本身消耗的资源还要大
- 过多的线程会占用过多的内存等资源,还会带来过多的上下文切换,同时还会导致系统不稳定
线程池解决问题思路
- 线程池用一些固定的线程一直保持工作状态并反复执行任务
- 针对过多线程占用太多内存资源的问题,线程池会根据需要创建线程,控制线程的总数量,避免占用过多内存资源。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class ThreadPoolDemo {public static void main(String[] args) {ExecutorService service = Executors.newFixedThreadPool(5);for (int i = 0; i < 10000; i++) {service.execute(new Task());}// 关闭线程池,等待所有任务完成service.shutdown();try {while (!service.isTerminated()) {// 等待所有任务完成}} catch (Exception e) {e.printStackTrace();}System.out.println("All tasks completed.");}static class Task implements Runnable {@Overridepublic void run() {System.out.println("Task executed by: " + Thread.currentThread().getName());}}
}
使用线程池的好处
- 线程池可以解决线程生命周期的系统开销问题,同时还可以加快响应速度。
- 线程池可以统筹内存和cpu的使用,避免资源使用不当
- 线程池可以统一管理资源
线程各个参数的含义
参数名 | 含义 |
---|---|
corePoolSize | 核心线程数(常驻线程数量) |
maxPoolSize | 最大线程数 |
keepAliveTime+时间单位 | 空闲线程的存活时间 |
ThreadFactory | 线程工厂、用来创建新线程 |
workQueue | 用于存放任务的队列 |
Handler | 处理被拒绝的任务 |
线程创建的时机
如果达到最大线程数后还有任务不断被提交,那线程池就会拒绝这些任务。队列为空后,线程池根据keepAliveTime参数来销毁线程。
线程池的4种拒绝策略
拒绝的时机
- 当我们调用shutdown等方法关闭线程池后,即使线程池内部有未完成的任务在执行,再向线程池内提交任务,会遭到拒绝。
- 线程池没有能力继续处理新提交的任务,也就是工作已经非常饱和的时候。
拒绝策略
DiscardPlolicy:当新任务被提交后直接被丢弃掉,也不会给你任何的通知,这样就会存在风险造成数据丢失
AbortPolicy:拒绝任务时,会直接抛出一个类型为RejectedExecutionException的RuntimeException让你感知到任务被拒绝了,于是你便可以根据业务逻辑选择重试或者放弃提交等策略
DiscardOldestPolicy:如果线程池没被关闭且没有能力执行,则会丢弃任务队列中的头结点,通常是存活时间最长的任务这种策略与DiscardPlolicy不同之处在于它丢弃的不是最新提交的,而是队列中存活时间最长的,这样就可以腾出空间给新提交的任务,但同理它也存在一定的数据丢失风险
CallerRunsPolicy:当有新任务提交后,如果线程池没被关闭且没有能力执行,则把这个任务交于提交任务的线程执行,也就是谁提交任务,谁就负责执行任务。
CallerRunsPolicy好处:
- 提交的任务不会被丢弃,这样也就不会造成业务损失。
- 谁提交任务谁就要负责执行任务,这样提交任务的线程就得负责执行任务,而执行任务又是比较耗时的,在这段期间,提交任务的线程被占用,也就不会再提交新的任务,减缓了任务提交的速度,相当于是一个负反馈。在此期间,线程池中的线程也可以充分利用这段时间来执行掉一部分任务,腾出一定的空间,相当于是给了线程池一定的缓冲期。
6种常见的线程池
FixedThreadPool
核心线程数和最大线程数是一样的,所以可以把它看作是固定线程数的线程池。
CachedThreadPool
可缓存线程池,在于线程数是几乎可以无限增加的(实际最大可以达到Integer.MAX_VALUE,为2^31-1,这个数非常大,所以基本不可能达到),而当线程闲置时还可以对线程进行回收。也就是说该线程池的线程数量不是固定不变的,当然它也有一个用于存储提交任务的队列,但这个队列是SynchronousQueue,队列的容量为0,实际不存储任何任务,它只负责对任务进行中转和传递,所以效率比较高。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class ThreadPoolDemo {public static void main(String[] args) {ExecutorService service = Executors.newCachedThreadPool();for (int i = 0; i < 10000; i++) {service.execute(new Task());}// 关闭线程池,等待所有任务完成service.shutdown();try {while (!service.isTerminated()) {// 等待所有任务完成}} catch (Exception e) {e.printStackTrace();}System.out.println("All tasks completed.");}static class Task implements Runnable {@Overridepublic void run() {System.out.println("Task executed by: " + Thread.currentThread().getName());}}
}
ScheduledThreadPool
周期执行线程池,可以执行线程的执行任务周期。
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;public class ScheduledThreadPoolDemo {public static void main(String[] args) {ScheduledExecutorService scheduledService = Executors.newScheduledThreadPool(5);// 定义任务Runnable task1 = () -> {System.out.println("Task 1 executed by: " + Thread.currentThread().getName() + " at " + System.currentTimeMillis());};Runnable task2 = () -> {System.out.println("Task 2 executed by: " + Thread.currentThread().getName() + " at " + System.currentTimeMillis());};Runnable task3 = () -> {System.out.println("Task 3 executed by: " + Thread.currentThread().getName() + " at " + System.currentTimeMillis());};// 安排任务在5秒后执行一次scheduledService.schedule(task1, 5, TimeUnit.SECONDS);// 安排任务每隔2秒执行一次scheduledService.scheduleAtFixedRate(task2, 0, 2, TimeUnit.SECONDS);// 安排任务每隔2秒执行一次,首次延迟1秒scheduledService.scheduleWithFixedDelay(task3, 1, 2, TimeUnit.SECONDS);// 关闭线程池try {Thread.sleep(20000); // 让主线程等待一段时间,以便观察任务执行情况} catch (InterruptedException e) {e.printStackTrace();}scheduledService.shutdown();try {while (!scheduledService.isTerminated()) {// 等待所有任务完成}} catch (Exception e) {e.printStackTrace();}System.out.println("All tasks completed.");}
}
SingleThreadExecutor
SingleThreadExecutor会使用唯一的线程去执行任务,原理和FixedThreadPool是一样的,只不过这里线程只有一个,如果线程在执行任务的过程中发生异常,线程池也会重新创建一个线程来执行后续的任务
SingleThreadScheduledExecutor
ScheduledThreadScheduledExecutor会使用唯一的线程去执行任务,原理和ScheduledThreadPool是一样的,只不过这里线程只有一个,如果线程在执行任务的过程中发生异常,线程池也会重新创建一个线程来执行后续的任务
ForkJoinPool
ForkJoinPool是java中用于执行分治算法的线程池,特别适合于处理可以分解成多个子任务的任务。
不同于其他线程池共享一个工作队列,ForkJoinPool中除了有一个全局工作队列,每一个线程都有一个自己独立双端的工作队列deque。
具体来说:
- 全局工作队列:当一个任务被提交给 ForkJoinPool 时,它会被放入全局工作队列中。全局工作队列通常是一个数组或类似的数据结构,每个线程可以从这个队列中获取任务。
- 本地双端队列:每个线程都有一个本地双端队列,用于存放它分解出来的子任务。这样可以减少任务调度的开销,并且提高并发性能。
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;public class ForkJoinSumDemo {public static void main(String[] args) {int[] numbers = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};int numThreads = 4;// 创建一个ForkJoinPoolForkJoinPool forkJoinPool = new ForkJoinPool(numThreads);// 计算数组的总和int sum = forkJoinPool.invoke(new SumTask(numbers, 0, numbers.length));// 输出结果System.out.println("Sum of the array is: " + sum);// 关闭线程池forkJoinPool.shutdown();try {while (!forkJoinPool.isTerminated()) {// 等待所有任务完成}} catch (Exception e) {e.printStackTrace();}System.out.println("All tasks completed.");}// 自定义任务类,继承自RecursiveTaskstatic class SumTask extends RecursiveTask<Integer> {private static final int THRESHOLD = 2; // 阈值,小于这个值的任务直接执行private int[] numbers; // 数组private int start; // 子数组的起始索引private int end; // 子数组的结束索引// 构造方法public SumTask(int[] numbers, int start, int end) {this.numbers = numbers;this.start = start;this.end = end;}// 重写compute方法,实现任务的递归分解和执行@Overrideprotected Integer compute() {int length = end - start;if (length <= THRESHOLD) {// 如果子数组长度小于等于阈值,则直接计算子数组的总和return computeSum(start, end);} else {// 否则,将任务分解为两个子任务int mid = start + length / 2;SumTask leftTask = new SumTask(numbers, start, mid);SumTask rightTask = new SumTask(numbers, mid, end);// 调用invokeAll方法,同时执行两个子任务invokeAll(leftTask, rightTask);// 返回两个子任务的结果之和return leftTask.join() + rightTask.join();}}// 计算子数组的总和private int computeSum(int start, int end) {int sum = 0;for (int i = start; i < end; i++) {sum += numbers[i];}return sum;}}
}
线程池常见阻塞队列
线程池内部结构
- 线程池管理器:主要负责管理线程池的创建、销毁、添加任务等管理操作。
- 工作线程:线程从任务队列中获取任务并执行
- 任务队列:一种缓冲机制,线程池会把当下没有处理的任务放入任务队列中,由于多线程同时从任务队列中获取任务是并发场景,此时就需要任务队列满足线程安全的要求,所以线程池中任务队列采用BlockingQueue来保障线程安全
阻塞队列
线程池 | 阻塞队列 |
---|---|
FixedThreadPool | LinkedBlockingQueue |
SingleThreadExecutor | LinkedBlockingQueue |
CachedThreadPool | SynchronousQueue |
ScheduledThreadPool | DelayedWorkQueue |
SingleThreadScheduledExecutor | DelayedWorkQueue |
LinkedBlockingQueue:使用的阻塞队列是容量为Integer.MAX_VALUE的LinkedBlockingQueue,可以认为是无界队列。由于线程池的任务队列永远不会放满,所以线程池只会创建核心线程数量的线程,此时的最大线程数对线程池来说没有意义,因为并不会触发生成多于核心线程数的线程。
SynchronousQueue:对应的线程池是CachedThreadPool线程池CachedThreadPool的最大线程数是Integer的最大值,可以理解为线程数是可以无限扩展的CachedThreadPool和上一种线程池FixedThreadPool的情况恰恰相反,FixedThreadPool的情况是阻塞队列的容量是无限的,而这里CachedThreadPool是线程数可以无限扩展所以CachedThreadPool线程池并不需要一个任务队列来存储任务,因为一旦有任务被提交就直接转发给线程或者创建新线程来执行,而不需要另外保存它们
DelayedWorkQueue:DelayedWorkQueue的特点是内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构之所以线程池ScheduledThreadPool和SingleThreadScheduledExecutor选择DelayedWorkQueue,是因为它们本身正是基于时间执行任务的,而延迟队列正好可以把任务按时间进行排序,方便任务的执行
为什么不应该自动创建线程池?
因为调用new的方法的话,创建的线程池都是没有限制的,当任务过多时,都有可能导致OOM。
合适的线程数量是多少?
任务分类
- cpu密集型任务
比如:加密、解密、压缩、计算等一系列需要大量耗费CPU资源的任务
对于这样的任务最佳的线程数为CPU核心数的1~2倍,如果设置过多的线程数,实际上并不会起到很好的效果
问题
此时假设我们设置的线程数量是CPU核心数的2倍以上,因为计算任务非常重,会占用大量的CPU资源,所以这时CPU的每个核心工作基本都是满负荷的,而我们又设置了过多的线程,每个线程都想去利用CPU资源来执行自已的任务,这就会造成不必要的上下文切换,此时线程数的增多并没有让性能提升,反而由于线程数量过多会导致性能下降
- 耗时io型任务
《Java并发编程实战》的作者BrainGoetz推荐的计算方法:线程数=CPU核心数 *(1+平均等待时间/平均工作时间)
通过这个公式,可以计算出一个合理的线程数量,如果任务的平均等待时间长,线程数就随之增加,而如果平均工作时间长,也就是对于我们上面的CPU密集型任务,线程数就随之减少
合适的线程参数设置
核心线程数
比如可能一段时间是CPU密集型,另一段时间是IO密集型,或是同时有两种任务相互混搭在这种情况下,可以把最大线程数设置成核心线程数的几倍,以便应对任务突发情况
更好的办法是用不同的线程池执行不同类型的任务,让任务按照类型区分开,而不是混杂在一起这样就可以按照上一课时估算的线程数或经过压测得到的结果来设置合理的线程数了,达到更好的性能
阻塞队列
LinkedBlockingQueue或者SynchronousQueue或者DelayedWorkQueue还有一种常用的阻塞队列叫ArrayBlockingQueue,也经常被用于线程池中这种阻塞队列内部是用数组实现的,在新建对象的时候要求传入容量值,且后期不能扩容,所以ArrayBlockingQueue的最大的特点就是容量是有限的。
线程工厂
创建时可以设置生成的线程名字的固定格式,方便我们根据业务定位线程问题。
拒接策略
除了AbortPolicy,DiscardPolicy,DiscardOldestPolicy或 CallerRunsPolicy还可以通过实现RejectedExecutionHandler接口来实现自己的拒绝策略
在接口中我们需要实现rejectedExecution方法,在rejectedExecution方法中,执行例如打印日志、暂存任务、重新执行等自定义的拒绝策略,以便满足业务需求
如何正确关闭线程池
shutdown()
它可以安全地关闭一个线程池,调用shutdown()方法之后线程池并不是立刻就被关闭,因为这时线程池中可能还有很多任务正在被执行,或是任务队列中有大量正在等待被执行的任务,调用shutdown()方法后线程池会在执行完正在执行的任务和队列中等待的任务后才彻底关闭。但这并不代表shutdown()操作是没有任何效果的调用shutdown()方法后如果还有新的任务被提交,线程池则会根据拒绝策略直接拒绝后续新提交的任务
isShutdown()
可以返回true或者false来判断线程池是否已经开始了关闭工作也就是是否执行了shutdown或者shutdownNow方法这里需要注意,如果调用isShutdown()方法的返回的结果为true并不代表线程池此时已经彻底关闭了,这仅仅代表线程池开始了关闭的流程也就是说,此时可能线程池中依然有线程在执行任务,队列里也可能有等待被执行的任务.
isTerminated()
这个方法可以检测线程池是否真正“终结”了
这不仅代表线程池已关闭,同时代表线程池中的所有任务都已经都执行完毕了比如此时已经调用了shutdown方法,但是有一个线程依然在执行任务,那么此时调用isShutdown方法返回的是true,而调用isTerminated方法返回的便是false,因为线程池中还有任务正在被执行,线程池并没有真正“终结”
直到所有任务都执行完毕了,调用isTerminated()方法才会返回true,这表示线程池已关闭并且线程池内部是空的,所有剩余的任务都执行完毕了
awaitTermination()
本身并不是用来关闭线程池的,而是主要用来判断线程池状态的
比如我们给awaitTermination方法传入的参数是10秒,那么它就会陷入10秒钟的等待
直到发生以下三种情况之一:
- 等待期间(包括进入等待状态之前)线程池已关闭并且所有已提交的任务(包括正在执行的和队列中等待的)都执行完毕,相当于线程池已经“终结”了,方法便会返回true
- 等待超时时间到后,第一种线程池“终结”的情况始终未发生,方法返回false
- 等待期间线程被中断,方法会抛出InterruptedException异常
shutdownNow()
功能类似与shutdown(),但是速度可以更快终止。
线程池线程复用原理
线程复用原理
线程池可以把线程和任务进行解耦,线程归线程,任务归任务,摆脱了之前通过Thread创建线程时的一个线程必须对应一个任务的限制。
其核心原理在于线程池对Thread进行了封装,并不是每次执行任务都会调用Thread.start()来创建新线程,而是让每个线程去执行一个“循环任务”,在这个“循环任务”中,不停地检查是否还有任务等待被执行,如果有则直接去执行这个任务,也就是调用任务的run方法,把run方法当作和普通方法一样的地位去调用,相当于把每个任务的run()方法串联了起来,所以线程数量并不增加。
线程复用源码解析
public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();// 检查当前活动线程数是否少于核心线程数 // workerCountOf(c):计算当前活动线程数。if (workerCountOf(c) < corePoolSize) {// 尝试添加一个带有任务的新线程,并允许核心线程超时。if (addWorker(command, true))// 如果成功添加新线程,则直接返回。return;// 如果添加失败(可能是由于线程数已经达到核心线程数),重新获取 ctl 的值。c = ctl.get();}// isRunning(c):检查当前线程池是否处于运行状态。workQueue.offer(command):尝试将任务放入任务队列。if (isRunning(c) && workQueue.offer(command)) {// 如果成功放入任务队列,则进行以下检查:// 再次获取 ctl 的值。int recheck = ctl.get();// 如果线程池不再运行(可能已经关闭),则从任务队列中移除任务并拒绝任务。if (!isRunning(recheck) && remove(command))reject(command);// 如果当前没有活动线程else if (workerCountOf(recheck) == 0)// 则尝试添加一个空闲线程(不带任务)addWorker(null, false);}// 如果无法将任务放入任务队列,则尝试添加一个带有任务的新线程,并不允许核心线程超时。else if (!addWorker(command, false))// 如果添加失败,则拒绝任务。reject(command);
}
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&!wt.isInterrupted()))) {// If pool is stopping, ensure threads are interrupted// and task is null.w.cancel(false);continue;}try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}
}