当前位置: 首页 > news >正文

AI大模型编写多线程并发框架(六十三):监听器优化·上

系列文章目录


文章目录

  • 系列文章目录
  • 前言
    • 一、项目背景
    • 二、第七轮对话-补充异步执行代码
    • 三、第八轮对话-增加累计完成等字眼
    • 四、第九轮对话-线程安全优化
    • 五、第十轮对话-增加运行时数据
    • 七、参考文章


前言

在这个充满技术创新的时代,AI大模型正成为开发者们的新宠。它们可以帮助我们完成从简单的问答到复杂的编程任务,所以AI编程将会是未来的主流方向,利用AI大模型的能力,本文将介绍从零到一用AI大模型编写一个多线程并发框架。

一、项目背景

经过上两篇文章和AI的对话,我们基本捣鼓出来了多线程并发框架的雏形,接下来是比较重要的监控模块,本文将会通过比较长的篇幅来叙述。

本多线程框架使用示例如下:源码地址
1、引入依赖。

<dependency><groupId>io.github.vipjoey</groupId><artifactId>mmc-juc</artifactId><version>1.0</version>
</dependency>

2、使用示例。


// 创建一个MmcTaskExecutor实例,用于执行单次长耗时任务
// 下面是创建一个计算从1加到100的任务,总共100个任务,采用fork分治算法,阈值为10,总共任务为100 / 10 * 2 = 20个大任务,执行速率约为10/s
MmcTaskExecutor<Integer, Integer> mmcTaskExecutor = MmcTaskExecutor.<Integer, Integer>builder().taskSource(IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList())) // 设置任务源.taskProcessor(x -> x.stream().reduce(0, Integer::sum)) // 设置任务处理方法.taskMerger(Integer::sum) // 设置结果处理方法(可选).threshold(10) // 设置任务处理阈值(可选).taskName("mmcTaskExample") // 设置任务名称.rateLimiter(10, 20)  // 设置速率限制,容量为10,每秒产生令牌为20,休眠时间为10ms.forkJoinPoolConcurrency(4) // 设置ForkJoinPool的并发度为4.build();// 同步执行并打印结果        
System.out.println("result: " + mmcTaskExecutor.execute());// 任务执行过程监控
[mmcTaskExample] Tasks submitted. Total tasks: 100
[mmcTaskExample] Task started. Completed tasks: 14, remaining tasks: 86
[mmcTaskExample] Task started. Completed tasks: 14, remaining tasks: 86
[mmcTaskExample] Task started. Completed tasks: 20, remaining tasks: 80
[mmcTaskExample] Task started. Completed tasks: 26, remaining tasks: 74
[mmcTaskExample] Task started. Completed tasks: 33, remaining tasks: 67
[mmcTaskExample] Task started. Completed tasks: 40, remaining tasks: 60
[mmcTaskExample] Task started. Completed tasks: 46, remaining tasks: 54
[mmcTaskExample] Task started. Completed tasks: 52, remaining tasks: 48
[mmcTaskExample] Task started. Completed tasks: 58, remaining tasks: 42
[mmcTaskExample] Task started. Completed tasks: 64, remaining tasks: 36
[mmcTaskExample] Task started. Completed tasks: 70, remaining tasks: 30
[mmcTaskExample] Task started. Completed tasks: 76, remaining tasks: 24
[mmcTaskExample] Task started. Completed tasks: 82, remaining tasks: 18
[mmcTaskExample] Task started. Completed tasks: 88, remaining tasks: 12
[mmcTaskExample] Task started. Completed tasks: 94, remaining tasks: 6
[mmcTaskExample] Task started. Completed tasks: 100, remaining tasks: 0
[mmcTaskExample] Tasks completed. Elapsed time: 1107 ms 
result: 5050

二、第七轮对话-补充异步执行代码

