DataX-Oracle新增writeMode支持update

news/2024/4/29 23:00:20

目录

前言

第一步下载源码

第二步修改源码

1、Oraclewriter

2、WriterUtil

 2.1、修改getWriteTemplate方法

 2.2、新增onMergeIntoDoString与getStrings方法

3、CommonRdbmsWriter

 3.1、修改startWriteWithConnection

 3.2、修改doBatchInsert

 3.3、修改fillPreparedStatement

第三步打包

第四步脚本修改

修改后jar包地址 




前言

目前 DataX更新到datax_v202309版本还不能支持Oracle写入的update,只通过DataX只能修改源码。

原理:oracle 不支持类似 MySQL的 REPLACE INTO 和 INSERT … ON DUPLICATE KEY UPDATE,所以只支持 insert 配置项。要实现此功能,需要利用 Oracle 的 merge 语句,先来看下 merge 语法。

MERGE INTO [target-table] A USING [source-table sql] B 
ON([conditional expression] and [...]...) 
WHEN MATCHED THEN[UPDATE sql] 
WHEN NOT MATCHED THEN [INSERT sql]

第一步下载源码

 地址:datax_v202309。

第二步修改源码

一共修改3个文件

1、Oraclewriter

 

找到该代码直接注释掉就行。 

2、WriterUtil
 2.1、修改getWriteTemplate方法
public static String getWriteTemplate(List<String> columnHolders, List<String> valueHolders, String writeMode, DataBaseType dataBaseType, boolean forceUseUpdate) {boolean update = writeMode.trim().toLowerCase().startsWith("update");boolean isWriteModeLegal = writeMode.trim().toLowerCase().startsWith("insert")|| writeMode.trim().toLowerCase().startsWith("replace")|| update;if (!isWriteModeLegal) {throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_VALUE,String.format("您所配置的 writeMode:%s 错误. 因为DataX 目前仅支持replace,update 或 insert 方式. 请检查您的配置并作出修改.", writeMode));}// && writeMode.trim().toLowerCase().startsWith("replace")String writeDataSqlTemplate;if (forceUseUpdate || update) {//update只在mysql下使用if (dataBaseType == DataBaseType.MySql || dataBaseType == DataBaseType.Tddl) {writeDataSqlTemplate = new StringBuilder().append("INSERT INTO %s (").append(StringUtils.join(columnHolders, ",")).append(") VALUES(").append(StringUtils.join(valueHolders, ",")).append(")").append(onDuplicateKeyUpdateString(columnHolders)).toString();}//update在Oracle下使用else if (dataBaseType == DataBaseType.Oracle) {writeDataSqlTemplate = onMergeIntoDoString(writeMode, columnHolders, valueHolders) + "INSERT (" +StringUtils.join(columnHolders, ",") +") VALUES(" + StringUtils.join(valueHolders, ",") +")";}else {throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_VALUE,String.format("当前数据库不支持 writeMode:%s 模式.", writeMode));}} else {//这里是保护,如果其他错误的使用了update,需要更换为replaceif (update) {writeMode = "replace";}writeDataSqlTemplate = new StringBuilder().append(writeMode).append(" INTO %s (").append(StringUtils.join(columnHolders, ",")).append(") VALUES(").append(StringUtils.join(valueHolders, ",")).append(")").toString();}return writeDataSqlTemplate;}
 2.2、新增onMergeIntoDoString与getStrings方法

代码作用:对Oracle进行update的MERGE拼接

