Python3操作redis百万级数据迁移,单机到单机,集群到集群

news/2024/5/13 16:59:41

Python3操作redis大量数据迁移 脚本

  • 背景
  • 使用前
  • 使用注意事项
  • 脚本
  • 总结

背景

之前写过一个用python迁移redis的脚本,但是对于小批量的数据、单节点的,还可以用。对于大量数据,集群模式的就不太适合了。所以写了下面的脚本,而且做了一定的优化,使用的pipeline和多线程,保证了迁移数据的速度,本人测试,大概2分钟复制了110万键值对的数据,差不多是每秒一万键值对的复制速度。

使用前

注意:

  1. 用的是Python3环境,Python2的大概需要改一下print输出
  2. 安装相关的模块
pip install redis rediscluster
  1. 可以在windows、linux环境下使用,注意修改里面的一些设置

使用注意事项

下面是一些需要注意的:

  1. 下面的脚本,如果redis数据超过500万键值对,很可能会有瓶颈,因为是一次性取的redis的所有键组成列表,列表很大,大概率会有阻塞。这样就不建议这种迁移方式了
  2. 下面的脚本,是建立在新的redis实例是空数据,或者说没有与原redis实例键值重复的情况下,要不然会重写。
  3. 下面的脚本,是迁移了所有键值对,没有做一些键的匹配,不过改起来不复杂
  4. 下面的脚本,多线程和批量提交的参数,如有必要可以改一下,对比测试。包括一些redis实例的参数配置也一样,如有必要可以改。
  5. 下面的脚本,单机到集群等各种迁移模式,要根据实际情况进行修改。
  6. 这个脚本,没办法实现实时复制,如果原redis数据库一直有增量数据,而且比较大,最好用其他方式迁移。因为脚本读取的redis Key值列表,只是那一个时间的,新库老库数据会有几分钟的差异。如果是把redis当做持久化库而不是缓存库的情况,也不适合。

暂时就这些了。

脚本

