循序渐进丨使用 Python 向 MogDB 数据库批量操作数据的方法

news/2024/5/5 0:09:48

当我们有时候需要向数据库里批量插入数据,或者批量导出数据时,除了使用传统的gsql copy命令,也可以通过Python的驱动psycopg2进行批量操作。本文介绍了使用psycopg2里的executemany、copy_from、copy_to、copy_expert等方式来批量操作 MogDB 数据库里的数据的方法。

outside_default.png

安装psycopg2驱动

适配 MogDB 的psycopg2可以从以下链接下载:

https://www.mogdb.io/downloads/psycopg2/all

下载之前,先确定所用的Python版本:

python3 --version
Python 3.7.9

目前只支持Python 3.6以上版本:

8760cbf461f04f2ff00afb4b5007940e.png

下载之后,将文件上传到运行程序的主机上,并运行以下命令:

python3 -m pip install psycopg*.whl

当然,如果该主机可以联通互联网,也可以直接从以下链接获取下载地址:https://www.mogdb.io/downloads/psycopg2/all 

396e6482dc40bab9c1342474cbf6f5f9.png

不用真正下载,而是直接运行(其中url对应实际Python版本的url):

python3 -m pip install https://cdn-mogdb.enmotech.com/drivers/python/psycopg2-whl/5.0.0.3/psycopg2-5.0.0.3-cp37-cp37m-linux_aarch64.whl

数据库端创建测试用户和测试表:

gsql
create user pytest sysadmin password 'Enmotech@123';
create table test_psycopg2(id int,source varchar(32), create_time date,name varchar(32),val int);

outside_default.png

建立连接并创建cursor对象

Python中:

import psycopg2
conn=psycopg2.connect(database="postgres",user="pytest",password="Test@123",host="127.0.0.1",port=26000)
cur=conn.cursor()
conn.close()

参数就不用介绍含义了,一目了然。

outside_default.png

使用execute进行逐条插入

execute可用于逐条插入,它是cursor中的一个方法,接收的参数为语句+一个可索引的列表,列表中每一列对应语句中的 %s.

cursor.execute(sqltext,var_List)

假设现在我们需要往数据库中插入10000条随机数据,可以循环调用execute:

import psycopg2
import datetime
import random
import timeconn=psycopg2.connect(database="postgres",user="pytest",password="Test@123",host="127.0.0.1",port=26000)
cur=conn.cursor() 
currtime=datetime.datetime.now()
data1=[(id,"execute",(currtime+datetime.timedelta(minutes=id)).strftime('%Y-%m-%d %H:%M:%S'),"name"+str(id),random.randint(0,100000)) for id in range(1,10001)];start=time.perf_counter()
for row in data1:cur.execute ("insert into test_psycopg2 values(%s,%s,%s,%s,%s)",row); #此处有5个占位符,对应的row中有5列end=time.perf_counter();
print(f"use execute to insert 10000 rows, cost {int((end-start)*1000)} ms");
conn.commit()

26002c3dbeab15a5a6eefa1ef49e76ad.png

outside_default.png

使用executemany进行批量插入

executemany可用于批量插入,它是cursor中的一个方法,接收的参数为语句+一个嵌套了可索引的列表的列表,列表中每一行对应语句中的 %s.

cursor.executemany(sqltext,var_List)

假设现在我们需要往数据库中插入10000条随机数据,可以调用一次executemany:

import psycopg2
import datetime
import random
import time
conn=psycopg2.connect(database="postgres",user="pytest",password="Test@123",host="127.0.0.1",port=26000)
cur=conn.cursor()
currtime=datetime.datetime.now()
data2=[(id,"executemany",(currtime+datetime.timedelta(minutes=id)).strftime('%Y-%m-%d %H:%M:%S'),"name"+str(id),random.randint(0,100000)) for id in range(1,10001)];
start=time.perf_counter()
cur.executemany("insert into test_psycopg2 values(%s,%s,%s,%s,%s)",data2);
end=time.perf_counter();
print(f"use executemany to insert 10000 rows, cost {int((end-start)*1000)} ms");
conn.commit()