1、参考限流器的对话,我们知道AI是不会主动帮忙抽象公共接口的,所以这里我们干脆要求AI给我们定义好监听器接口,方便后续扩展。
在这里插入图片描述

2、下面是AI给的回复。

  • 【为了实现监听器功能,我们首先需要创建一个监听器接口MmcTaskListener,然后在MmcTaskExecutor类中使用它。我们还将提供一个默认实现DefaultMmcTaskListener,并允许用户通过Builder类替换监听器。首先,我们创建一个MmcTaskListener接口。】
public interface MmcTaskListener {void onTaskStarted(int totalTasks, int remainingTasks);void onTaskCompleted(int totalTasks, int remainingTasks, long elapsedTime);
}
  • 【接下来,我们创建一个默认的监听器实现DefaultMmcTaskListener。】
public class DefaultMmcTaskListener implements MmcTaskListener {@Overridepublic void onTaskStarted(int totalTasks, int remainingTasks) {System.out.println("Task started. Total tasks: " + totalTasks + ", remaining tasks: " + remainingTasks);}@Overridepublic void onTaskCompleted(int totalTasks, int remainingTasks, long elapsedTime) {System.out.println("Task completed. Total tasks: " + totalTasks + ", remaining tasks: " + remainingTasks + ", elapsed time: " + elapsedTime + " ms");}
}
  • 【然后,我们修改MmcTaskExecutor类以使用MmcTaskListener接口。】
import java.util.List;
import java.util.concurrent.ForkJoinPool;public class MmcTaskExecutor<T, R> {// ...省略其他成员变量private MmcTaskListener taskListener;private MmcTaskExecutor(Builder<T, R> builder) {// ...省略其他初始化代码this.taskListener = builder.taskListener;}// ...省略其他方法public static class Builder<T, R> {// ...省略其他成员变量private MmcTaskListener taskListener = new DefaultMmcTaskListener(); // 默认使用DefaultMmcTaskListener// ...省略其他方法public Builder<T, R> taskListener(MmcTaskListener taskListener) {this.taskListener = taskListener;return this;}// ...省略其他方法}
}
  • 【最后,我们需要修改MmcTask类以调用监听器的方法。】
import java.util.List;
import java.util.concurrent.RecursiveTask;public class MmcTask<T, R> extends RecursiveTask<R> {// ...省略其他成员变量private MmcTaskListener taskListener;public MmcTask(List<T> taskSource, MmcTaskProcessor<T, R> taskProcessor, MmcTaskMerger<R> taskMerger, int threshold, int start, int end, RateLimiter rateLimiter, MmcTaskListener taskListener) {// ...省略其他初始化代码this.taskListener = taskListener;}@Overrideprotected R compute() {// ...省略其他代码// 在处理任务之前调用onTaskStarted方法taskListener.onTaskStarted(taskSource.size(), end - start);// ...省略其他代码// 在任务完成后调用onTaskCompleted方法long elapsedTime = System.currentTimeMillis() - startTime;taskListener.onTaskCompleted(taskSource.size(), end - start - 1, elapsedTime);// ...省略其他代码}
}

本轮AI的回复有些欠缺,它并没有GET到我的诉求,可能跟我的提问方式有关。实际上我需要监控的是TaskSource集合被执行完成的进度,AI只是监控了compute方法,也就是监控每个阈值下的小任务的执行,这明显是错的。

三、第八轮对话-增加累计完成等字眼

1、由于上一轮对话,可能是提问方式不对,我重新增加【首次】、【累计】这些字眼,希望AI能理解我的意思。
在这里插入图片描述

2、下面是AI大模型给的回复。

  • 【为了满足您的要求,我们需要修改MmcTaskListener接口以添加新的事件方法,然后修改DefaultMmcTaskListener以实现这些方法。接下来,我们将在MmcTaskExecutor类和MmcTask类中调用这些方法。首先,我们修改MmcTaskListener接口以添加新的事件方法。】