public static String onMergeIntoDoString(String merge, List<String> columnHolders, List<String> valueHolders) {String[] sArray = getStrings(merge);StringBuilder sb = new StringBuilder();sb.append("MERGE INTO %s A USING ( SELECT ");boolean first = true;boolean first1 = true;StringBuilder str = new StringBuilder();StringBuilder update = new StringBuilder();for (String columnHolder : columnHolders) {if (Arrays.asList(sArray).contains(columnHolder)) {if (!first) {sb.append(",");str.append(" AND ");} else {first = false;}str.append("TMP.").append(columnHolder);sb.append("?");str.append(" = ");sb.append(" AS ");str.append("A.").append(columnHolder);sb.append(columnHolder);}}for (String columnHolder : columnHolders) {if (!Arrays.asList(sArray).contains(columnHolder)) {if (!first1) {update.append(",");} else {first1 = false;}update.append(columnHolder);update.append(" = ");update.append("?");}}sb.append(" FROM DUAL ) TMP ON (");sb.append(str);sb.append(" ) WHEN MATCHED THEN UPDATE SET ");sb.append(update);sb.append(" WHEN NOT MATCHED THEN ");return sb.toString();}public static String[] getStrings(String merge) {merge = merge.replace("update", "");merge = merge.replace("(", "");merge = merge.replace(")", "");merge = merge.replace(" ", "");return merge.split(",");}
3、CommonRdbmsWriter
 3.1、修改startWriteWithConnection
        // 替换原先的代码块public void startWriteWithConnection(RecordReceiver recordReceiver, TaskPluginCollector taskPluginCollector, Connection connection) {this.taskPluginCollector = taskPluginCollector;List<String> columns = new LinkedList<>();if (this.dataBaseType == DataBaseType.Oracle && writeMode.trim().toLowerCase().startsWith("update") ) {String merge = this.writeMode;String[] sArray = WriterUtil.getStrings(merge);this.columns.forEach(column->{if (Arrays.asList(sArray).contains(column)) {columns.add(column);}});this.columns.forEach(column->{if (!Arrays.asList(sArray).contains(column)) {columns.add(column);}});}columns.addAll(this.columns);// 用于写入数据的时候的类型根据目的表字段类型转换this.resultSetMetaData = DBUtil.getColumnMetaData(connection, this.table, StringUtils.join(columns, ","));// 写数据库的SQL语句calcWriteRecordSql();List<Record> writeBuffer = new ArrayList<Record>(this.batchSize);int bufferBytes = 0;try {Record record;while ((record = recordReceiver.getFromReader()) != null) {if (record.getColumnNumber() != this.columnNumber) {// 源头读取字段列数与目的表字段写入列数不相等,直接报错throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,String.format("列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不相等. 请检查您的配置并作出修改.",record.getColumnNumber(),this.columnNumber));}writeBuffer.add(record);bufferBytes += record.getMemorySize();if (writeBuffer.size() >= batchSize || bufferBytes >= batchByteSize) {doBatchInsert(connection, writeBuffer);writeBuffer.clear();bufferBytes = 0;}}if (!writeBuffer.isEmpty()) {doBatchInsert(connection, writeBuffer);writeBuffer.clear();bufferBytes = 0;}} catch (Exception e) {throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);} finally {writeBuffer.clear();bufferBytes = 0;DBUtil.closeDBResources(null, null, connection);}}
 3.2、修改doBatchInsert
 protected void doBatchInsert(Connection connection, List<Record> buffer)throws SQLException{PreparedStatement preparedStatement = null;try {connection.setAutoCommit(false);preparedStatement = connection.prepareStatement(this.writeRecordSql);if (this.dataBaseType == DataBaseType.Oracle && !"insert".equalsIgnoreCase(this.writeMode)) {String merge = this.writeMode;String[] sArray = WriterUtil.getStrings(merge);for (Record record : buffer) {List<Column> recordOne = new ArrayList<>();for (int j = 0; j < this.columns.size(); j++) {if (Arrays.asList(sArray).contains(this.columns.get(j))) {recordOne.add(record.getColumn(j));}}for (int j = 0; j < this.columns.size(); j++) {if (!Arrays.asList(sArray).contains(this.columns.get(j))) {recordOne.add(record.getColumn(j));}}for (int j = 0; j < this.columns.size(); j++) {recordOne.add(record.getColumn(j));}for (int j = 0; j < recordOne.size(); j++) {record.setColumn(j, recordOne.get(j));}preparedStatement = fillPreparedStatement(preparedStatement, record);preparedStatement.addBatch();}}else {for (Record record : buffer) {preparedStatement = fillPreparedStatement(preparedStatement, record);preparedStatement.addBatch();}}preparedStatement.executeBatch();connection.commit();}catch (SQLException e) {LOG.warn("回滚此次写入, 采用每次写入一行方式提交. 因为: {}", e.getMessage());connection.rollback();doOneInsert(connection, buffer);}catch (Exception e) {throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);}finally {DBUtil.closeDBResources(preparedStatement, null);}}
 3.3、修改fillPreparedStatement
  protected PreparedStatement fillPreparedStatement(PreparedStatement preparedStatement, Record record)throws SQLException{for (int i = 0; i < record.getColumnNumber(); i++) {int columnSqltype = this.resultSetMetaData.getMiddle().get(i);String typeName = this.resultSetMetaData.getRight().get(i);preparedStatement = fillPreparedStatementColumnType(preparedStatement, i,columnSqltype, typeName,record.getColumn(i));}return preparedStatement;}

