
Python tool: a DataX JSON generator for MySQL-to-Doris sync (mysql2doris)

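1. Overview

I needed to set up a large number of DataX jobs to sync MySQL to Doris, which means writing a lot of DataX JSON files. To save effort I wrote a tool: give it the Doris CREATE TABLE statement and it generates the JSON.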

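2. Files

Five files are involved in total.

2.1 conf.json

This holds the connection information for MySQL and Doris.
The table_prefix field exists because the table gets a prefix on the way from MySQL to Doris. If your tables have the same name on both sides, leave it empty.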

{
    "mysql": {
        "host": "",
        "port": 3306,
        "user": "root",
        "password": "",
        "database": "",
        "table_prefix": ""
    },
    "doris": {
        "host": "",
        "port": 9030,
        "user": "",
        "password": "",
        "loadUrl": ["xxx:8030"],
        "preSql": []
    }
}
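To illustrate how table_prefix is meant to work, here is a minimal sketch. It assumes the Doris table name is the MySQL table name with the prefix prepended. The helper name `mysql_table_name` and the sample prefix `ods_` are hypothetical and are not part of the tool.

```python
def mysql_table_name(doris_table: str, table_prefix: str) -> str:
    """Return the MySQL source table name for a given Doris table name.

    Assumption: the Doris table is the MySQL table name with `table_prefix`
    prepended. An empty prefix means both sides use the same name.
    """
    if table_prefix and doris_table.startswith(table_prefix):
        return doris_table[len(table_prefix):]
    return doris_table


print(mysql_table_name("ods_merchant", "ods_"))  # prints: merchant
print(mysql_table_name("merchant", ""))          # prints: merchant
```

Stripping only a leading match keeps a table name intact when the prefix text also happens to appear in the middle of the name.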

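2.2 datax_example.json

This is a standard DataX job JSON file. The generator uses it as a template and fills in the values for your job.
If you want more throughput, change channel here.
Any other changes to the template also go here.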


{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "",
                        "password": "",
                        "splitPk": "",
                        "connection": [
                            {
                                "querySql": [],
                                "jdbcUrl": []
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "doriswriter",
                    "parameter": {
                        "username": "",
                        "password": "",
                        "loadUrl": [],
                        "TwoPhaseCommit": "true",
                        "column": [],
                        "preSql": [],
                        "flushInterval": 30000,
                        "connection": [
                            {
                                "table": [""],
                                "jdbcUrl": "",
                                "selectedDatabase": ""
                            }
                        ],
                        "loadProps": {
                            "format": "json",
                            "strip_outer_array": "true",
                            "line_delimiter": "\\x02"
                        }
                    }
                }
            }
        ]
    }
}
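Editing the template by hand is enough for a single job. As a rough sketch of the same change done in code, the snippet below sets channel on a copy of the template. The dict is a trimmed stand-in for datax_example.json, and `with_channel` is a hypothetical helper, not something the tool provides. Whether a higher value actually speeds up a job depends on the source and target systems.

```python
import copy
import json

# Trimmed stand-in for datax_example.json; only the part touched here.
template = {"job": {"setting": {"speed": {"channel": 1}}, "content": []}}


def with_channel(job: dict, channel: int) -> dict:
    """Return a copy of the job definition with a different channel count."""
    if channel < 1:
        raise ValueError("channel must be a positive integer")
    updated = copy.deepcopy(job)  # leave the original template untouched
    updated["job"]["setting"]["speed"]["channel"] = channel
    return updated


faster = with_channel(template, 4)
print(json.dumps(faster["job"]["setting"]))
```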

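2.3 datax_output.json

This is the generated JSON file. You do not need to create it in advance.

2.4 dorisDDL.sql

This holds the Doris CREATE TABLE statement. Note that it must include the database name: the DataX JSON needs the database name, and the tool parses it from here.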

CREATE TABLE `test`.`merchant` (
    `merchant_code` VARCHAR(60) NOT NULL COMMENT 'Merchant code',
    `merchant_name` VARCHAR(100) DEFAULT NULL COMMENT 'Merchant name',
    `merchant_type` VARCHAR(60) DEFAULT NULL COMMENT 'Merchant type'
) ENGINE = OLAP
UNIQUE KEY(`merchant_code`)
DISTRIBUTED BY HASH(`merchant_code`) BUCKETS 20
PROPERTIES (
    "replication_num" = "1",
    "storage_type" = "COLUMN"
);
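The snippet below shows why the database name matters. It is a minimal sketch that reuses the table-name regular expression from the tool. The function name `extract_names` and the shortened sample DDL strings are assumptions made for illustration.

```python
import re

# Same table-name pattern the tool uses: optional `db`. prefix, then the table.
TABLE_PATTERN = re.compile(
    r'CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?(?:(`?)(\w+)\1\.)?(`?)(\w+)\3',
    re.IGNORECASE,
)


def extract_names(ddl: str):
    """Return (database, table) parsed from a CREATE TABLE statement, or None."""
    match = TABLE_PATTERN.search(ddl)
    if not match:
        return None
    return match.group(2), match.group(4)


print(extract_names("CREATE TABLE `test`.`merchant` (`id` INT)"))  # ('test', 'merchant')
print(extract_names("CREATE TABLE merchant (id INT)"))             # (None, 'merchant')
```

When the database name is omitted, the first element is None, so the Doris JDBC URL and selectedDatabase cannot be filled in.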

2.5 Mysql2dorisDataxTools.py

This is the core code.

import json
import re


def read_and_format_json(file_path):
    """Read a JSON file and return its content as a Python dict."""
    with open(file_path, 'r', encoding='utf-8') as file:
        return json.loads(file.read())


def split_column_definitions(sql, start):
    """Split the parenthesised list that opens at sql[start] on its top-level commas.

    Commas and parentheses inside quotes (COMMENT text) or nested
    parentheses (such as DECIMAL(10,2)) are not treated as separators.
    """
    parts, current, depth, quote = [], [], 0, None
    for ch in sql[start:]:
        if quote:
            if ch == quote:
                quote = None
        elif ch in ('"', "'"):
            quote = ch
        elif ch == '(':
            depth += 1
            if depth == 1:
                continue  # skip the opening parenthesis itself
        elif ch == ')':
            depth -= 1
            if depth == 0:
                parts.append(''.join(current))
                return parts
        elif ch == ',' and depth == 1:
            parts.append(''.join(current))
            current = []
            continue
        current.append(ch)
    return None  # unbalanced parentheses


def parse_create_table_sql(file_path):
    """Parse the DDL and return the database name, table name and column names."""
    with open(file_path, 'r', encoding='utf-8') as file:
        sql = file.read()

    # Remove SQL comments
    sql = re.sub(r'--.*$', '', sql, flags=re.MULTILINE)
    sql = re.sub(r'/\*.*?\*/', '', sql, flags=re.DOTALL)

    # Extract the database name and table name
    table_pattern = r'CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?(?:(`?)(\w+)\1\.)?(`?)(\w+)\3'
    table_match = re.search(table_pattern, sql, re.IGNORECASE)
    if not table_match:
        return None
    database_name = table_match.group(2)
    table_name = table_match.group(4)

    # Extract the column definitions: the first parenthesised list after the table name.
    # This stops at the matching ")" so ENGINE, KEY and PROPERTIES clauses are not
    # scanned, and the last column (which has no trailing comma) is not lost.
    start = sql.find('(', table_match.end())
    definitions = split_column_definitions(sql, start) if start != -1 else None
    if not definitions:
        return None

    # Extract the column names: an identifier followed by a type name
    columns = []
    for definition in definitions:
        column_match = re.match(r'\s*`?(\w+)`?\s+\w+', definition)
        if column_match:
            columns.append(column_match.group(1))

    return {'database': database_name, 'table': table_name, 'columns': columns}


def get_select(columns, table_name, tablename_prefix):
    """Build the SELECT statement that the MySQL reader runs."""
    columns = [s.strip() for s in columns]
    # The MySQL table has no prefix, so strip it from the Doris table name
    if tablename_prefix and table_name.startswith(tablename_prefix):
        table_name = table_name[len(tablename_prefix):]
    return 'SELECT  ' + '`' + '`,`'.join(columns) + '`' + ' FROM ' + table_name


def get_column(columns):
    """Wrap each column name in backticks for the Doris writer."""
    return ['`' + c + '`' for c in columns]


conf = read_and_format_json('conf.json')
js_demo = read_and_format_json('datax_example.json')
ddl_info = parse_create_table_sql('dorisDDL.sql')
if ddl_info is None or not ddl_info['database']:
    raise SystemExit('dorisDDL.sql must contain CREATE TABLE `database`.`table` (...)')

select_sql = get_select(ddl_info['columns'], ddl_info['table'], conf['mysql']['table_prefix'])
column = get_column(ddl_info['columns'])

# Reader section
reader = js_demo['job']['content'][0]['reader']['parameter']
reader['username'] = conf['mysql']['user']
reader['password'] = conf['mysql']['password']
reader['connection'][0]['querySql'] = [select_sql]
jdbc_url_mysql = ('jdbc:mysql://' + conf['mysql']['host'] + ':' + str(conf['mysql']['port'])
                  + '/' + conf['mysql']['database']
                  + '?useSSL=false&serverTimezone=Asia/Shanghai')
reader['connection'][0]['jdbcUrl'] = [jdbc_url_mysql]

# Writer section
writer = js_demo['job']['content'][0]['writer']['parameter']
writer['username'] = conf['doris']['user']
writer['password'] = conf['doris']['password']
writer['loadUrl'] = conf['doris']['loadUrl']
writer['column'] = column
writer['preSql'] = conf['doris']['preSql']
writer['connection'][0]['table'] = [ddl_info['table']]
jdbc_url_doris = ('jdbc:mysql://' + conf['doris']['host'] + ':' + str(conf['doris']['port'])
                  + '/' + ddl_info['database'])
writer['connection'][0]['jdbcUrl'] = jdbc_url_doris
writer['connection'][0]['selectedDatabase'] = ddl_info['database']

print(json.dumps(js_demo))

# Write the JSON file
with open('datax_output.json', 'w', encoding='utf-8') as file:
    json.dump(js_demo, file, ensure_ascii=False, indent=4)

3. Usage

Put the files above in one directory and run Mysql2dorisDataxTools.py directly.
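Before running the script, it can help to confirm that the three input files are present in the directory. The check below is only a sketch, and `missing_inputs` is a hypothetical helper that is not part of the tool. datax_output.json is left out because the script creates it.

```python
from pathlib import Path

# Input files the script reads; datax_output.json is generated, not required.
REQUIRED = ["conf.json", "datax_example.json", "dorisDDL.sql"]


def missing_inputs(directory: str = ".") -> list:
    """Return the names of required input files that are absent from `directory`."""
    base = Path(directory)
    return [name for name in REQUIRED if not (base / name).is_file()]


missing = missing_inputs()
if missing:
    print("Missing input files:", ", ".join(missing))
else:
    print("All input files found; run Mysql2dorisDataxTools.py")
```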

If you would rather not write it yourself, use the archive below.

https://download.csdn.net/download/weixin_45399602/89644991?spm=1001.2014.3001.5503

