当前位置: 首页 > news >正文

Python批量导入一亿条数据进Oracle数据库

源数据来源

        源数据来源有点复杂,使用iot可视化服务,批量造二十万条数据,通过平台建采集任务,将iot的数据采集到hive数据库中,使用doris集群连接mysql数据库,需要使用多条SQL语句才能查询hive数据库中的数据。

switch hive_catalog_kerberos;REFRESH CATALOG hive_catalog_kerberos PROPERTIES("invalid_cache" = "true");SELECT * FROM test_hive_mrs_db.ods__skyh_root_bigdata_test_ten_million;

        当然也可以不用这种方法,随便找个csv、excel等文件都可以,前提是数据量不少。

使用Python提取MySQL数据库查询到的数据

        首先建立连接。
# 创建MYSQL连接coon = pymysql.connect(host='IP',user='用户名',passwd='密码',port=端口,database='数据库名')
        在Python中实现多SQL执行(将SQL语句分行写下来,依次append进入列表中,使用for循环遍历并commit进行提交)。
# 写SQL语句,将SQL语句依次添加至sql_list列表中sql_list = []sql = "switch hive_catalog_kerberos"sql_list.append(sql)sql = "REFRESH CATALOG hive_catalog_kerberos PROPERTIES('invalid_cache' = 'true')"sql_list.append(sql)sql = "SELECT * FROM test_hive_mrs_db.ods__skyh_root_bigdata_test_ten_million limit 200"sql_list.append(sql)# 创建游标cur = coon.cursor()# 循环SQL列表,执行SQL语句for sql in sql_list:cur.execute(sql)coon.commit()
        将查询到的数据fetchall提取出来(由于提取的为元组,后面写入oracle中需要使用list()将其转换为列表)
# 查询语句查看所有并赋值all_select = cur.fetchall()# 需要将元组转为列表data = list(all_select)

使用Python将MySQL数据库查询到的数据写入oracle数据库中

        首先创建连接
# 创建ORACLE数据库连接conn = cx_Oracle.connect('用户名', '密码', 'IP:端口/服务名')# 获取操作游标cursor = conn.cursor()
         添加数据sql
# 添加数据sqlsql_ora = "INSERT INTO ONE_HUNDRED_MILLION (TIMENOW, QQ, WW, EE, RR, TT, YY) VALUES (:1,:2,:3,:4,:5,:6,:7)"
        写入Oracle数据库中,当含有日期类型的数据,需要执行时间类型固定SQL
# 需要执行sql语句,将时间类型固定sql_ora_1 = "alter session set nls_timestamp_format='YYYY-MM-DD HH24:MI:SS.FF'"cursor.execute(sql_ora_1)conn.commit()
        执行写入SQL语句时,使用for循环,循环次数为mysql提取到的列表的长度,需要设置步长控制性能
# 设置并发数量batch_size = 100000# 使用循环执行SQL语句for i in range(0, len(data), batch_size):cursor.executemany(sql_ora, data[i:i + batch_size])

源码


# -*- coding: utf8 -*-
import cx_Oracle
import os
import pymysql
import time# 指定Oracle文件位置
cx_Oracle.init_oracle_client(lib_dir=r"E:\awork\rj\oracleeee\instantclient_19_24")# 编码转换
os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'def mysql_con():# 引用mysql数据库的数据,插入到oracle数据库# 创建MYSQL连接coon = pymysql.connect(host='IP',user='用户名',passwd='密码',port=端口,database='数据库名')# 写SQL语句,将SQL语句依次添加至sql_list列表中sql_list = []sql = "switch hive_catalog_kerberos"sql_list.append(sql)sql = "REFRESH CATALOG hive_catalog_kerberos PROPERTIES('invalid_cache' = 'true')"sql_list.append(sql)sql = "SELECT * FROM test_hive_mrs_db.ods__skyh_root_bigdata_test_ten_million limit 200000"sql_list.append(sql)# 创建游标cur = coon.cursor()# 循环SQL列表,执行SQL语句for sql in sql_list:cur.execute(sql)coon.commit()# 查询语句查看所有并赋值all_select = cur.fetchall()# 需要将元组转为列表data = list(all_select)print("mysql数据读取成功:" + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))# 关闭游标cur.close()# 关闭连接coon.close()return datadef oracle_con():# 调用mysql数据库中的结果data = mysql_con()# 创建ORACLE数据库连接conn = cx_Oracle.connect('用户名', '密码', 'IP:端口/服务名')# 获取操作游标cursor = conn.cursor()# 添加数据sqlsql_ora = "INSERT INTO ONE_HUNDRED_MILLION (TIMENOW, QQ, WW, EE, RR, TT, YY) VALUES (:1,:2,:3,:4,:5,:6,:7)"# 需要执行sql语句,将时间类型固定sql_ora_1 = "alter session set nls_timestamp_format='YYYY-MM-DD HH24:MI:SS.FF'"cursor.execute(sql_ora_1)conn.commit()# 设置并发数量batch_size = 100000# 使用循环执行SQL语句for i in range(0, len(data), batch_size):try:cursor.executemany(sql_ora, data[i:i + batch_size])except Exception as e:print(e)# 提交SQL语句conn.commit()# 关闭连接,释放资源cursor.close()conn.close()print("oracle数据写入成功:" + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))if __name__ == '__main__':for j in range(1, 500):     # 每次执行二十万条print("第{0}次开始时间:".format(j) + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))oracle_con()

性能优化

        使用threading函数,进行并发执行,使用分隔,将500个并发分隔为20个线程一组(由于数据库的断开和重连会消耗时间,需要联系运维将数据库的连接时间设置长一点)

import threading
from datetime import datetimedef thread_func():print("我是一个函数",datetime.now())#将之分为20个线程为一组
def thread_func_20():for i in range(20):thread_func()#500个线程分为25组20个
def many_thread():for i in range(25):t = threading.Thread(target=thread_func_20())t.start()if __name__ == '__main__':start = datetime.today().now()many_thread()end = datetime.today().now() - startprint(end)

其他几个点

        安装cx_oracle包需要Python3.10以下的版本;
        执行cx_oralce包需要配置Oracle环境,配置了环境变量,但还是找不到Oracle包,只有在代码中指定oracle文件位置;
# 指定Oracle文件位置
cx_Oracle.init_oracle_client(lib_dir=r"E:\awork\rj\oracleeee\instantclient_19_24")

        


http://www.mrgr.cn/news/2886.html

相关文章:

  • nginx的详细介绍及配置
  • Coze的卡片制作和使用
  • 计算机Java项目|基于SpringBoot的实习管理系统的设计与实现
  • 通讯专题-RS232
  • Hadoop入门基础(三):如何巧妙划分Hadoop集群,全面提升数据处理性能?
  • Matlab处理H5文件
  • 销售crm客户管理软件排名:2024年顶尖的9款
  • linux 定时任务
  • 开始使用 AWS SAM CLI
  • 【漏洞复现】微商城系统 goods SQL注入漏洞
  • 高性能web服务器2——Nginx概述
  • springMVC访问不同位置的静态资源
  • Jmeter基础与概念详解
  • IP报文详解
  • datasets库一些基本方法:filter、map、select等
  • 【秋招笔试】8.18大疆秋招(第三套)-三语言题解
  • SpringSecurity
  • 实习三十:ansible
  • 【中项第三版】系统集成项目管理工程师 | 第 15 章 组织保障
  • C++特殊类设计