内容如下,根据实际情况进行调整

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# 2024/4/24from datetime import datetime
import time
import threading
import redis
from rediscluster import RedisClusterdef split_list(big_list, num=1):"""原来是[1,2,3,4,5,6]的列表,拆分成[[1,2], [3,4], [5,6]]小列表,主要是为了多线程:param big_list: 大列表:param num: 拆分多少个列表,这个主要对应后面的线程数,或者说redis的连接数,不能设置的太大,否则会报错Too many connections:return: 新的列表"""list_len = len(big_list)new_list = []if list_len > num:if list_len % num == 0:small_list_len = list_len // numelse:small_list_len = list_len // num + 1start = 0for i in range(num):# print(i)new_list.append(big_list[start: start + small_list_len])start += small_list_lenelse:new_list.append(big_list)return new_listdef redis_get_set(redis_source, redis_target, redis_list,  batch_size=100):"""读取redis“键”列表,获取Key/Value值,写入到新的redis:param redis_source: 原redis实例:param redis_target: 新redis实例:param redis_list: 要迁移的redis Key值列表:param batch_size: 使用pipeline写入新的redis实例,提高写入效率:return:"""count = 0with redis_target.pipeline() as pipe:for k in redis_list:data_type = redis_source.type(k)# 判断key值数据类型,分别处理,没有stream数据类型的处理,后面有必要再添加if data_type == 'string':v = redis_source.get(k)redis_target.set(k, v)elif data_type == 'list':v = redis_source.lrange(k, 0, -1)redis_target.lpush(k, *v)elif data_type == 'set':v = redis_source.smembers(k)redis_target.sadd(k, *v)elif data_type == 'hash':fields = redis_source.hgetall(k)pipe.hset(k, mapping=fields)elif data_type == 'zset':v = redis_source.zrange(k, 0, -1, withscores=True)# 需要将元组数据转化为字典数据redis_target.zadd(k, dict(v))else:print('not known type')count += 1# 如果数据量较大,循环batch_size次数后提交一次if count % batch_size == 0:print(f'\n当前时间:{datetime.now()},进程:{threading.current_thread()},已完成{count}对读写操作')pipe.execute()pipe.reset()# 最后再提交一次pipelinepipe.execute()pipe.reset()print(f'\n当前时间:{datetime.now()},进程:{threading.current_thread()},已完成所有读写操作!')def redis_copy(redis_source, redis_target, thread_num=5, batch_size=100):"""将原始redis的Key值大列表进行拆分,然后拆分后的列表进行多线程处理:param redis_source: 原redis实例:param redis_target: 新redis实例:param thread_num: 线程数,将大列表拆分为几个小列表,这个数不要太大,一般10个就行,不然程序会报错:param batch_size::return:"""# 检查两个redis是否可用try:redis_source.ping()redis_target.ping()print("Redis节点可连接")except Exception as e:print(f"连接Redis失败: {e}")redis_target = None# 线程列表threads = []if redis_target:new_list = split_list(redis_source.keys('*'), thread_num)for data in new_list:t = threading.Thread(target=redis_get_set, args=(redis_source, redis_target, data, batch_size))threads.append(t)t.start()for t in threads:t.join()print("所有线程执行完毕")def single_to_single(thread_num, batch_size):"""单节点迁移到单节点"""# 原始redis,单节点source_pool = redis.ConnectionPool(host='192.168.10.1',port=6379,db=0,password='123456',encoding='utf-8',decode_responses=True,socket_timeout=10,max_connections=100)redis_source = redis.Redis(connection_pool=source_pool)# 目标redis,单节点target_pool = redis.ConnectionPool(host='192.168.10.2',port=6369,db=0,password='123456',encoding='utf-8',decode_responses=True,socket_timeout=10,max_connections=100)redis_target = redis.Redis(connection_pool=target_pool)redis_copy(redis_source, redis_target, thread_num=10, batch_size=10000)def single_to_cluster(thread_num, batch_size):"""单节点迁移到单节点"""# 原始redis,单节点source_pool = redis.ConnectionPool(host='192.168.10.1',port=6379,db=0,password='123456',encoding='utf-8',decode_responses=True,socket_timeout=10,max_connections=100)redis_source = redis.Redis(connection_pool=source_pool)# 目标redis,集群target_node_list = [{"host": "192.168.11.1", "port": "6379"},{"host": "192.168.11.2", "port": "6379"},{"host": "192.168.11.3", "port": "6379"},{"host": "192.168.11.4", "port": "6379"},{"host": "192.168.11.5", "port": "6379"},{"host": "192.168.11.6", "port": "6379"},]# 创建RedisCluster的实例# decode_responses设置为True会自动将响应数据解码为utf-8编码的字符串redis_cluster_target = RedisCluster(startup_nodes=target_node_list,decode_responses=True,password='123456')redis_copy(redis_source, redis_cluster_target, thread_num=10, batch_size=10000)def cluster_to_single(thread_num, batch_size):"""集群迁移到集群"""# 原始redis,集群source_node_list = [{"host": "192.168.0.1", "port": "6379"},{"host": "192.168.0.2", "port": "6379"},{"host": "192.168.0.3", "port": "6379"},{"host": "192.168.0.4", "port": "6379"},{"host": "192.168.0.5", "port": "6379"},{"host": "192.168.0.6", "port": "6379"},]# 创建RedisCluster的实例# decode_responses设置为True会自动将响应数据解码为utf-8编码的字符串redis_cluster_source = RedisCluster(startup_nodes=source_node_list,decode_responses=True,password='123456')# 目标redis,单节点target_pool = redis.ConnectionPool(host='192.168.10.2',port=6369,db=0,password='123456',encoding='utf-8',decode_responses=True,socket_timeout=10,max_connections=100)redis_target = redis.Redis(connection_pool=target_pool)redis_copy(redis_cluster_source, redis_target, thread_num=10, batch_size=10000)def cluster_to_cluster(thread_num, batch_size):"""集群迁移到集群"""# 原始redis,集群source_node_list = [{"host": "192.168.0.1", "port": "6379"},{"host": "192.168.0.2", "port": "6379"},{"host": "192.168.0.3", "port": "6379"},{"host": "192.168.0.4", "port": "6379"},{"host": "192.168.0.5", "port": "6379"},{"host": "192.168.0.6", "port": "6379"},]# 创建RedisCluster的实例# decode_responses设置为True会自动将响应数据解码为utf-8编码的字符串redis_cluster_source = RedisCluster(startup_nodes=source_node_list,decode_responses=True,password='123456')# 目标redis,集群target_node_list = [{"host": "192.168.11.1", "port": "6379"},{"host": "192.168.11.2", "port": "6379"},{"host": "192.168.11.3", "port": "6379"},{"host": "192.168.11.4", "port": "6379"},{"host": "192.168.11.5", "port": "6379"},{"host": "192.168.11.6", "port": "6379"},]# 创建RedisCluster的实例# decode_responses设置为True会自动将响应数据解码为utf-8编码的字符串redis_cluster_target = RedisCluster(startup_nodes=target_node_list,decode_responses=True,password='123456')redis_copy(redis_cluster_source, redis_cluster_target, thread_num=10, batch_size=10000)if __name__ == '__main__':# 性能与效率控制# 线程数thread_num = 10# 写入批量提交数batch_size = 10000start_time = time.perf_counter()# 单节点迁移到单节点single_to_single(thread_num, batch_size)# 单节点迁移到集群# single_to_cluster(thread_num, batch_size)# 集群迁移到单节点# cluster_to_single(thread_num, batch_size)# 集群迁移到集群# cluster_to_cluster(thread_num, batch_size)end_time = time.perf_counter()# 计算执行时间execution_time = end_time - start_timeprint(f"代码执行时间: {execution_time} 秒")

