当前位置: 首页 > news >正文

Java 入门指南:Java 并发编程 ——自定义 Java 线程池

线程池

线程池(ThreadPool)是一种用于管理和复用线程的机制,它可以预先创建一批线程,并维护一个线程队列,用于执行提交的任务。

点击查看:Java 线程池详解

线程池的主要目的是提高多线程应用程序的性能和效率,通过重用已创建的线程,减少线程的创建和销毁开销,避免频繁地创建线程和线程上下文切换的性能损耗。

线程池其实是一种池化的技术实现,池化技术的核心思想就是实现资源的复用,避免资源的重复创建和销毁带来的性能开销。

线程池可以管理一系列线程,让线程执行完任务之后不进行销毁,而是继续去处理其它线程已经提交的任务。

线程池的构造

Java 提供了一个内置的线程池实现,即 java.util.concurrent.Executors 类。通过 Executors 类可以创建不同类型的线程池,例如固定大小线程池、缓存线程池、定时线程池等。

![[Pasted image 20240904221918.png]]

  • corePoolSize:线程池中用来工作的核心线程数量。

  • maximumPoolSize:最大线程数,线程池允许创建的最大线程数。

  • keepAliveTime:超出 corePoolSize 后创建的线程存活时间或者是所有线程最大存活时间,取决于配置。

  • unit:keepAliveTime 的时间单位。

  • workQueue:任务队列,是一个阻塞队列,当线程数达到核心线程数后,会将任务存储在阻塞队列中。

  • threadFactory :线程池内部创建线程所用的工厂。

  • handler:拒绝策略;当队列已满并且线程数量达到最大线程数量时,会调用该方法处理任务。

自定义线程池

点击查看:Java 并发编程 —— Executor、Executors 构建线程池

虽然 Java 的 Executors 类提供了多种静态工厂方法来创建线程池,但在某些特定场景下,这些默认的线程池配置可能并不满足我们的需求。例如,我们可能需要自定义线程池的核心线程数、最大线程数、线程存活时间、任务队列容量等参数,以便更好地适应我们的业务场景和系统资源。此时,自定义线程池就显得尤为重要。

继承 ThreadPoolExecutor 类

首先继承 ThreadPoolExecutor 类,重写 execute() 方法,定义任务的执行逻辑。

自定义的线程池参数

之后,需要合理的设置线程池参数,包括核心线程数、最大线程数、线程闲置时间等参数。

ThreadPoolExecutor customExecutor = new ThreadPoolExecutor(2, // 核心线程数5, // 最大线程数60, // 非核心线程的空闲超时时间(秒)TimeUnit.SECONDS, // 时间单位new LinkedBlockingQueue<>(10), // 工作队列new CustomRejectedExecutionHandler() // 自定义拒绝策略
);

自定义线程池时,首先需要确定以下几个关键参数:

  • 核心线程数(corePoolSize):正常情况下,系统能同时工作的线程数。
  • 最大线程数(maximumPoolSize):极限情况下,线程池能拥有的最大线程数。
  • 空闲线程存活时间(keepAliveTime):非核心线程在没有任务的情况下,过多久要销毁。
  • 时间单位(unit):空闲线程存活时间的单位,如秒、分钟等。
  • 任务队列(workQueue):用于存放待执行的任务的队列。
  • 线程工厂(threadFactory):用于创建新线程的工厂,可以自定义线程的创建逻辑。
  • 拒绝策略(handler):当任务队列已满且无法再创建新线程时,对新增任务的处理策略。
线程数

线程数的设置主要取决于业务是 IO 密集型还是 CPU 密集型

  • CPU 密集型:指的是任务主要使用来进行大量的计算,没有什么导致线程阻塞。一般这种场景的线程数设置为 C P U 核心数 + 1 CPU 核心数+1 CPU核心数+1

  • IO 密集型:当执行任务需要大量的 IO,比如磁盘 IO,网络 IO,可能会存在大量的阻塞,所以在 IO 密集型任务中使用多线程可以大大地加速任务的处理。一般线程数设置为 2 × C P U 核心数 {2} \times {CPU 核心数} 2×CPU核心数

在 Java 中,可以通过 Runtime.getRuntime().availableProcessors() 获取 CPU 核心数

线程工厂

自定义一个线程工厂 threadFactory,实现 ThreadFactory 接口并重写其 newThread() 方法。newThread() 方法用于创建新的线程实例并返回。

通过实例化自定义的线程工厂,并传入自定义的线程名前缀,然后将其作为参数传递给线程池的构造函数,以创建指定的线程工厂。