第三步打包

1、只需要在idea里面打包修改的两个程序就可以

 2、打包成功后获取两个jar包

 3、将包替换到datax的插件里面

 将oraclewriter-0.0.1-SNAPSHOT.jar替换到datax\plugin\writer\oraclewriter

 将plugin-rdbms-util-0.0.1-SNAPSHOT.jar替换到datax\plugin\writer\oraclewriter\libs

第四步脚本修改

{"job": {"setting": {"speed": {"byte": 1048576},"errorLimit": {"record": 0,"percentage": 0.02}},"content": [{"reader": {"name": "mysqlreader","parameter": {"username": "${r_username}","password": "${r_password}","connection": [{	   "querySql": ["SELECT f_year,f_code,f_name,f_order FROM tableName"],"jdbcUrl": ["${r_jdbcUrl}"]}]}},"writer": {"name": "oraclewriter","parameter": {"writeMode": "update(f_year,f_code)","username": "${w_username}","password": "${w_password}","column": ["f_year","f_code","f_name","f_order"],"session": [],"preSql": [],"connection": [{"jdbcUrl": "${w_jdbcUrl}","table": ["tableName"]}]}}		   }]}
}

参数 "writeMode": "update(f_year,f_code)" 里面f_year,f_code就是主键, 参数上不要加/"