14df9a64ed6c39a5276e5b2ebf6b4b3a.png

从执行时间看executemany和execute相差无几,并没有使用类似于JDBC驱动中的批量绑定功能。

outside_default.png

使用copy_from(文件)从文件中批量插入

无论是execute还是executemany,性能都不够理想。如果对性能有要求,批量插入的时候,应该使用copy相关的方法,如copy_from。

copy_from接收多个参数,主要是前面的4个参数:

copy_from(input_file, table_name, sep, columns)

其中第一个参数设计上是文件句柄,但是也可以用其他带有read()和readline()方法的对象来代替。

第三个参数则是分隔符,因为使用copy时,需要自己先按行把数据拼好的,列之间用分隔符来分隔,通常可以使用"tab"或者",". 当然,这需要考虑你的数据中会不会包含这两个字符,避免二义性。

第四个参数则是列名的列表,如果不提供,则是所有列,但是要注意列顺序。

假设现在我们需要往数据库中插入10000条随机数据,可以先生成一个文件,再调用copy_from(文件):

import psycopg2
import datetime
import random
import time
from io import StringIO
conn=psycopg2.connect(database="postgres",user="pytest",password="Enmotech@123",host="127.0.0.1",port=26000)
cur=conn.cursor()
currtime=datetime.datetime.now()
tmpfile=open("test_psycopg2_from.csv","w");
filesize=0
for id in range(1,10001):filesize=filesize+tmpfile.write(f"{id},copy_from_File,{(currtime+datetime.timedelta(minutes=id)).strftime('%Y-%m-%d %H:%M:%S')},name{id},{random.randint(0,100000)}\n")tmpfile.close()tmpfile2=open("test_psycopg2_from.csv","r");start=time.perf_counter()
cur.copy_from(tmpfile2,"test_psycopg2",sep=",");
end=time.perf_counter();
print(f"use copy_from(File) to insert 10000 rows, cost {int((end-start)*1000)} ms");
conn.commit()
tmpfile.close()

af73baac17c8d4ce6238a9581fd93715.png

19ms,相比execute和executemany,有百倍的提升。

outside_default.png

使用copy_from(StringIO)进行批量插入

对于在内存中处理的数据,专门放到文件系统再取出来显然没有必要,因此,copy_from第一参数可以使用StringIO这个模块来模拟内存中的一个类文件对象(关于StringIO,限于篇幅,不展开解释)。

假设现在我们需要往数据库中插入10000条随机数据,可以把记录按行写到StringIO对象,再调用copy_from(StringIO):

import psycopg2
import datetime
import random
import time
from io import StringIO
conn=psycopg2.connect(database="postgres",user="pytest",password="Enmotech@123",host="127.0.0.1",port=26000)
cur=conn.cursor()
currtime=datetime.datetime.now()
tmpfile=StringIO("");
filesize=0
for id in range(1,10001):filesize=filesize+tmpfile.write(f"{id},copy_from_StringIO,{(currtime+datetime.timedelta(minutes=id)).strftime('%Y-%m-%d %H:%M:%S')},name{id},{random.randint(0,100000)}\n")tmpfile.seek(0)
start=time.perf_counter()
cur.copy_from(tmpfile,"test_psycopg2",sep=",");
end=time.perf_counter();
print(f"use copy_from(StringIO) to insert 10000 rows, cost {int((end-start)*1000)} ms");
conn.commit()

2b577bd0d528a3a9e840bf49f4de288d.jpeg

同样19ms。

这里面需要注意的是,StringIO写完了(write)之后,必须调用seek(0)回到文件头,否则,相当于从文件末端开始,是无法完成copy的操作的。

outside_default.png

使用copy_to(文件)导出到文件

有进就有出,pyscopg2里面也支持把表copy到文件中。

copy_to接收多个参数,主要是前面的4个参数。

copy_to(out_file, table_name, sep, columns)

其中第一个参数设计上是文件句柄,但是也可以用其他带有write()方法的对象来代替。

第三个参数则是分隔符。

第四个参数则是列名的列表,如果不提供,则是所有列。

