学成在线 - 第3章任务补偿机制实现 + 分块文件清理

news/2024/5/20 4:27:03

7.9 额外实现

7.9.1 任务补偿机制

问题:如果有线程抢占了某个视频的处理任务,如果线程处理过程中挂掉了,该视频的状态将会一直是处理中,其它线程将无法处理,这个问题需要用补偿机制。

单独启动一个任务找到待处理任务表中超过执行期限但仍在处理中的任务,将任务的状态改为执行失败。

任务执行期限是处理一个视频的最大时间,比如定为30分钟,通过任务的启动时间去判断任务是否超过执行期限。

大家思考这个sql该如何实现?

大家尝试自己实现此任务补偿机制。

数据库表结构

media_process:

在这里插入图片描述

根据status字段判断视频文件是否正在处理,如果status值为4(正在处理),且当前时间减去create_date超过30分钟,就把status的值修改为3(处理失败),并更新其他字段的值。

具体实现流程
  1. 编写检查超时的sql

    SELECT * FROM `media_process_history`
    WHERE TIMESTAMPDIFF(MINUTE,create_date,finish_date) < 30
    
  2. 在media_process的mapper接口中添加相应的接口

    MediaProcessMapper.java

    /*** 查询是否有执行超过30分钟的视频处理任务* @param nowDate 当前任务执行的时间* @return List<MediaProcess>*/
    @Select("SELECT * FROM media_process_history WHERE TIMESTAMPDIFF(MINUTE,create_date,#{date}) > 30")
    List<MediaProcess> selectTimeoutProcess(@Param("date")Date nowDate);
    
  3. 实现service代码

    /*** 超时任务列表* @param shardIndex 分片序号* @param shardTotal 分片总数* @param count 获取记录数(cpu核心数)* @return List<MediaProcess>*/
    @Override
    public List<MediaProcess> getTimeoutMediaProcessList(int shardIndex, int shardTotal, int count) {List<MediaProcess> mediaTimeoutProcessList = mediaProcessMapper.selectTimeoutProcess(LocalDateTime.now(), shardIndex, shardTotal, count);return mediaTimeoutProcessList;
    }
    
  4. 业务逻辑在task中实现

    查询相应超时记录,把status和errMsg字段更新后保存到media_process表里。

    /*** 处理视频超时任务* @throws Exception*/
    @XxlJob("videoTimeoutJobHandler")
    public void videoTimeoutJobHandler() throws Exception {// 分片参数int shardIndex = XxlJobHelper.getShardIndex();int shardTotal = XxlJobHelper.getShardTotal();List<MediaProcess> mediaTimeoutProcessList = null;int size = 0;try {// 取出cpu核心数作为一次检查超时的记录条数int availableProcessors = Runtime.getRuntime().availableProcessors();mediaTimeoutProcessList = mediaFileProcessService.getTimeoutMediaProcessList(LocalDateTime.now(), shardIndex, shardTotal, availableProcessors);size = mediaTimeoutProcessList.size();log.debug("取出的超时任务数量是 {} 条", size);if (size <= 0) {return ;}} catch (Exception e) {log.error("取出的超时任务超时", size);e.printStackTrace();}// 启动size个线程的线程池ExecutorService fixedThreadPool = Executors.newFixedThreadPool(size);// 定义size个插销CountDownLatch countDownLatch = new CountDownLatch(size);mediaTimeoutProcessList.forEach(mediaProcess -> {// 将任务加入线程池fixedThreadPool.execute(() -> {try {// 把status值设置为3,并把errMsg更新Long taskId = mediaProcess.getId();String fileId = mediaProcess.getFileId();mediaFileProcessService.saveProcessFinishStatus(taskId, "3", fileId, null, "处理视频超时(30分钟)");log.debug("删除超时任务");} catch (Exception e) {e.printStackTrace();log.error("删除超时任务失败,失败原因: {}", e.getMessage());} finally {countDownLatch.countDown();}});});// 等待,给一个充裕的超时事件,防止无限等待,到达超时时间还没有处理完成则结束任务countDownLatch.await(30, TimeUnit.MINUTES);
    }
    

7.9.2 达到最大失败次数(暂时未实现)

问题:需要找到前端对应的接口。

当任务达到最大失败次数时一般就说明程序处理此视频存在问题,这种情况就需要人工处理,在页面上会提示失败的信息,人工可手动执行该视频进行处理,或通过其它转码工具进行视频转码,转码后直接上传mp4视频。

7.9.3 分块文件清理问题

上传一个文件进行分块上传,上传一半不传了,之前上传到minio的分块文件要清理吗?怎么做的?

1、在数据库中有一张文件表记录minio中存储的文件信息。