自定义线程工厂能对线程进行更多的自定义设置,比如命名、守护线程、优先级等。这样可以更好地控制线程的属性和行为。

例如,构建线程的时候设置线程的名称,这样在查日志的时候就方便知道是哪个线程执行的代码。

public class MyThreadFactory implements ThreadFactory {private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;public MyThreadFactory(String poolName) {namePrefix = poolName + "-Thread";}@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r, namePrefix + "-" + threadNumber.getAndIncrement());// 设置线程为守护线程(可选)t.setDaemon(false);// 设置线程优先级(可选)t.setPriority(Thread.NORM_PRIORITY);return t;}
}// 创建自定义线程工厂实例
ThreadFactory threadFactory = new MyThreadFactory("MyThreadPool");// 创建线程池并传入自定义线程工厂
ExecutorService executorService = Executors.newFixedThreadPool(5, threadFactory);// 提交任务给线程池执行
executorService.execute(new MyTask());
有界队列

需要设置有界队列的大小,比如 LinkedBlockingQueue 在构造的时候可以传入参数来限制队列中任务数据的大小,这样就不会因为无限向阻塞队列中添加任务导致系统的 OutofMemoryError

重写 afterExecute

对于线程池中的线程异常终止时的处理,可以重写 afterExecute() 方法来记录任务的执行状态、进行异常处理。

public class MyThreadPoolExecutor extends ThreadPoolExecutor {public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);}@Overrideprotected void afterExecute(Runnable r, Throwable t) {super.afterExecute(r, t);// 执行自定义操作if (t != null) {// 处理异常System.out.println("任务执行出现异常:" + t.getMessage());} else {// 记录任务的执行状态System.out.println("任务执行完成");}}
}

自定义 RejectedExecutionHandler

在创建自定义线程池实例时,使用自定义的 RejectedExecutionHandler 处理无法处理的任务。

public class MyRejectedExecutionHandler implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {// 执行自定义操作System.out.println("任务被拒绝");// 可以选择抛出异常或者将任务放入队列中等待执行throw new RejectedExecutionException("任务被拒绝");}
}

实现自定义线程池

下面是一个详细的示例,展示如何创建一个自定义的线程池,并且在其中实现一些自定义的功能,如自定义线程工厂、拒绝策略等。

import java.util.concurrent.*;public class CustomThreadPoolExecutor extends ThreadPoolExecutor {public CustomThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);}@Overrideprotected void beforeExecute(Thread t, Runnable r) {super.beforeExecute(t, r);System.out.println("任务 " + r.hashCode() + " 即将由线程 " + t.getName() + " 执行");}@Overrideprotected void afterExecute(Runnable r, Throwable t) {super.afterExecute(r, t);System.out.println("任务 " + r.hashCode() + " 已经由线程 " + Thread.currentThread().getName() + " 执行完毕");}@Overrideprotected void terminated() {super.terminated();System.out.println("线程池已终止");}public static void main(String[] args) {// 创建一个自定义线程池CustomThreadPoolExecutor customExecutor = new CustomThreadPoolExecutor(2, // 核心线程数5, // 最大线程数60, // 非核心线程的空闲超时时间(秒)TimeUnit.SECONDS, // 时间单位new LinkedBlockingQueue<>(10), // 工作队列new NamedThreadFactory("CustomThreadPool"), // 自定义线程工厂new CustomRejectedExecutionHandler() // 自定义拒绝策略);// 提交任务给线程池for (int i = 0; i < 10; i++) {final int taskId = i;customExecutor.execute(new Runnable() {@Overridepublic void run() {processTask(taskId);}});}// 关闭线程池shutdownAndAwaitTermination(customExecutor);}private static void processTask(final int taskId) {System.out.println("正在执行任务 " + taskId + ",线程名称:" + Thread.currentThread().getName());try {// 模拟任务执行时间Thread.sleep((long) (Math.random() * 1000));} catch (InterruptedException e) {Thread.currentThread().interrupt();System.err.println("任务 " + taskId + " 被中断");}}private static void shutdownAndAwaitTermination(ThreadPoolExecutor executor) {executor.shutdown(); // 关闭线程池,不再接受新的任务try {if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {executor.shutdownNow(); // 如果等待超时,则强制关闭所有线程if (!executor.awaitTermination(60, TimeUnit.SECONDS))System.err.println("线程池未在规定时间内终止");}} catch (InterruptedException ie) {executor.shutdownNow(); // (Re-)Cancel if current thread also interruptedThread.currentThread().interrupt(); // Preserve interrupt status}}// 自定义线程工厂static class NamedThreadFactory implements ThreadFactory {private final String namePrefix;private int threadId = 1;public NamedThreadFactory(String namePrefix) {this.namePrefix = namePrefix;}@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r, namePrefix + "-" + threadId++);t.setDaemon(false); // 设置为后台线程return t;}}// 自定义拒绝策略static class CustomRejectedExecutionHandler implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {if (!executor.isShutdown()) {System.err.println("任务 " + r.toString() + " 被拒绝,当前线程池状态:"+ "核心线程数=" + executor.getCorePoolSize()+ ",当前线程数=" + executor.getActiveCount()+ ",最大线程数=" + executor.getMaximumPoolSize()+ ",队列长度=" + executor.getQueue().size());}}}
}