假设现在我们把表导出成本地文件,可以直接调用copy_to(文件):

import psycopg2
import time
conn=psycopg2.connect(database="postgres",user="pytest",password="Enmotech@123",host="127.0.0.1",port=26000)
cur=conn.cursor()
currtime=datetime.datetime.now()
tmpfile=open("test_psycopg2_to.csv","w");
start=time.perf_counter()
cur.copy_to(tmpfile,"test_psycopg2",sep=",");
end=time.perf_counter();
print(f"use copy_to(File) to dump tables, cost {int((end-start)*1000)} ms");
tmpfile.close()

aed001013d8a9217e774bcf5d218f2de.png

58ms,不过表里面目前已经有4万条记录。

outside_default.png

使用copy_to(StringIO)导出到内存对象

同理,也可以把表导出到StringIO的内存对象中,调用copy_to(StringIO),如下所示:

import psycopg2
import time
from io import StringIO
conn=psycopg2.connect(database="postgres",user="pytest",password="Enmotech@123",host="127.0.0.1",port=26000)
cur=conn.cursor()
currtime=datetime.datetime.now()
tmpfile=StringIO("");
start=time.perf_counter()
cur.copy_to(tmpfile,"test_psycopg2",sep=",");
end=time.perf_counter();
print(f"use copy_to(StringIO) to dump tables, cost {int((end-start)*1000)} ms");
print(f"filelength:{len(tmpfile.getvalue())}")
conn.commit()
tmpfile.close()

db138d9fcf2554110fbec412120ecb2b.png

outside_default.png

使用copy_expert()进行复杂copy操作

gsql的copy命令其实提供了很多复杂的功能,但显然copy_from/copy_to无法全部做到,比如,从表中根据条件过滤出需要的数据并进行copy。

因此,除了copy_from/copy_to, pyscopg2提供了更专业的copy_expert函数,更接近原始的copy命令,它接收两个参数,第一个是SQL语句,第二个是类文件句柄, 你可以认为是把它嫁接给了gsql里面的STDIN/STDOUT。

copy_export(sql,file_handle)

假设现在我们需要把之前的source为"copy_from_File"的部分取出来,然后,对表的数据做一定运算,还希望带表header, 可以调用copy_expert,进行精细调控:

import psycopg2
import time
conn=psycopg2.connect(database="postgres",user="pytest",password="Enmotech@123",host="127.0.0.1",port=26000)
cur=conn.cursor()
currtime=datetime.datetime.now()
tmpfile=open("test_psycopg2_copy_expert.csv","w");
start=time.perf_counter()
cur.copy_expert("copy (select *,sqrt(val) val2 from test_psycopg2 test_psycopg2 where source = 'copy_from_File') to STDIN with(format 'csv', header on ) ",tmpfile);
end=time.perf_counter();
print(f"use copy_expert() to dump tables, cost {int((end-start)*1000)} ms");
conn.commit()
tmpfile.close()

f6da111643db536a8aa64397ebfc7400.png

看看导出效果,没问题:

81ba0d4363eecca0c134569f8077af58.png

outside_default.png

参考文档

  • https://docs.mogdb.io/zh/mogdb/v5.0/1-psycopg-based-development

  • https://www.psycopg.org/docs/cursor.html

关于作者

罗海雄,云和恩墨数据库研发架构师,性能优化专家,2012年 ITPUB 全国SQL大赛冠军。他拥有超十年企业级系统设计与优化经验,对SQL优化理解尤其深入,曾服务于甲骨文公司。

f03719b383c77a81f33e6c4955117909.gif

数据驱动,成就未来,云和恩墨,不负所托!


云和恩墨创立于2011年,是业界领先的“智能的数据技术提供商”。公司总部位于北京,在国内外35个地区设有本地办公室并开展业务。

云和恩墨以“数据驱动,成就未来”为使命,致力于将创新的数据技术产品和解决方案带给全球的企业和组织,帮助客户构建安全、高效、敏捷且经济的数据环境,持续增强客户在数据洞察和决策上的竞争优势,实现数据驱动的业务创新和升级发展。