2、文件开始上传时会写入文件表,状态为上传中,上传完成会更新状态为上传完成。(实现过程中并没有往文件表中插入正在上传的文件记录,只是在media_minio_files临时保存了分块信息)

3、当一个文件传了一半不再上传了说明该文件没有上传完成,会有定时任务去查询文件表中的记录,如果文件未上传完成则删除minio中没有上传成功的文件目录。

实现思路

难点:怎么判断文件上传了一半不再上传了?上传分块文件只在上传视频时有用,上传分块文件时不会往media_files表里添加记录,只有所有分块上传完成,并合并成功后,才会往media_files里插入数据。所以要建立一个media_minio_files表,每上传一个分块就往media_minio_files表里插入一条分块信息,记录包括分块的上传时间。如果超过30分钟分块记录还没有被删除,说明上传到一半不传了,把minio里的分块目录删除,并删除对应的数据库里的记录。在文件合并成功后,数据库里的分块文件上传记录也要删除。

media_minio_files表结构

在这里插入图片描述

具体实现流程
  1. 上传文件块的时候,上传一个文件块完毕要把这个块的信息保存到表里。

    /*** 把文件分块信息入库* @param fileMd5 文件md5* @param fileName 文件名称* @param bucket minio桶* @param objectName minio中块的存储路径* @param chunkSize 块大小* @return MediaMinioFiles*/
    @Transactional
    @Override
    public MediaMinioFiles addMediaChunkToDb(String fileMd5, String fileName, String bucket, String objectName, Long chunkSize) {MediaMinioFiles mediaMinioFile = new MediaMinioFiles();mediaMinioFile.setFileId(fileMd5);mediaMinioFile.setFilename(fileName);mediaMinioFile.setBucket(bucket);mediaMinioFile.setFilePath(objectName);mediaMinioFile.setFileSize(chunkSize);mediaMinioFile.setStatus("4");  // 上传中int insert = mediaMinioFilesMapper.insert(mediaMinioFile);if (insert < 0) {log.error("保存分块文件信息到数据库失败,{}", mediaMinioFile.toString());XueChengPlusException.cast("保存分块文件信息失败");}log.debug("保存分块文件信息到数据库成功,{}", mediaMinioFile.toString());return mediaMinioFile;
    }/*** 上传分块* @param fileMd5 文件md5* @param chunk 分块序号* @param localChunkFilePath 分块文件本地路径* @return RestResponse*/
    @Override
    public RestResponse uploadChunk(String fileMd5, int chunk, String localChunkFilePath) {// 得到分块文件的目录路径String chunkFileFolderPath = getChunkFileFolderPath(fileMd5);// 得到分块文件的路径String chunkFilePath = chunkFileFolderPath + chunk;// mimeTypeString mimeType = getMimeType(null);// 将文件存储到 minioboolean b = addMediaFilesToMinIO(localChunkFilePath, mimeType, bucketVideoFiles, chunkFilePath);// 将分块信息存到数据库中currentProxy.addMediaChunkToDb(fileMd5, null, bucketVideoFiles, chunkFilePath, 1024 * 5L);if (!b) {log.debug("上传分块文件失败: {}", chunkFilePath);return RestResponse.validfail(false, "上传分块失败");}log.debug("上传分块文件成功: {}", chunkFilePath);return RestResponse.success(true);
    }
    
  2. 如果没有上传失败,在成功合并块文件之后,要把上面数据库中记录删除。

    /*** 删除数据库中的分块文件上传记录* @param fileMd5 文件md5*/
    @Transactional
    public void clearChunkFromDb(String fileMd5) {LambdaQueryWrapper<MediaMinioFiles> deleteQueryWrapper = new LambdaQueryWrapper<>();deleteQueryWrapper.eq(MediaMinioFiles::getFileId, fileMd5);try {mediaMinioFilesMapper.delete(deleteQueryWrapper);log.debug("删除分块上传记录成功");} catch (Exception e) {e.printStackTrace();log.error("删除分块上传记录失败");}
    }
    

    在合并分块的最后几行:

    // 文件入库
    currentProxy.addMediaFilesToDb(companyId, fileMd5, uploadFileParamsDto, bucketVideoFiles, mergeFilePath);
    // 清除文件分块
    clearChunkFiles(chunkFileFolderPath, chunkTotal);
    // 清除数据库中文件分块上传信息
    currentProxy.clearChunkFromDb(fileMd5);
    return RestResponse.success(true);
    
  3. 如果上传文件一半失败,要根据数据库中media_minio_files表中存的块的路径删除所有超时块。

    首先查询所有的超时块,得到上传的超时块列表:

    /*** 拿到数据库中所有上传超时的文件分块信息* @param time 当前任务执行时间* @return List<MediaMinioFiles>*/
    @Override
    public List<MediaMinioFiles> getChunkTimeoutFiles(LocalDateTime time) {List<MediaMinioFiles> mediaMinioFiles = mediaMinioFilesMapper.selectTimeoutChunks(DateUtil.toDateTime(time));return mediaMinioFiles;
    }
    

    删除minio中所有超时块,并且删除所有超时记录

    @XxlJob("videoChunkTimeoutJobHandler")
    public void videoChunkTimeoutJobHandler() {log.debug(">>>>>>>>>> 开始执行检查上传超时块任务");// 拿到所有的超时分块任务List<MediaMinioFiles> chunkTimeoutFiles = mediaFileService.getChunkTimeoutFiles(LocalDateTime.now());if (chunkTimeoutFiles == null) {  // 没有超时任务log.debug("没有超时任务");return ;}// 根据记录中的file_path,删除minio中的所有文件chunkTimeoutFiles.forEach(chunk -> {String filePath = chunk.getFilePath();mediaFileService.clearSingleChunkFile(filePath);});// 删除数据库中对应的记录mediaFileService.clearChunkFromDb(null, LocalDateTime.now());
    }
    

