当前位置: 首页 > news >正文

线程池

文章目录

  • 线程池
    • 手写简化版线程池
    • ThreadPoolExecutor
      • 核心构造参数
      • 重要方法
      • 工作队列
      • 任务拒绝策略
      • 示例
    • newFixedThreadPool
    • newCachedThreadPool
    • newSingleThreadExecutor
    • ScheduledExecutorService

线程池

手写简化版线程池

import lombok.extern.slf4j.Slf4j;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;@Slf4j
public class Test07 {public static void main(String[] args) throws InterruptedException {MyThreadPoll myThreadPoll = new MyThreadPoll(2, 2, 1000, TimeUnit.MILLISECONDS, (queue, task) -> {log.error("任务添加失败:{}", task);});List<Thread> threads = new ArrayList<>();for(int i=0;i<6;i++){int j = i;Thread thread = new Thread(() -> {log.info("正在执行任务{}", j + 1);});threads.add(thread);}for(Thread t: threads){myThreadPoll.execute(t);}}
}class BlockQueue<T>{private Deque<T> queue = new ArrayDeque<>();private Lock lock = new ReentrantLock();private Condition condition1 = lock.newCondition();private Condition condition2 = lock.newCondition();private int capacity;public BlockQueue(int capacity){this.capacity = capacity;}public T take() {lock.lock();try {while(true){if(!queue.isEmpty()){T poll = queue.poll();condition2.signal();return poll;}try {condition1.await();} catch (InterruptedException e) {throw new RuntimeException(e);}}} finally {lock.unlock();}}public T poll(long time, TimeUnit unit) {lock.lock();try {while(true){if(!queue.isEmpty()){T poll = queue.poll();condition2.signal();return poll;}if(time <=0 )return null;try {time = condition1.awaitNanos(unit.toNanos(time));} catch (InterruptedException e) {throw new RuntimeException(e);}}} finally {lock.unlock();}}public void add(T element) {lock.lock();try {while (true){if(queue.size()<capacity){queue.offer(element);condition1.signal();return;}try {condition2.await();} catch (InterruptedException e) {throw new RuntimeException(e);}}} finally {lock.unlock();}}public void offer(T element,long time,TimeUnit unit) {lock.lock();try{while(true){if(queue.size()<capacity){queue.offer(element);condition1.signal();return;}if(time <=0 )return;try {time = condition2.awaitNanos(unit.toNanos(time));} catch (InterruptedException e) {throw new RuntimeException(e);}}}finally {lock.unlock();}}public void tryput(T element,RejectPolicy rejectPolicy){lock.lock();try {if(queue.size() < capacity){queue.add(element);condition1.signal();}else{rejectPolicy.reject(this,element);}} finally {lock.unlock();}}public int size(){lock.lock();try {return queue.size();} finally {lock.unlock();}}}class MyThreadPoll{private BlockQueue<Runnable> workQueue;private HashSet<Work> works = new HashSet<>();private int coreSize;private long time;private TimeUnit unit;private RejectPolicy<Runnable> rejectPolicy;public MyThreadPoll(int queueSize,int coreSize,long time,TimeUnit unit,RejectPolicy rejectPolicy){workQueue = new BlockQueue<>(queueSize);this.coreSize = coreSize;this.time = time;this.unit = unit;this.rejectPolicy = rejectPolicy;}public void execute(Runnable runnable){if(works.size() < coreSize){Work work = new Work(runnable);work.start();works.add(work);} else{workQueue.tryput(runnable,rejectPolicy);}}class Work extends Thread{private Runnable task;public Work(Runnable runnable){this.task = runnable;}@Overridepublic void run() {while(task != null || (task = workQueue.poll(time,unit)) != null){try {task.run();} catch (Exception e) {e.printStackTrace();} finally {task = null;}}synchronized (works){works.remove(this);}}}}@FunctionalInterface
interface RejectPolicy<T>{void reject(BlockQueue<T> queue,T task);
}

请添加图片描述

ThreadPoolExecutor

ThreadPoolExecutor 提供了一个线程池管理器,可以用来执行异步任务。线程池可以有效地重用线程,减少了线程创建和销毁的开销,提高系统响应速度,并允许控制并发任务的数量。

核心构造参数

  • corePoolSize:线程池中的核心线程数,即使它们是空闲的,也会保持存活。
  • maximumPoolSize:线程池中允许的最大线程数。
  • keepAliveTime:当线程数大于核心线程数时,这是多余空闲线程在终止前等待新任务的最长时间。
  • unitkeepAliveTime 参数的时间单位。
  • workQueue:用于在执行任务之前保存任务的队列。这个队列仅保存由 execute 方法提交的 Runnable 任务。
  • threadFactory:执行程序创建新线程时使用的工厂。
  • handler:当线程池已经达到最大大小并且队列已满时,用于处理无法执行的任务的处理器。