总结

上面的代码,为了优化性能,改了好几次。刚开始的时候,50万键值对数据(5个数据类型各10万左右),迁移复制大概需要300s-400s左右,平均每秒钟大约复制1300-1700的键值对,经过多次优化,平均每秒钟大约复制9000的键值对,提升了6-7倍左右。

优化思路:

  1. 使用pipeline,批量提交键值对到新的库
  2. 在数据量比较大的情况下,原redis的键整体取出后,是一个比较大的列表,先将这个大列表拆分为较为平均的几个小列表,然后使用多线程分别对小列表数据读写,可能大大提高读写效率。但是线程数也没必要设置太高,一个是redis连接数有限制,还有一个是我试了一下,比如从10提升到20,读写速度提升的不是太明显。

其它思考:
1 还能进行哪些优化呢?我看有些商业软件能做到每秒钟10万级别KV的复制,想不出来怎么做的。


http://www.mrgr.cn/p/58272870

相关文章

webpack 打包优化 - splitChunks

打包时会遇到的问题: 打包文件过大,首屏加载时间过长,js阻塞页面渲染导致白屏改动业务代码后,对于第三方库也会一并重新打包到一个出口文件,浏览器无法利用缓存来减少请求和加载的时间 针对以上两个问题,…

03-修饰符-监听属性-发送Ajax请求-生命周期钩子

事件修饰符事件修饰符 作用.stop 只处理自己的事件,父控件冒泡的事件不处理(阻止事件冒泡),一般用在子元素类上.self 只处理自己的事件,子控件冒泡的事件不处理,一般用在父元素上.prevent 阻止a连接的跳转.once 事件只会触发一次(适用于抽奖页面)使用修饰符时,顺序很重…

attempt to compare nil with number -- 黑马点评出现问题

问题情况 : 主要问题 : 调用lua执行redis时,有一个值会接受nil(因为redis中没有该数据)或者数值,当该值为nil时执行报错,因为会用到将该值与其他数字比较,故报错attempt to compare nil with number 当然…

数据结构 - 队列 [动画+代码注释超详解],萌新轻松上手!!!

一. 队列的概念 队列是一种特殊的线性表,用于存储元素,并且按照先进先出(First In First Out)的顺序进行管理,这意味着最先加入队列的元素将会是最先从队列中被移除的元素 队列的原型:只允许在一端进行插入数据的操作&#xff0c…

【项目实战】基于高并发服务器的搜索引擎