http://www.mrgr.cn/p/30156425

相关文章

ReSharper 显示使用的颜色

在代码里面输入类似于 Colors.Red 的代码,将会自动在代码后面显示一个对应颜色的小方块。本文将告诉大家这个功能的开关在哪里如 ReSharper 的官方文档描述,此功能的效果如下或如下此功能名叫 “Highlight color usages” 可以对代码里面的颜色进行颜色标识,比如在代码提示或…

2009-2022年上市公司华证ESG评级评分数据(含细分项)

2009-2022年上市公司华证ESG评级评分数据&#xff08;含细分项&#xff09; 1、时间&#xff1a;2009-2022年 2、来源&#xff1a;华证ESG 3、指标&#xff1a;证券代码、证券简称、综合评级、年度、综合得分、E评级、E得分、S评级、S得分、G评级、G得分 4、范围&#xff1…

[开发|安卓] Android Studio 开发环境配置

Android Studio下载 Android Studio下载地址 下载SDK依赖 1.点击左上角菜单 2.选择工具 3.打开SDK管理中心 4.下载项目目标Android版本的SDK 配置安卓虚拟机 1.打开右上角的设备管理 2.选择合适的手机规格 3.下载并选择项目目标Android系统 4.点击完成配置 …

Hive Views 视图

Hive Views 视图 在Hive中&#xff0c;视图&#xff08;Views&#xff09;是虚拟表&#xff0c;它只包含查询定义&#xff0c;而不包含实际的数据。视图可以简化复杂查询&#xff0c;隐藏数据结构&#xff0c;提供安全性&#xff0c;以及促进数据访问和重用。 创建视图的语法如…

DI-engine强化学习入门(十又二分之一)如何使用RNN——数据处理、隐藏状态、Burn-in

一、数据处理 用于训练 RNN 的 mini-batch 数据不同于通常的数据。 这些数据通常应按时间序列排列。 对于 DI-engine, 这个处理是在 collector 阶段完成的。 用户需要在配置文件中指定 learn_unroll_len 以确保序列数据的长度与算法匹配。 对于大多数情况&#xff0c; learn_un…

独有病眼花,春风吹不落。 (二维坐标压缩成一个点,并查集)

本题链接&#xff1a;登录—专业IT笔试面试备考平台_牛客网 题目&#xff1a; 样例&#xff1a; 输入 3 8 1 1 D 1 1 R 1 2 D 2 1 D 2 2 R 3 1 R 3 2 R 2 3 D 输出 8 思路&#xff1a; 根据题意&#xff0c;要求连接线段后&#xff0c;操作多少次&#xff0c;连接的线段闭合&…

Transformer详解:从放弃到入门(二)

多头注意力 上篇文章中我们了解了词编码和位置编码&#xff0c;接下来我们介绍Transformer中的核心模块——多头注意力。 自注意力 首先回顾下注意力机制&#xff0c;注意力机制允许模型为序列中不同的元素分配不同的权重。而自注意力中的"自"表示输入序列中的输入相…

C++ list 介绍

&#x1f308;一、认识list这个模版 ist是一个模版&#xff0c;需要结合一个具体的数据类型作为模版参数&#xff0c; 即list < T > <T> <T>&#xff0c;才能成为一个类类型。list是双向循环链表&#xff0c;是序列容器&#xff0c;允许在序列中的任何位置进…

AI图书推荐:给自媒体创作者的ChatGPT使用指南

你是否厌倦了花费数小时盯着空白屏幕&#xff0c;努力为你的内容想出新鲜点子&#xff1f;想要将你的写作提升到下一个水平&#xff1f;有了ChatGPT&#xff0c;你可以告别写作障碍、无休止的修订和浪费的时间。 在这本全面的指南中&#xff0c;你将学到关于ChatGPT你需要知道…