public interface MmcTaskListener {void onTasksSubmitted(int totalTasks, long startTime);void onTaskStarted(int completedTasks, int remainingTasks);void onTasksCompleted(long elapsedTime, long endTime);
}
  • 【接下来,我们修改DefaultMmcTaskListener以实现新的事件方法。】
public class DefaultMmcTaskListener implements MmcTaskListener {@Overridepublic void onTasksSubmitted(int totalTasks, long startTime) {System.out.println("Tasks submitted. Total tasks: " + totalTasks + ", start time: " + startTime);}@Overridepublic void onTaskStarted(int completedTasks, int remainingTasks) {System.out.println("Task started. Completed tasks: " + completedTasks + ", remaining tasks: " + remainingTasks);}@Overridepublic void onTasksCompleted(long elapsedTime, long endTime) {System.out.println("Tasks completed. Elapsed time: " + elapsedTime + " ms, end time: " + endTime);}
}
  • 【然后,我们需要修改MmcTaskExecutor类以调用onTasksSubmitted和onTasksCompleted方法。】
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;public class MmcTaskExecutor<T, R> {// ...省略其他成员变量和方法private AtomicInteger completedTasks = new AtomicInteger(); // 添加一个原子整数以跟踪已完成的任务数量// 同步执行并返回结果public R execute() {MmcTask<T, R> mmcTask = new MmcTask<>(taskSource, taskProcessor, taskMerger, threshold, 0, taskSource.size(), rateLimiter, taskListener, completedTasks);// 调用onTasksSubmitted方法taskListener.onTasksSubmitted(taskSource.size(), System.currentTimeMillis());R result = forkJoinPool.invoke(mmcTask);// 调用onTasksCompleted方法taskListener.onTasksCompleted(mmcTask.getElapsedTime(), System.currentTimeMillis());return result;}// ...省略其他方法
}
  • 【最后,我们需要修改MmcTask类以调用onTaskStarted方法。】
import java.util.List;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicInteger;public class MmcTask<T, R> extends RecursiveTask<R> {// ...省略其他成员变量和方法private AtomicInteger completedTasks;public MmcTask(List<T> taskSource, MmcTaskProcessor<T, R> taskProcessor, MmcTaskMerger<R> taskMerger, int threshold, int start, int end, RateLimiter rateLimiter, MmcTaskListener taskListener, AtomicInteger completedTasks) {// ...省略其他初始化代码this.completedTasks = completedTasks;}@Overrideprotected R compute() {// 在处理任务之前调用onTaskStarted方法taskListener.onTaskStarted(completedTasks.get(), end - start);// ...省略其他代码// 在任务完成后更新已完成任务的计数completedTasks.incrementAndGet();// ...省略其他代码}
}

从本次AI大模型的回复来看,存在以下问题:

  • 它似乎不理解任务拆分前后计数的区别,实际上后续我尝试了N种问法,它都不理解,它理解的任务累计完成个数就是completedTasks.incrementAndGet();而不是我想要的taskRuntime.getCompletedTasks().addAndGet(end - start);
  • 它不会主动发现线程安全问题,这个跟我们提问的要求有关,毕竟是生成式AI,所以它不会考虑任务提交时需要重置AtomicInteger。
  • 同理,它的剩余任务个数的计算方式也是错误的,它的答案是end - start,而不是我想要的taskSource.size() - 累计完成任务个数

不过,这些都是小问题,不用太纠结,我们继续提诉求去优化。

四、第九轮对话-线程安全优化

1、从AI大模型给出的代码来看,是没有考虑线程安全问题的,我们需要补充一下诉求:
在这里插入图片描述