重要方法

  • execute(Runnable command):执行给定的任务。
  • submit(Runnable task)submit(Callable<T> task):提交任务以执行,并返回代表该任务的 Future
  • shutdown():平滑地关闭线程池,不再接受新任务,但会执行完已提交的任务。
  • shutdownNow():尝试停止所有正在执行的任务,停止处理正在等待的任务,并返回等待执行的任务列表。

工作队列

ThreadPoolExecutor 支持多种类型的队列,如:

  • SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作必须等待另一个线程的对应移除操作。
  • LinkedBlockingQueue:基于链表结构的可选有界队列。
  • ArrayBlockingQueue:基于数组结构的有界队列。

任务拒绝策略

当线程池和队列都满了,新提交的任务将会被拒绝。ThreadPoolExecutor 提供了以下几种拒绝策略:

  • AbortPolicy:默认策略,抛出 RejectedExecutionException 异常。
  • CallerRunsPolicy:由调用者线程直接执行该任务。
  • DiscardPolicy:默默地丢弃无法执行的任务。
  • DiscardOldestPolicy:丢弃在队列中最旧的任务,然后重试执行程序(如果再次失败,重复此过程)。

示例

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
public class Test08 {public static void main(String[] args) {ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4,5000, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(3),                                                              Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardPolicy());List<Future<Long>> list = new ArrayList<>();for(int i=0;i<10;i++){Future<Long> res = threadPoolExecutor.submit(new Task());list.add(res);}for(Future<Long> future:list){try {System.out.println(future.get());} catch (InterruptedException e) {throw new RuntimeException(e);} catch (ExecutionException e) {throw new RuntimeException(e);}}threadPoolExecutor.shutdown();}public static class Task implements Callable<Long>{@Overridepublic Long call() throws Exception {return Thread.currentThread().getId();}}
}

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}

特点:核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间。阻塞队列是无界的,可以放任意数量的任务,适用于任务量已知,相对耗时的任务

newCachedThreadPool

public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}

特点:

  • 核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,意味着 全部都是救急线程(60s 后可以回收)
  • 救急线程可以无限创建 队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的(一手交钱、一手交货)
  • 整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1分钟后释放线程。 适合任务数比较密集,但每个任务执行时间较短的情况

newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}

使用场景: 希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程 也不会被释放。

区别:

  • 自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一 个线程,保证池的正常工作 Executors.newSingleThreadExecutor() 线程个数始终为1,不能修改
  • FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因 此不能调用 ThreadPoolExecutor 中特有的方法
  • Executors.newFixedThreadPool(1) 初始时为1,以后还可以修改 对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改

ScheduledExecutorService

任务调度线程池:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
// 添加两个任务,希望它们都在 1s 后执行
executor.schedule(() -> {System.out.println("任务1,执行时间:" + new Date());try { Thread.sleep(2000); } catch (InterruptedException e) { }
}, 1000, TimeUnit.MILLISECONDS);
executor.schedule(() -> {System.out.println("任务2,执行时间:" + new Date());
}, 1000, TimeUnit.MILLISECONDS);

以固定速率执行:

ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.debug("start...");
pool.scheduleAtFixedRate(() -> {log.debug("running...");
}, 1, 1, TimeUnit.SECONDS);

http://www.mrgr.cn/news/20602.html

相关文章:

  • RISC-V (八)定时器中断
  • PWR电源控制(低功耗模式)
  • 【CSS】background样式没有生效
  • Redis 事务揭秘:如何确保数据一致性
  • LeetCode 3174.清除数字:一个不用栈的方法
  • 嵌入式OpenHarmony系统的一些特点
  • 高级java每日一道面试题-2024年9月04日-前端篇-前端的框架分类有哪些?
  • Linux驱动环境配置
  • 【灰度图图像间转换】
  • (二十九)STL map容器(映射)与STL pair容器(值对)
  • Excel如何把表格变成图表
  • 数据结构(15)——哈希表(2)
  • pyro.optim pyro ppl 概率编程 优化器 pytorch
  • 《机器学习》—— PCA降维
  • pytorch对不同的可调参数,分配不同的学习率
  • c# Csv文件读写示例,如果文件存在追加写入
  • Word封面对齐技巧
  • 【PyQt6 应用程序】解说+原声视频混剪无显卡精简版,无显卡可用
  • 每日OJ_牛客_解读密码(简单模拟)
  • QT教程:start()和startTimer()的区别