Elastic 通过 AI 驱动的安全分析改变 SIEM 游戏

作者&#xff1a;Santosh Krishnan, Jennifer Ellard 借助由搜索 AI 提供支持的新攻击发现功能&#xff0c;优先考虑攻击&#xff0c;而不是警报。 传统的安全信息与事件管理系统&#xff08;SIEM&#xff09;在很大程度上依赖屏幕背后的人类才能取得成功。警报、仪表盘、威胁…

hadoop学习---基于Hive的教育平台数据仓库分析案例(二)

衔接第一部分&#xff0c;第一部分请点击&#xff1a;基于Hive的教育平台数据仓库分析案例&#xff08;一&#xff09; 意向用户模块&#xff08;全量分析&#xff09;&#xff1a; 需求指标&#xff1a; 需求一: 计期内&#xff0c;新增意向客户&#xff08;包含自己录入的意…

[转帖]Oracle Linux 9.3 正式版发布 - Oracle 提供支持 RHEL 兼容发行版

sysin2023-11-21 上海 阅读 5 分钟 Oracle Linux 9.3 正式版发布 - Oracle 提供支持 RHEL 兼容发行版 Oracle Linux with Unbreakable Enterprise Kernel (UEK) & Red Hat compatible kernel (RHCK) 请访问原文链接:https://sysin.org/blog/oracle-linux-9/,查看最新版。…

智启算力平台基本操作

智启算力平台 智启算力平台路径搭载数据集搭载镜像配置 智启算力平台 开发文档 帮助文档 - OpenI - 启智AI开源社区 路径搭载 OpenIOSSG/promote: 启智AI协作平台首页推荐组织及推荐项目申请。 - notice/Other_notes/SDKGetPath.md at master - promote - OpenI - 启智AI开…

【RT-DETR有效改进】 主干篇 | 2024.5全新的移动端网络MobileNetV4改进RT-DETR(含MobileNetV4全部版本改进)

&#x1f451;欢迎大家订阅本专栏&#xff0c;一起学习RT-DETR&#x1f451; 一、本文介绍 本文给大家带来的改进机制是MobileNetV4&#xff0c;其发布时间是2024.5月。MobileNetV4是一种高度优化的神经网络架构&#xff0c;专为移动设备设计。它最新的改动总结主要有两点&…

IOS离线打包uniapp的信息时报错如下的解决方法

IOS离线打包uniapp的信息时报错如下的解决方法 问题描述&#xff1a; Extract app intents metadata 0.1 seconds XExtractAppIntentsMetadata(in target HBuilder from project HBuilder-Hello)cd /Users/whb/space/vpt/vptios/HBuilder-Hello/Applications/Xcode.app/Conte…

win10下,svn上传.so文件失败

问题&#xff1a;win10下使用TortoiseSVN&#xff0c;svn上传.so文件失败 解决&#xff1a;右键&#xff0c;选择Settings&#xff0c;Global ignore pattern中删除*.so&#xff0c;保存即可。

项目经理【过程】概念

系列文章目录 【引论一】项目管理的意义 【引论二】项目管理的逻辑 【环境】概述 【环境】原则 【环境】任务 【环境】绩效 【人】概述 【人】原则 【人】任务 【人】绩效 【过程】概念 一、过程是什么 1.1 项目管理五大过程组 1.2 五大过程组之间的相互作用 1.3 项目阶段VS过…

使用图网络和视频嵌入预测物理场

文章目录 一、说明二、为什么要预测&#xff1f;三、流体动力学模拟的可视化四、DeepMind神经网络建模五、图形编码六、图形处理器七、图形解码器八、具有不同弹簧常数的轨迹可视化九、预测的物理编码和推出轨迹 一、说明 这是一篇国外流体力学专家在可视化流体物理属性的设计…

聊聊 ASP.NET Core 中间件(三):如何创建自己的中间件?

前言 本质上&#xff0c;中间件类也是一个普通的 .NET 类&#xff0c;它不需要继承任何父类或者实现任何接口。 但是有几个约定&#xff1a; 需要有一个构造方法构造方法至少要有一个 RequestDelegate 类型的参数&#xff0c;用来指向下一个中间件。需要定义一个名字为 Invo…

Linux 认识与学习Bash——3

在Linux bash中&#xff0c;数据流重定向是指将命令的输出从默认的标准输出&#xff08;通常是终端&#xff09;重定向到其他位置&#xff0c;如文件或另一个命令的输入。这是通过使用特定的符号来实现的。例如&#xff0c;>用于将输出重定向到文件&#xff0c;而<用于将…