当前位置: 首页 > news >正文

异构数据同步 datax (3) xxl-job 分布式任务调度

datax 需要手动执行 python 脚本来满足需求,可通过XXL-JOB 进行任务调度实现,满足自动化数据同步需求。

1、nacos 配置

# 配置
xxxx:job:runScript: /home/midware/datax/script/mysql_ps_test.shpythonScriptPath: /home/midware/datax/bin/datax.py jobFilePath: /home/midware/datax/job/mysql_postgres_job_update.json

2、编写shell脚本

mysql_ps_test.sh 

#!/bin/bash# 检查参数数量
if [ "$#" -ne 2 ]; thenecho "Usage: $0 <script_path> <config_file_path>"exit 1
fi# 获取传入的参数
script_path="$1"
config_file_path="$2"# 运行 Python 脚本
python "$script_path" "$config_file_path"# 如果您希望在执行后检查状态码并采取进一步行动,可以添加如下代码:
#if [ $? -ne 0 ]; then
#    echo "Error occurred while running the script."
#    # 可以在这里添加错误处理逻辑
#fi

使用说明:

  1. 将上述脚本保存到一个文件中,例如 mysql_ps_test.sh
  2. 给这个文件添加可执行权限:chmod +x mysql_ps_test.sh.
  3. 运行脚本并传递参数:./run_datax.sh /home/midware/datax/job/mysql_postgres_job_update.py /home/midware/datax/job/mysql_postgres_job_update.json

3、job编写

mysql_postgres_job_update.json

{"job":{"content":[{"reader":{"name":"mysqlreader","parameter":{"username": "root","password": "xxxxxx","connection": [{"jdbcUrl": ["jdbc:mysql://192.168.5.180:3306/xxxx?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&serverTimezone=GMT%2B8"],"querySql": ["  SELECT * FROM sys_test_copy2 WHERE create_time BETWEEN NOW() - INTERVAL 5 MINUTE AND NOW() "]}}},"writer":{"name":"postgresqlwriter","parameter":{"writeMode": "update!@#(user_id)!@#(email)","column":["id","name"],"connection":[{"jdbcUrl":"jdbc:postgresql://127.0.0.1:5432/postgres","table":["sys_test_copy2"]}],"password":"xxxx","username":"postgres"}}}],"setting":{"speed":{"channel":6}}}
}

4、编写一个xxl-job 任务

@Component
public class SampleXxlJob {private static final Logger logger = LoggerFactory.getLogger(SampleXxlJob.class);@Resourceprivate ScriptProperties scriptProperties;/*** 3、命令行任务*/@XxlJob("commandJobHandler")public void commandJobHandler() {executeCommandForScript(Arrays.asList("sh", scriptProperties.getRunScript(),scriptProperties.getPythonScriptPath(),scriptProperties.getJobFilePath()));}private void executeCommandForScript(List<String> command) {// 使用ProcessBuilder执行命令try {logger.info("---------->>>>command:{}", StringUtils.join(command, " "));ProcessBuilder pb = new ProcessBuilder(command);Process process = pb.start();           // 等待命令执行完成int exitCode = process.waitFor();if (exitCode == 0) {XxlJobHelper.log("Command completed successfully.");} else {XxlJobHelper.handleFail("Command exit value(" + exitCode + ") is failed");}            } catch (IOException | InterruptedException e) {XxlJobHelper.handleFail("Command is failed " + e.getMessage());}}
}

定时任务可设置每5分钟跑一次,这样就可以近实时同步数据了

0 */5 * * * ?

5、小结:

至此,从datax 到 job 编写,到postgres plug 源码调整,到xxl-job 定时执行。完成了一个定时数据同步功能。


http://www.mrgr.cn/news/3229.html

相关文章:

  • KeyShot 2024.2:卓越的Mac与Windows 3D渲染与动画制作软件
  • 鸿蒙内核源码分析(中断切换篇) | 系统因中断活力四射
  • 如何提高研发效能?思码逸 信通院给你答案
  • 2020 位示图
  • mybatis druid postgresql statement超时原理原理
  • Error: ReferenceError: ReadableStream is not defined
  • 【ACM独立出版】第四届大数据、人工智能与风险管理国际学术会议 (ICBAR 2024)
  • Android 开发中常用的布局类型及其选择指南
  • 分支dev项目合并到master
  • Redis 的 List 结构非常适合用于实现消息队列php
  • LORA通信详解
  • Qt 学习第六天:页面布局
  • CSS方向选择的艺术:深入探索:horizontal和:vertical伪类
  • 【现代操作系统】1. intro
  • Sentinel入门与进阶:微服务流量控制的最佳实践 ( 五 )
  • Jenkins入门以及安装
  • 【SQL】仅出现一次的最大数据
  • 构建Docker镜像时,遇到从`deb.debian.org`下载软件包速度很慢
  • MySQL 如何保证事务的原子性
  • 知识竞赛中PK答题环节竞赛规则有哪些设计方案