Java线程
Java 线程池详解
线程池是多线程编程中一种常用的技术,它用于管理和复用线程,减少线程创建和销毁的开销。本文将介绍 Java 中的线程池创建方式,并提供示例代码、参数说明以及如何选择合适的参数。
1、指定参数创建线程池
// 配置线程池参数
int corePoolSize = 10; // 核心线程数
int maxPoolSize = 30; // 最大线程数
long keepAliveTime = 5000; // 空闲线程存活时间 (毫秒)
TimeUnit unit = TimeUnit.MILLISECONDS; // 时间单位
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(50); // 任务队列RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy(); // 拒绝策略// 创建线程池
ThreadPoolExecutor executorService = new ThreadPoolExecutor(corePoolSize,maxPoolSize,keepAliveTime,unit,workQueue,Executors.defaultThreadFactory(),handler
);// 提交任务到线程池
for (int i = 0; i < 100; i++) {final int taskId = i;final Future<String> submit = executorService.submit(() -> {System.out.println("Task " + taskId + " is running in thread " + Thread.currentThread().getName());return "aaa";// Thread.sleep(1000); // 模拟任务执行时间});final String name = submit.get();System.out.println("name:"+ name);
}
2、使用Java 提供了 Executors 工具类来方便地创建线程池
最常用的方法有:
- newFixedThreadPool(int nThreads):创建一个固定大小的线程池。
- newCachedThreadPool():创建一个可缓存的线程池。
- newSingleThreadExecutor():创建一个单线程的线程池。
- newScheduledThreadPool(int corePoolSize):创建一个定时任务线程池。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class SimpleThreadPoolExample {public static void main(String[] args) {// 创建一个固定大小的线程池ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);for (int i = 0; i < 10; i++) {final int taskId = i;fixedThreadPool.submit(() -> {System.out.println("Task " + taskId + " is executed by " + Thread.currentThread().getName());});}fixedThreadPool.shutdown();}
}
3、springboot项目中创建
3.1、异步执行
- 创建配置类
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;@Configuration
public class ThreadPoolConfig {@Beanpublic ThreadPoolTaskExecutor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(5); // 核心线程数executor.setMaxPoolSize(10); // 最大线程数executor.setQueueCapacity(50); // 任务队列容量executor.setThreadNamePrefix("MyExecutor-"); // 线程名称前缀executor.initialize(); // 进行初始化return executor;}
}
- 创建异步任务
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;import java.util.concurrent.TimeUnit;@Service
public class MyAsyncService {@Async("taskExecutor") // 指定使用的线程池public void performAsyncTask(String taskName) {System.out.println(taskName + " is executing in thread: " + Thread.currentThread().getName());try {// 模拟长时间的任务TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(taskName + " completed in thread: " + Thread.currentThread().getName());}
}
- 开启异步
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;@SpringBootApplication
@EnableAsync // 启用异步功能
public class AsyncApplication {public static void main(String[] args) {SpringApplication.run(AsyncApplication.class, args);}
}
3.2、同步执行
- 创建配置类
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;@Configuration
public class ThreadPoolConfig {@Beanpublic ThreadPoolTaskExecutor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(5); // 核心线程数executor.setMaxPoolSize(10); // 最大线程数executor.setQueueCapacity(50); // 任务队列容量executor.setThreadNamePrefix("MyExecutor-"); // 线程名称前缀executor.initialize(); // 进行初始化return executor;}
}
- 创建同步服务
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;@Service
public class MySyncService {@Autowiredprivate ThreadPoolTaskExecutor taskExecutor;public void performTask(String taskName) {// 这里使用 submit 方法同步地执行任务Future<String> future = taskExecutor.submit(() -> {System.out.println(taskName + " is executing in thread: " + Thread.currentThread().getName());// 模拟长时间的任务try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}return taskName + " completed";});try {// 阻塞主线程直到任务完成String result = future.get();System.out.println(result);} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}}
}
上面使用线程池执行任务的逻辑写在一块了,也可以把任务放到单独的地方
- 实现Runnable或者Callable接口,在实现的方法中写具体的任务逻辑,最后把任务放入线程池即可
下面展示了两种方式执行任务
package com.example.demo;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** @author chenwx* @date 2024/9/12*/
public class RunnableExample {public static void main(String[] args) {final Runnable runnableRun = () -> System.out.println("Runnable run");// 1、使用Thread 执行线程new Thread(runnableRun).start();// 2、使用Executors 执行线程final ExecutorService executorService = Executors.newFixedThreadPool(1);executorService.execute(runnableRun);}}
- 使用lambda表达式创建任务,最后把任务放入线程池即可
package com.example.demo;import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;/*** @author chenwx* @date 2024/9/12*/
public class CallableExample {public static void main(String[] args) {final ExecutorService executorService = Executors.newFixedThreadPool(1);final Callable<Integer> callable = () -> {return 1 + 1;};final Future<Integer> future = executorService.submit(callable);try {System.out.println(future.get());} catch (Exception e) {e.printStackTrace();} finally {executorService.shutdown();}}
}
4、Runnable 和 Callable
Runnable 和 Callable 是 Java 中两个代表可执行任务的接口,它们有一些相似之处,但也有几个重要的区别。以下是它们之间的主要区别:
4.1、返回值
Runnable: Runnable 接口的 run() 方法没有返回值。它的设计目的是不需要返回任何结果,通常用于执行一些任务但不关心结果。
public interface Runnable {void run();
}
Callable: Callable 接口的 call() 方法可以返回一个结果,并且可以抛出检查异常。它可以用于执行任务同时也可以获取结果。
public interface Callable<V> {V call() throws Exception;
}
4.2、 异常处理
Runnable: run() 方法不能抛出检查异常(checked exceptions)。如果需要处理异常,必须在方法内部捕获并处理。
Callable: call() 方法可以抛出检查异常。这为处理可能发生的异常提供了更多灵活性。
4.3、用法
Runnable: 通常与 Thread 类配合使用,也可以通过 ExecutorService 提交以执行线程。
Runnable task = () -> {System.out.println("Running a Runnable task");
};
new Thread(task).start();
Callable: 通常与 ExecutorService 接口配合使用,可以使用 Future 接口获取结果。
Callable<Integer> task = () -> {return 5 + 3;
};
Future<Integer> future = executorService.submit(task);
Integer result = future.get(); // 获取结果
4.4、 适用场景
-
Runnable: 当你只需要执行某个任务而不关心任务的执行结果时,可以使用 Runnable。例如,处理日志、执行清理任务等。
-
Callable: 当你需要执行任务且关心结果或可能遇到的异常时,应该使用 Callable。例如,进行复杂计算或获取从外部资源(如数据库、网络)读取的数据。
5、线程池参数
5.1、详解
- corePoolSize:核心线程数,线程池中始终保持的线程数量。
- maximumPoolSize:最大线程数,线程池中允许的最大线程数量。
- keepAliveTime:当线程池中的线程数量超过核心线程数时,多余的空闲线程在此时间内会被终止。
- workQueue:任务队列,用于存放等待执行的任务。
- threadFactory:线程工厂,用于创建新线程。
- handler:饱和策略,当任务队列满且达到最大线程数时,采取的策略(如:丢弃任务、抛出异常等)。
5.2、如何选择合适的参数
corePoolSize:可以根据系统的 CPU 核心数来设置,一般使用 Runtime.getRuntime().availableProcessors()。
maximumPoolSize:通常设置为 corePoolSize * 2,以避免系统过度负载。
keepAliveTime:设置为较长的时间(如 60s),以确保可以应对突发任务。
workQueue:如采用 LinkedBlockingQueue 可以处理较多的任务,但会增加内存占用。
5.3、线程池中创建线程
在核心线程在第一次提交任务时才会被创建,线程池创建时不会自动创建核心线程
假设核心线程3个,最大线程5个,队列长度10
- 目前没有任务的时候:核心线程还没被创建,核心线程为0
- 任务1被提交:创建一个线程,核心线程为1
- 任务2被提交:创建一个线程,核心线程为2
- 任务3被提交:创建一个线程,核心线程为3
- 任务4被提交:此时核心线程已被占满,新提交的线程会放入队列,队列存在线程数量为1
- 任务5被提交:此时核心线程已被占满,新提交的线程会放入队列,队列存在线程数量为2
- 任务6被提交:此时核心线程已被占满,新提交的线程会放入队列,队列存在线程数量为3
- 任务7被提交:此时核心线程已被占满,新提交的线程会放入队列,队列存在线程数量为4
- 任务8被提交:此时核心线程已被占满,新提交的线程会放入队列,队列存在线程数量为5
- 任务9被提交:此时核心线程已被占满,新提交的线程会放入队列,队列存在线程数量为6
- 任务10被提交:此时核心线程已被占满,新提交的线程会放入队列,队列存在线程数量为7
- 任务11被提交:此时核心线程已被占满,新提交的线程会放入队列,队列存在线程数量为8
- 任务12被提交:此时核心线程已被占满,新提交的线程会放入队列,队列存在线程数量为9
- 任务13被提交:此时核心线程已被占满,新提交的线程会放入队列,队列存在线程数量为10
- 任务14被提交:此时核心线程已被占满,队列也被占满。新提交的任务会创建线程去执行。核心线程为3,队列线程为10,非核心活跃线程为1
- 任务15被提交:此时核心线程已被占满,队列也被占满。新提交的任务会创建线程去执行。核心线程为3,队列线程为10,非核心活跃线程为2
-
- 任务15被提交:此时核心线程已被占满,队列也被占满,活跃线程数量达到最大线程。开始执行饱和策略
6、总结
CPU密集型任务(普通计算):
尽量使用较小的线程池,一般为CPU核心数+1。
因为CPU密集型任务使得CPU使用率很高,若开过多的线程数,只能增加上下文切换的次数,因此会带来额外的开销。
IO密集型任务(文件,数据库操作):
可以使用稍大的线程池,一般为2*CPU核心数+1。
因为IO操作不占用CPU,不要让CPU闲下来,应加大线程数量,因此可以让CPU在等待IO的时候去处理别的任务,充分利用CPU时间。
混合型任务:
可以将任务分成IO密集型和CPU密集型任务,然后分别用不同的线程池去处理。
只要分完之后两个任务的执行时间相差不大,那么就会比串行执行来的高效。
因为如果划分之后两个任务执行时间相差甚远,那么先执行完的任务就要等后执行完的任务,最终的时间仍然取决于后执行完的任务,而且还要加上任务拆分与合并的开销,得不偿失
依赖其他资源
如某个任务依赖数据库的连接返回的结果,这时候等待的时间越长,则CPU空闲的时间越长,那么线程数量应设置得越大,才能更好的利用CPU。
借鉴别人的文章 对线程池大小的估算公式:
最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目
比如平均每个线程CPU运行时间为0.5s,而线程等待时间(非CPU运行时间,比如IO)为1.5s,CPU核心数为8,那么根据上面这个公式估算得到:((0.5+1.5)/0.5)*8=32。
可以得出一个结论:
线程等待时间所占比例越高,需要越多线程。线程CPU时间所占比例越高,需要越少线程。