自成立以来,云和恩墨专注于数据技术领域,根据不断变化的市场需求,创新研发了系列软件产品,涵盖数据库、数据库存储、数据库云管和数据智能分析等领域。这些产品已经在集团型、大中型、高成长型客户以及行业云场景中得到广泛应用,证明了我们的技术和商业竞争力,展现了公司在数据技术端到端解决方案方面的优势。

在云化、数字化和智能化的时代背景下,云和恩墨始终以正和多赢为目标,感恩每一位客户和合作伙伴的信任与支持,“利他先行”,坚持投入于数据技术核心能力,为构建数据驱动的智能未来而不懈努力。

我们期待与您携手,共同探索数据力量,迎接智能未来。

5dc8bf55ca34ffbfa93bf5dfad3641ab.gif


http://www.mrgr.cn/p/24567835

相关文章

2024 年 GPLT 团体程序设计天梯赛(个人感受 + 题解)

前言 去年第一次参加天梯赛,拿了 \(158\) 分,没有个人奖,团队也差点打铁(最后应该是递补省三)。 今年天梯赛拿了 \(224\) 分,幸运地拿了个人国二。这次担任的队长,团队也拿到了国二,感谢队友们的努力付出。 今年好像很多人断网,不过没有发生在我身上哈哈哈(也许是因为…

2024年GPLT团体程序设计天梯赛(个人感受 + 题解)

前言 去年第一次参加天梯赛,拿了 \(158\) 分,没有个人奖,团队也差点打铁(最后应该是递补省三)。 今年天梯赛拿了 \(224\) 分,幸运地拿了个人国二。这次担任的队长,团队也拿到了国二,感谢队友们的努力付出。 今年好像很多人断网,不过没有发生在我身上哈哈哈(也许是因为…

【论文笔记】设计一款针对情境障碍的视力减弱型文件浏览应用程序(下)

论文关键点 ps:这篇文章可以学习的内容比较多,笔记内容也比较丰富因此本次论文笔记会分为上下期 提出的问题: 不方便视觉障碍的情况下(通勤、走路、眼睛疲劳也算)会用text-speech 系统,但是这种听觉是线性的,不适合skim 推导出了融合听觉和…

基于Google Gemini 探索大语言模型在医学领域应用评估和前景

概述 近年来,大规模语言模型(LLM)在理解和生成人类语言方面取得了显著的飞跃,这些进步不仅推动了语言学和计算机编程的发展,还为多个领域带来了创新的突破。特别是模型如GPT-3和PaLM,它们通过吸收海量文本…

vue封装请求、合并js、合并多个js

vue封装请求、合并js、合并多个js 作为一个后端开发,写前端时发现,每次导入api接口都会有一堆代码,像下面这样: import {footprintList, footprintDelete} from /api/userApi.js import {addressList} from /api/userApi.js impor…

四月二十五日 Android studio关于使用sqlite数据库

昨天早上六点就起来要去排队考科目一,实在是困得很,昨天晚上早早就睡了,其实解释为什么昨天没有博客。 一个好消息就是我顺利的考过了,刚到90,还是很惊险。 还是说一下最近在干什么,之前是一直用的MySQL连接我的Android studio,最近在学习使用它自带的一个sqlite数据库,…

双向循环链表的插入和删除

数据结构 线性表--双向循环链表操作 ~~**注意!!!**怎么说,今天(2024.4.24)找一个小小的运行bug(没有报错)找了非常之久,明天继续把这些代码补齐,啊啊啊,但是感谢还是把这个bug找出来(这段话我不会删的)~~插入 1)头插/******************************************…

前端零代码开发实践:页面嵌套+逻辑连线0开发扩展组件,实现切换开关控制扇叶转动。能无代码封装扩展组件,有别于常规的web组态或低代码平台

前言: 官网:http://www.uiotos.net/ 什么是 UIOTOS? 这是一款拥有独创专利技术的前端零代码工具,专注于解决前端界面开发定制难题,原型即应用!具有页面嵌套、属性继承、节点连线等全新特性,学习门槛低…

Ubuntu Pycharm安装

下载PyCharm,https://www.jetbrains.com/pycharm/download/?sectionlinux 然后按照下图执行安装: 安装的时候可能出现的问题: 问题1:No JDK found. Please validate either PYCHARM_JDK, JDK_HOME or JAVA_HOME environment var…

div探索1

当我在div里加了文本内容,嗯?好家伙所?以你是根据文本行高,定点在内容的顶部边缘,然后往下延伸的????想在顶部写一个水平导航来着,结果文字标签的垂直方向怎么也不能居中。 试验了一下,空的div在inline-block的时候是一字排开的,外在显示效果是长板效应,谁高谁撑开…

为什么要使用分账系统

不少现实生活中的场景都可能需要结合分账这一模式来实现结算,这篇文章里,作者就简单讲述了分账适用的场景,以及平台型企业选择分账系统的5个问题,一起来看看吧,或许会帮助你更了解分账系统的重要性。举个例子:在传统收单中,客户消费十块,这十块就直接结算给商家,这就结…

MSE实现全链路灰度实践

技术架构包括以下基础设施和云服务: 1个地域:ACK集群、微服务应用、MSE实例均部署在同一地域下。 1个专有网络VPC:形成云上私有网络,确保核心云资源的网络环境,如容器服务ACK、微服务引擎MSE。 ACK集群:简单…

壁纸测试

本文来自博客园,作者:舟清颺,转载请注明原文链接:https://www.cnblogs.com/zqingyang/p/18156634

文献学习-37-动态场景中任意形状针的单目 3D 位姿估计:一种高效的视觉学习和几何建模方法

On the Monocular 3D Pose Estimation for Arbitrary Shaped Needle in Dynamic Scenes: An Efficient Visual Learning and Geometry Modeling Approach Authors: Bin Li,† , Student Member, IEEE, Bo Lu,† , Member, IEEE, Hongbin Lin, Yaxiang Wang, Fangxun Zhong, Me…

第二期书生浦语大模型训练营第四次笔记

大模型微调技术 大模型微调是一种通过在预训练模型的基础上,有针对性地微调部分参数以适应特定任务需求的方法。 微调预训练模型的方法 微调所有层:将预训练模型的所有层都参与微调,以适应新的任务。 微调顶层:只微调预训练模型…

Net8微服务之Consul、Ocelot、IdentityServer4

前言 情绪的尽头是沉默 1.微服务概念 1.1微服务发展 分布式解决性能问题,微服务解决维护性、扩展性、灵活性。1.2微服务概念 微服务(或称微服务架构),是一种现代化的软件架构方法,它将一个应用程序分解为多个小型、独立的服务单元,每个服务都负责特定的业务功能,并且可以独…

后台管理系统加水印(react)

效果 代码图片 代码 window.waterMark function (config) {var defaultConfig {content: 我是水印,fontSize: 16px,opacity: 0.3,rotate: -15,color: #ADADAD,modalId: J_waterMarkModalByXHMAndDHL,};config Object.assign({}, defaultConfig, config);var existMarkModal…

Net8微服务实战

前言 学习杨中科老师开源项目在线英语网站微服务 1.需求 服务拆分2.项目源码项目 类 说明Peng.ASPNETCore DistributedCacheHelper 分布式缓存帮助类MemoryCacheHelper 内存缓存帮助类UnitOfWorkFilter 工作单元筛选器Peng.Commons Validators文件夹 FluentValidation的扩展类L…

OpenCV 如何实现边缘检测器

返回:OpenCV系列文章目录(持续更新中......) 上一篇:OpenCV如何实现拉普拉斯算子的离散模拟 下一篇 :OpenCV系列文章目录(持续更新中......) 目标 在本教程中,您将学习如何: 使用 OpenCV 函数…

java实现wav的重采样

原因是之前写的TTS文件,需要指定采样率和单声道 但是TTS是用的Jacob调用COMsapi实现的 javaWNI10JACOB方式 SAPI底层支持的是C,C#【官方文档】 SpAudioFormat SetWaveFormatEx method (SAPI 5.4) | Microsoft Learn 用C实现的方式【可指定输出的WAV…