线程池说明:

  1. 继承 ThreadPoolExecutor:创建了一个名为 CustomThreadPoolExecutor 的类,它继承自 ThreadPoolExecutor。这样做允许我们在 ThreadPoolExecutor 的基础上增加自定义的行为。
  2. 构造方法:在构造方法中,我们调用了父类 ThreadPoolExecutor 的构造方法,并传递了相同的参数,这些参数包括核心线程数、最大线程数、非核心线程的空闲超时时间、时间单位、工作队列、线程工厂以及拒绝策略。这些参数定义了线程池的行为。
  3. 重写 beforeExecute 方法 beforeExecute 方法在任务即将被执行之前被调用。在这里我们覆盖了该方法,以便在任务执行前记录一些日志信息。这可以帮助我们跟踪任务何时被分配给线程执行。
  4. 重写 afterExecute 方法afterExecute 方法在任务执行完毕后被调用。我们也覆盖了这个方法,以便在任务执行后记录一些信息。这对于了解任务的执行情况非常有用,特别是在调试阶段。
  5. 重写 terminated 方法:当线程池关闭并且所有任务都已执行完毕后,terminated 方法会被调用。我们覆盖了这个方法来记录线程池已经终止的信息。
  6. 任务执行方法 (processTask):这个方法定义了实际的任务逻辑。每个任务输出其任务 ID 和当前执行线程的名称,并模拟任务执行时间为随机的 0 到 1000 毫秒。如果任务执行过程中被中断,则设置当前线程的中断状态,并输出中断信息。
  7. 关闭线程池的方法 (shutdownAndAwaitTermination):这个方法负责关闭线程池,并等待所有任务执行完毕。它首先调用 executor.shutdown() 方法关闭线程池,不再接受新的任务。然后使用 awaitTermination 方法等待指定的时间,如果所有任务在此期间未能完成,则调用 shutdownNow 方法强制关闭所有线程。如果在等待过程中发生中断,则重新调用 shutdownNow 方法并设置当前线程的中断状态。
  8. 自定义线程工厂 (NamedThreadFactory):这个类用于创建带有命名前缀的新线程。每次调用 newThread 方法时,都会创建一个新的线程,并为其设置一个带有前缀和编号的名字,便于识别和调试。
  9. 自定义拒绝策略 (CustomRejectedExecutionHandler):当线程池无法接受新任务时,拒绝策略就会被触发。我们定义了一个自定义的拒绝策略,当任务被拒绝时,它会输出一条错误信息,并显示当前线程池的状态信息,包括核心线程数、当前活动线程数、最大线程数和队列长度。

http://www.mrgr.cn/news/20570.html

相关文章:

  • Linux-(系统启动、用户管理)
  • show命令监控分析mysql实例信息
  • 【FreeRTOS】内存管理
  • 内存管理篇-22 高端内存和低端内存的分界线
  • Newman生成测试报告排版混乱
  • Spring 源码解读:手动实现BeanFactory的加载与管理
  • C++的四种规范的类型转换
  • 动手学深度学习(pytorch)学习记录24-填充和步幅[学习记录]
  • java开发简历详解
  • 2024国赛数学建模C题思路模型
  • 设计模式之装饰器模式:让对象功能扩展更优雅的艺术
  • 为什么这么多物联网项目都失败了?
  • Python
  • OpenCV 旋转矩形边界
  • HTTP Cookie 和 session
  • 2024 年全国大学生数学建模竞赛(国赛)浅析
  • 利用高德API获取整个城市的公交路线并可视化(四)
  • 数据链路层认识以太网
  • 使用C语言实现字符推箱子游戏
  • C++学习笔记(7)