Python批量导入一亿条数据进Oracle数据库
源数据来源
源数据来源有点复杂,使用iot可视化服务,批量造二十万条数据,通过平台建采集任务,将iot的数据采集到hive数据库中,使用doris集群连接mysql数据库,需要使用多条SQL语句才能查询hive数据库中的数据。
switch hive_catalog_kerberos;REFRESH CATALOG hive_catalog_kerberos PROPERTIES("invalid_cache" = "true");SELECT * FROM test_hive_mrs_db.ods__skyh_root_bigdata_test_ten_million;
当然也可以不用这种方法,随便找个csv、excel等文件都可以,前提是数据量不少。
使用Python提取MySQL数据库查询到的数据
首先建立连接。
# 创建MYSQL连接coon = pymysql.connect(host='IP',user='用户名',passwd='密码',port=端口,database='数据库名')
在Python中实现多SQL执行(将SQL语句分行写下来,依次append进入列表中,使用for循环遍历并commit进行提交)。
# 写SQL语句,将SQL语句依次添加至sql_list列表中sql_list = []sql = "switch hive_catalog_kerberos"sql_list.append(sql)sql = "REFRESH CATALOG hive_catalog_kerberos PROPERTIES('invalid_cache' = 'true')"sql_list.append(sql)sql = "SELECT * FROM test_hive_mrs_db.ods__skyh_root_bigdata_test_ten_million limit 200"sql_list.append(sql)# 创建游标cur = coon.cursor()# 循环SQL列表,执行SQL语句for sql in sql_list:cur.execute(sql)coon.commit()
将查询到的数据fetchall提取出来(由于提取的为元组,后面写入oracle中需要使用list()将其转换为列表)
# 查询语句查看所有并赋值all_select = cur.fetchall()# 需要将元组转为列表data = list(all_select)
使用Python将MySQL数据库查询到的数据写入oracle数据库中
首先创建连接
# 创建ORACLE数据库连接conn = cx_Oracle.connect('用户名', '密码', 'IP:端口/服务名')# 获取操作游标cursor = conn.cursor()
添加数据sql
# 添加数据sqlsql_ora = "INSERT INTO ONE_HUNDRED_MILLION (TIMENOW, QQ, WW, EE, RR, TT, YY) VALUES (:1,:2,:3,:4,:5,:6,:7)"
写入Oracle数据库中,当含有日期类型的数据,需要执行时间类型固定SQL
# 需要执行sql语句,将时间类型固定sql_ora_1 = "alter session set nls_timestamp_format='YYYY-MM-DD HH24:MI:SS.FF'"cursor.execute(sql_ora_1)conn.commit()
执行写入SQL语句时,使用for循环,循环次数为mysql提取到的列表的长度,需要设置步长控制性能
# 设置并发数量batch_size = 100000# 使用循环执行SQL语句for i in range(0, len(data), batch_size):cursor.executemany(sql_ora, data[i:i + batch_size])
源码
# -*- coding: utf8 -*-
import cx_Oracle
import os
import pymysql
import time# 指定Oracle文件位置
cx_Oracle.init_oracle_client(lib_dir=r"E:\awork\rj\oracleeee\instantclient_19_24")# 编码转换
os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'def mysql_con():# 引用mysql数据库的数据,插入到oracle数据库# 创建MYSQL连接coon = pymysql.connect(host='IP',user='用户名',passwd='密码',port=端口,database='数据库名')# 写SQL语句,将SQL语句依次添加至sql_list列表中sql_list = []sql = "switch hive_catalog_kerberos"sql_list.append(sql)sql = "REFRESH CATALOG hive_catalog_kerberos PROPERTIES('invalid_cache' = 'true')"sql_list.append(sql)sql = "SELECT * FROM test_hive_mrs_db.ods__skyh_root_bigdata_test_ten_million limit 200000"sql_list.append(sql)# 创建游标cur = coon.cursor()# 循环SQL列表,执行SQL语句for sql in sql_list:cur.execute(sql)coon.commit()# 查询语句查看所有并赋值all_select = cur.fetchall()# 需要将元组转为列表data = list(all_select)print("mysql数据读取成功:" + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))# 关闭游标cur.close()# 关闭连接coon.close()return datadef oracle_con():# 调用mysql数据库中的结果data = mysql_con()# 创建ORACLE数据库连接conn = cx_Oracle.connect('用户名', '密码', 'IP:端口/服务名')# 获取操作游标cursor = conn.cursor()# 添加数据sqlsql_ora = "INSERT INTO ONE_HUNDRED_MILLION (TIMENOW, QQ, WW, EE, RR, TT, YY) VALUES (:1,:2,:3,:4,:5,:6,:7)"# 需要执行sql语句,将时间类型固定sql_ora_1 = "alter session set nls_timestamp_format='YYYY-MM-DD HH24:MI:SS.FF'"cursor.execute(sql_ora_1)conn.commit()# 设置并发数量batch_size = 100000# 使用循环执行SQL语句for i in range(0, len(data), batch_size):try:cursor.executemany(sql_ora, data[i:i + batch_size])except Exception as e:print(e)# 提交SQL语句conn.commit()# 关闭连接,释放资源cursor.close()conn.close()print("oracle数据写入成功:" + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))if __name__ == '__main__':for j in range(1, 500): # 每次执行二十万条print("第{0}次开始时间:".format(j) + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))oracle_con()
性能优化
使用threading函数,进行并发执行,使用分隔,将500个并发分隔为20个线程一组(由于数据库的断开和重连会消耗时间,需要联系运维将数据库的连接时间设置长一点)
import threading
from datetime import datetimedef thread_func():print("我是一个函数",datetime.now())#将之分为20个线程为一组
def thread_func_20():for i in range(20):thread_func()#500个线程分为25组20个
def many_thread():for i in range(25):t = threading.Thread(target=thread_func_20())t.start()if __name__ == '__main__':start = datetime.today().now()many_thread()end = datetime.today().now() - startprint(end)
其他几个点
安装cx_oracle包需要Python3.10以下的版本;
执行cx_oralce包需要配置Oracle环境,配置了环境变量,但还是找不到Oracle包,只有在代码中指定oracle文件位置;
# 指定Oracle文件位置
cx_Oracle.init_oracle_client(lib_dir=r"E:\awork\rj\oracleeee\instantclient_19_24")