update(\"f_year\",\"f_code\")这样是拼不上sql的,这个问题调试了好久才解决。

这时候运行就成功了

参考文章DataX 二次开发支持 Oracle 更新数据icon-default.png?t=N7T8https://blog.csdn.net/xch_yang/article/details/128250190?utm_medium=distribute.pc_relevant.none-task-blog-2~default~baidujs_baidulandingword~default-0-128250190-blog-106881907.235%5Ev43%5Epc_blog_bottom_relevance_base8&spm=1001.2101.3001.4242.1&utm_relevant_index=3Datax oracle 支持增量并且支持全量更新icon-default.png?t=N7T8https://blog.csdn.net/weixin_41250031/article/details/122615271?spm=1001.2101.3001.6650.5&utm_medium=distribute.pc_relevant.none-task-blog-2~default~CTRLIST~Rate-5-122615271-blog-129723622.235%5Ev43%5Epc_blog_bottom_relevance_base8&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2~default~CTRLIST~Rate-5-122615271-blog-129723622.235%5Ev43%5Epc_blog_bottom_relevance_base8&utm_relevant_index=7

修改后jar包地址 

懒得修改可以直接下载两个jar替换到你们的datax对应目录。

https://download.csdn.net/download/qq_36802726/89046154icon-default.png?t=N7T8https://download.csdn.net/download/qq_36802726/89046154


http://www.mrgr.cn/p/73165554

相关文章

Flutter 拦截系统键盘,显示自定义键盘

一、这里记录下在开发过程中&#xff0c;下单的时候输入金额需要使用自定义的数字键盘 参考链接: https://juejin.cn/post/7166046328609308685 效果图 二、屏蔽系统键盘 怎样才能够在输入框获取焦点的时候&#xff0c;不让系统键盘弹出呢&#xff1f;同时又显示我们自定义的…

【SpringBoot从入门到精通】01_SpringBoot概述

一、Spring与SpringBoot 1.1 Spring Spring 是一款目前主流的 Java EE 轻量级开源框架&#xff0c;是 Java 世界最为成功的框架之一。Spring 由“Spring 之父”Rod Johnson(罗宾约翰逊) 提出并创立&#xff0c;其目的是用于简化 Java 企业级应用的开发难度和开发周期。 广义…

新网站收录时间是多久,新建网站多久被百度收录

对于新建的网站而言&#xff0c;被搜索引擎收录是非常重要的一步&#xff0c;它标志着网站的正式上线和对外开放。然而&#xff0c;新网站被搜索引擎收录需要一定的时间&#xff0c;而且时间长短受多种因素影响。本文将探讨新网站收录需要多长时间&#xff0c;以及新建网站多久…

Prometheus +Grafana +node_exporter可视化监控Linux + windows虚机

1、介绍 待补充 2、架构图 Prometheus &#xff1a;主要是负责存储、抓取、聚合、查询方面。 node_exporter &#xff1a;主要是负责采集物理机、中间件的信息。 3、搭建过程 配置要求&#xff1a;1台主服务器 n台从服务器 &#xff08;被监控的linux或windows虚机&am…

【分布式】——CAPBASE理论

CAP&BASE理论 ⭐⭐⭐⭐⭐⭐ Github主页&#x1f449;https://github.com/A-BigTree 笔记链接&#x1f449;https://github.com/A-BigTree/tree-learning-notes ⭐⭐⭐⭐⭐⭐ Spring专栏&#x1f449;https://blog.csdn.net/weixin_53580595/category_12279588.html Sprin…

数仓 - [04] 数仓分层

数仓分层是一种将数据仓库按照不同的层级进行组织和管理的方法。每个层级具有不同的功能和目的,旨在支持数据分析、报告和决策等不同的业务需求。一、数仓分层的意义和目的数仓分层的主要目的是实现数据的高效访问和分析,提高数据的可用性和性能,同时提供更好的灵活性和可扩…

TheMoon 恶意软件短时间感染 6,000 台华硕路由器以获取代理服务

文章目录 针对华硕路由器Faceless代理服务预防措施 一种名为"TheMoon"的新变种恶意软件僵尸网络已经被发现正在侵入全球88个国家数千台过时的小型办公室与家庭办公室(SOHO)路由器以及物联网设备。 "TheMoon"与“Faceless”代理服务有关联&#xff0c;该服务…

Hybrid-PSC:基于对比学习的混合网络,解决长尾图片分类 | CVPR 2021

论文提出新颖的混合网络用于解决长尾图片分类问题,该网络由用于图像特征学习的对比学习分支和用于分类器学习的交叉熵分支组成,在训练过程逐步将训练权重调整至分类器学习,达到更好的特征得出更好的分类器的思想。另外,为了节省内存消耗,论文提出原型有监督对比学习。从实…

axios发送get请求但参数中有数组导致请求路径多出了“[]“的处理办法

一、情况 使用axios发送get请求携带了数组参数时&#xff0c;请求路径中就会多出[]字符&#xff0c;而在后端也会报错 二、解决办法 1、安装qs 当前项目的命令行中安装 npm install qs2、引入qs库(使用qs库来将参数对象转换为字符串) // 全局 import qs from qs Vue.proto…

STM32使用USART发送数据包指令点亮板载LED灯

电路连接&#xff1a; 连接显示屏模块&#xff0c;显示屏的SCL在B10&#xff0c;SDA在B11。 程序目的&#xff1a; 发送LED_ON指令打开板载LED灯&#xff0c;发送LED_OFF关闭板载LED灯&#xff0c;与上一个博客不同&#xff0c;这个实际上是实现串口收发文本数据包。 …

使用DBever连接人大金仓数据库

下载安装DBever首先需要下载并安装DBever,可以在DBever官网上下载最新版的安装程序,根据提示进行安装即可。 打开人大金仓数据库服务 在连接人大金仓数据库之前,需要确保人大金仓数据库服务已经启动。可以在服务列表中找到人大金仓数据库服务,并启动它。使用DBever连接人大…

【数据库】postgresql截取最后一个字符之前的所有字符,如V1.0.0.20230731110947中取V1.0.0

在PostgreSQL中,我们可以使用position函数和split_part函数来截取最后一个.之前的所有字符。这两个函数都非常有用,尤其是在处理文本数据时。 position函数 position函数用于查找一个字符串中某个子串的位置。它的语法如下: POSITION(substring IN string)其中,substring是…

DER编码

一、任务详情参考附件中图书p120 中7.1的实验指导,完成DER编码 Name实例中,countryName改为“CN”,organization Name-"你的学号" commoaName="你的姓名拼音" 用echo -n -e "编码" > 你的学号.der中,用OpenSSL asn1parse 分析编码的正确性…

揭秘’在家答答题,无需经验、无论男女、单号轻松日产200+的一个玩法

项目简介 公众号&#xff1a;老A程序站 这个项目是人人可参与的&#xff0c;无需支付任何费用&#xff0c;只需投入时间即可。每天的任务主要是回答问题。 项目 地 址 &#xff1a; laoa1.cn/1457.html 如果遇到不会的问题&#xff0c;可以直接使用百度进行搜索。我们通过…

YOLOF:单层特征检测也可以比FPN更出色 | CVPR 2021

论文通过分析发现FPN的成功在于divide-and-conquer策略解决了目标检测的优化问题,借此研究设计了仅用单层特征预测的高效检测网络YOLOF。YOLOF在结构上没有很多花哨的结构,却在准确率、推理速度和收敛速度上都有不错的提升,相对于眼花缭乱的FPN魔改结构,十分值得学习 来源:…

原型链-(前端面试 2024 版)

来讲一讲原型链 原型链只存在于函数之中 四个规则 1、引用类型&#xff0c;都具有对象特性&#xff0c;即可自由扩展属性。 2、引用类型&#xff0c;都有一个隐式原型 __proto__ 属性&#xff0c;属性值是一个普通的对象。 3、引用类型&#xff0c;隐式原型 __proto__ 的属…

RSTP环路避免实验(华为)

思科设备参考&#xff1a;RSTP环路避免实验&#xff08;思科&#xff09; 一&#xff0c;技术简介 RSTP (Rapid Spanning Tree Protocol) 是从STP发展而来 • RSTP标准版本为IEEE802.1w • RSTP具备STP的所有功能&#xff0c;可以兼容STP运行 • RSTP和STP有所不同 减少了…

TitanIDE与传统 IDE 比较

与传统IDE的比较 TitanIDE 和传统 IDE 属于不同时代的产物&#xff0c;在手工作坊时代&#xff0c;一切都是那么的自然&#xff0c;开发者习惯 Windows 或 MacOS 原生 IDE。不过&#xff0c;随着时代的变迁&#xff0c;软件行业已经步入云原生时代&#xff0c;TitanIDE 是顺应…

使用MergeKit创建自己的专家混合模型:将多个模型组合成单个MoE

由于Mixtral的发布,专家混合(MoE)架构在最近几个月变得流行起来。虽然Mixtral和其他MoE架构是从头开始预训练的,但最近出现了另一种创建MoE的方法:Arcee的MergeKit库可以通过集成几个预训练模型来创建moe。这些人通常被称为frankenMoEs或MoErges,以区别于预先训练的MoEs。 …

Redis命令-List命令

4.6 Redis命令-List命令 Redis中的List类型与Java中的LinkedList类似&#xff0c;可以看做是一个双向链表结构。既可以支持正向检索和也可以支持反向检索。 特征也与LinkedList类似&#xff1a; 有序元素可以重复插入和删除快查询速度一般 常用来存储一个有序数据&#xff…