2、下面是AI的回复。

  • 【为了满足您的要求,我们将对MmcTaskExecutor类进行修改以确保线程安全,并在每次提交任务时重置计数器。我们将使用原子整数AtomicInteger来实现线程安全的计数器。首先,我们将修改MmcTaskExecutor类以添加一个原子整数completedTasks以跟踪已完成的任务数量。我们还将在每次执行任务时重置completedTasks。】
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;public class MmcTaskExecutor<T, R> {// ...省略其他成员变量和方法private AtomicInteger completedTasks = new AtomicInteger(); // 添加一个原子整数以跟踪已完成的任务数量// 同步执行并返回结果public R execute() {completedTasks.set(0); // 重置已完成任务的计数器MmcTask<T, R> mmcTask = new MmcTask<>(taskSource, taskProcessor, taskMerger, threshold, 0, taskSource.size(), rateLimiter, taskListener, completedTasks);// 调用onTasksSubmitted方法taskListener.onTasksSubmitted(taskSource.size(), System.currentTimeMillis());R result = forkJoinPool.invoke(mmcTask);// 调用onTasksCompleted方法taskListener.onTasksCompleted(mmcTask.getElapsedTime(), System.currentTimeMillis());return result;}// ...省略其他方法
}
  • 【接下来,我们将修改MmcTask类以确保线程安全并调用监听器的方法。】
import java.util.List;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicInteger;public class MmcTask<T, R> extends RecursiveTask<R> {// ...省略其他成员变量和方法private AtomicInteger completedTasks;public MmcTask(List<T> taskSource, MmcTaskProcessor<T, R> taskProcessor, MmcTaskMerger<R> taskMerger, int threshold, int start, int end, RateLimiter rateLimiter, MmcTaskListener taskListener, AtomicInteger completedTasks) {// ...省略其他初始化代码this.completedTasks = completedTasks;}@Overrideprotected R compute() {// 在处理任务之前调用onTaskStarted方法taskListener.onTaskStarted(completedTasks.get(), end - start);// ...省略其他代码// 在任务完成后更新已完成任务的计数int completed = completedTasks.incrementAndGet();// 调用onTaskStarted方法,以便在每个小任务完成时更新已完成任务的计数taskListener.onTaskStarted(completed, end - start - completed);// ...省略其他代码}
}

这次AI大模型的回复也是不尽人意,如果多个MmcTask任务同时执行的话,计数器是不准确的,我们继续耐心优化一下我们的提示词。

五、第十轮对话-增加运行时数据

1、由于我们设计的初衷,任务可以多次提交的。所以每个任务运行时应该产生运行时数据,我们监控的也是运行时数据,运行时数据需要一个唯一标识,也就是任务名称来与之关联。另外,由于MmcTask的构造函数过长,我在其它对话中让它优化过了,这里就不详细讲述。
在这里插入图片描述

2、以下是AI的回复。

  • 【为了满足您的要求,我们可以创建一个新的类TaskRuntime,它将包含任务名称、任务源总数、已完成任务数和剩余任务数等信息。然后,我们将修改MmcTask和MmcTaskListener以使用TaskRuntime。首先,我们创建一个新的类TaskRuntime。】
public class TaskRuntime {private String taskName;private int totalTasks;private AtomicInteger completedTasks;private AtomicInteger remainingTasks;public TaskRuntime(String taskName, int totalTasks) {this.taskName = taskName;this.totalTasks = totalTasks;this.completedTasks = new AtomicInteger();this.remainingTasks = new AtomicInteger(totalTasks);}// ...省略getter和setter方法
}
  • 【接下来,我们修改MmcTaskListener接口以使用TaskRuntime。】
public interface MmcTaskListener {void onTasksSubmitted(TaskRuntime taskRuntime, long startTime);void onTaskStarted(TaskRuntime taskRuntime);void onTasksCompleted(TaskRuntime taskRuntime, long elapsedTime, long endTime);
}
  • 【然后,我们修改MmcTask以使用TaskRuntime。】