【项目实战】基于高并发服务器的搜索引擎 目录 【项目实战】基于高并发服务器的搜索引擎搜索引擎部分代码index.htmlindex.hpplog.hppparser.cc(用于对网页的html文件切分且存储索引关系)searcher.hpputil.hpphttp_server.cc(用于启动服务器和…

python作业 切片逆转

题目: (反转显示一个整数)编写下面的函数,反向显示一个整数。 列如:reserse(3456)。编写一个测试程序,提示用户输入一个整数,然后显示它的反向数。 第一步定义一个函数: def rev…

区块链安全应用------压力测试

测试要求: 1. 对以下AccountManager智能合约进行压测(基础要求set函数测试,balanceOf涵为20分加分项)2. 在本地链进行测试,需要监控本地进程的资源使用情况。每个进程的multiOutput属性为Avg3. 需要将每一个更改的配置文件截图,和…

vscode 打代码光标特效

vscode 打代码光标特效 在设置里面找到settings 进入之后在代码最下方加入此代码 "explorer.confirmDelete": false,"powermode.enabled": true, //启动"powermode.presets": "fireworks", // 火花效果// particles、 simple-rift、e…

鸿蒙内核源码分析(任务调度篇) | 任务是内核调度的单元

任务即线程 在鸿蒙内核中,广义上可理解为一个任务就是一个线程 官方是怎么描述线程的 基本概念 从系统的角度看,线程是竞争系统资源的最小运行单元。线程可以使用或等待CPU、使用内存空间等系统资源,并独立于其它线程运行。 鸿蒙内核每个…

Rust序列化和反序列化

Rust 编写python 模块 必备库 docker 启动 nginx 服务 NGINX 反向代理配置

蛋糕购物商城

蛋糕购物商城 运行前附加数据库.mdf(或使用sql生成数据库) 登陆账号:admin 密码:123456 修改专辑价格时去掉¥以及上传专辑图片 c#_asp.net 蛋糕购物商城 网上商城 三层架构 在线购物网站,电子商务系统 …

【Godot4.2】自定义Todo清单类 - myTodoList

概述 在写myList类的时候,就想到可以写一个类似的Todo清单类。 基础思路 本质还是在内部维护一个数组,在其基础上进行增删改查操作的封装为了方便存储数据,编写一个自定义内置类TodoItem,内部数组就变成了Array[TodoItem]类型的…

【前端开发基础知识快速入门】

前端开发基础知识&快速入门 一、VSCode 使用1.1 安装常用插件1.2 创建项目1.3 创建网页1.4 运行效果二、ES62.1 简介2.2 什么是 ECMAScript2.3 ES6 新特性2.3.1 let 声明变量2.3.2 const 声明常量(只读变量)2.3.3 解构表达式2.3.4 字符串扩展2.3.5 函数优化2.3.6 对象优化…

uniapp的bug们

1 uni-icons标签的数据绑定有问题 <uni-icons type="contact" class=" icon" size="25"></uni-icons><view>{{user_msg}}</view> <!-- 之所以不把{{user_msg}}写在uni-icons标签之内,是因为,uni-icons有一个bug,它不…

nvm基本使用

nvm基本使用 文章目录 nvm基本使用1.基本介绍2.下载地址3.常用指令 1.基本介绍 NVM是一个用于管理 Node.js 版本的工具。它允许您在同一台计算机上同时安装和管理多个 Node.js 版本&#xff0c;针对于不同的项目可能需要不同版本的 Node.js 运行环境。 NVM 主要功能&#xff…

easyExcel快速入门

目录 &#x1f9c2;1.简单介绍 &#x1f32d;2.快速入门 &#x1f953;1.导入依赖 &#x1f37f;2.导出到excel &#x1f38f;3.读入数据 &#x1f389;4.下载 1.简单介绍 传统操作Excel大多都是利用Apach POl进行操作的,但是POI框架并不完善,使用过程非常繁琐且有较多…

C语言学习/复习36

一、程序的环境与预处理 二、翻译环境与执行环境 三、运行环境 四、预编译(预处理)详解

docker 基本命令

目录 一、docker 镜像操作命令 1.1.查询软件镜像 1.2.docker pull&#xff1a;下载镜像 1.3.docker push&#xff1a;上传镜像 1.4.docker images&#xff1a;查看本地镜像 1.5.docker inspect &#xff1a;获取镜像详细信息 1.6.docker tag&#xff1a;添加镜像标签 …

spring boot3单模块项目工程搭建-上(个人开发模板)

⛰️个人主页: 蒾酒 &#x1f525;系列专栏&#xff1a;《spring boot实战》 目录 写在前面 上文衔接 常规目录创建 common目录 exception.handle目录 result.handle目录 controller目录 service目录 mapper目录 entity目录 test目录 写在最后 写在前面 本文…

url规则

uniapp的url最后加了斜杠,如同,就不能生效了. 与之相对的django必须最后加斜杠,否则报错