import java.util.List;
import java.util.concurrent.RecursiveTask;public class MmcTask<T, R> extends RecursiveTask<R> {// ...省略其他成员变量和方法private TaskRuntime taskRuntime;public MmcTask(Builder<T, R> builder) {// ...省略其他初始化代码this.taskRuntime = new TaskRuntime(builder.taskName, builder.taskSource.size());}@Overrideprotected R compute() {// 在处理任务之前调用onTaskStarted方法taskListener.onTaskStarted(taskRuntime);// ...省略其他代码// 在任务完成后更新已完成任务的计数taskRuntime.getCompletedTasks().incrementAndGet();taskRuntime.getRemainingTasks().decrementAndGet();// ...省略其他代码}
}
  • 【最后,我们修改MmcTaskExecutor类以使用TaskRuntime。】
import java.util.List;
import java.util.concurrent.ForkJoinPool;public class MmcTaskExecutor<T, R> {// ...省略其他成员变量和方法// 提交新任务public R submitTask(MmcTask<T, R> mmcTask) {// 调用onTasksSubmitted方法taskListener.onTasksSubmitted(mmcTask.getTaskRuntime(), System.currentTimeMillis());R result = forkJoinPool.invoke(mmcTask);// 调用onTasksCompleted方法taskListener.onTasksCompleted(mmcTask.getTaskRuntime(), mmcTask.getElapsedTime(), System.currentTimeMillis());return result;}// 同步执行并返回结果public R execute() {MmcTask<T, R> mmcTask = new MmcTask.Builder<T, R>()// ...省略其他方法调用.build();return submitTask(mmcTask);}
}

这轮AI的回答还是有问题,例如下面:

    public MmcTask(Builder<T, R> builder) {// ...省略其他初始化代码this.taskRuntime = new TaskRuntime(builder.taskName, builder.taskSource.size());}

它在MmcTask来初始化运行时数据,这明显是不对的,继续让它修改。下一篇,继续优化监听器 - -

七、参考文章

  • 《AI大模型编写多线程并发框架(六十一):从零开始搭建框架》
  • 《AI大模型编写多线程并发框架(六十二):限流和并发度优化》
  • 《AI大模型编写多线程并发框架(六十三):监听器优化·上》
  • 《AI大模型编写多线程并发框架(六十四):监听器优化·下》
  • 《AI大模型编写多线程并发框架(六十五):发布和应用》

加我加群(备注csdn)一起交流学习!更多干货下载、项目源码和大厂内推等着你


http://www.mrgr.cn/news/15776.html

相关文章:

  • 在高质量视频生成文本、图像生成文本的GLM-4V-Plus技术加持下医疗未来的方向
  • CSS线性渐变拼接,一个完整的渐变容器(div),要拆分成多个渐变容器(div),并且保持渐变效果一致
  • MySQL必会知识精华5(WHERE简单子句)
  • day04-面向对象-常用API时间Arrays
  • 带你了解RS485通讯网关-天拓四方
  • 数学建模--皮尔逊相关系数、斯皮尔曼相关系数
  • 汽车信息安全--MCU Flash读保护真的没有后门吗?
  • 深入浅出Entity-Component-System:重塑游戏开发的未来
  • 软考高级证书拿到手了,怎么评职称?
  • 突发:Runway 从 HuggingFace 上删库跑路,究竟发生了什么?
  • 1.8.1、加密和认证技术
  • 精准删除:掌握SQL中的DELETE语句
  • 【人工智能】AI虚拟主播制作初体验:从生成数字人到视频创作全流程
  • 卖家避坑!亚马逊运营技巧及注意事项
  • MQTT学习:MQTT vs AMQP,mosquitto安装,调试工具mqttfx mqttx
  • Java线程生命周期详解_(1)
  • Error running tomcat: Can‘t find catalina.jar
  • SQL存储过程:数据库编程的瑞士军刀
  • 【微信小程序】如何触发按钮事件,例如调起微信客服
  • 【C++ Primer Plus习题】7.6