go动态创建/增加channel并处理数据

news/2024/5/20 2:15:09

背景描述

有一个需求,大概可以描述为:有多个websocket连接,因此消息会并发地发送过来,这些消息中有一个标志可以表明是哪个连接发来的消息,但只有收到消息后才能建立channel或写入已有channel,在收消息前无法预先创建channel

解决过程(可直接阅读最终版)

初版:直接写入

因为对数据量错误预估(以为数据量不大),一开始我是用的mysql直接写入,每次收到ws消息立即处理,可测试中发现因数据量过多且都会操作同一行数据,出现了资源竞争,导致死锁。

第二版:增加锁

在发现出现数据竞争后,我第一反应是增加读写锁。读写锁的代码类似以下示例:

package mainimport ("database/sql""fmt""sync"_ "github.com/go-sql-driver/mysql"
)var (db *sql.DBmu sync.RWMutex
)func init() {var err errordb, err = sql.Open("mysql", "username:password@tcp(localhost:3306)/dbname")if err != nil {panic(err)}
}func main() {defer db.Close()// 读取数据go readData()// 写入数据go writeData()// 保持主线程运行select {}
}func readData() {for {mu.RLock()rows, err := db.Query("SELECT * FROM table_name")mu.RUnlock()if err != nil {fmt.Println("Error reading data:", err)continue}defer rows.Close()// 处理查询结果// ...// 睡眠一段时间,模拟读操作的持续性// 请注意,这是一个简单示例,实际应用中可能需要更复杂的逻辑// 或使用定时器进行控制}
}func writeData() {for {mu.Lock()_, err := db.Exec("INSERT INTO table_name (column1, column2) VALUES (?, ?)", value1, value2)mu.Unlock()if err != nil {fmt.Println("Error writing data:", err)continue}// 睡眠一段时间,模拟写操作的持续性// 请注意,这是一个简单示例,实际应用中可能需要更复杂的逻辑// 或使用定时器进行控制}
}

但是代码里对数据库的操作非常频繁且混乱,加了读写锁后经常出现请求很慢的情况,考虑其他方案

第三版 使用事务

使用事务代码忽略,最终发现,因为事务过长,导致出现了重复写的问题,考虑其他方案

第四版 map

通过一个二维的map来存储数据,每当数据存满10条就处理,当然毫不意外的,出现了map的竞争。map也是可以用锁的,但是这里是二维的map,加上两层锁之后使得效率极低,而且依旧有概率出现map竞争导致报错

此外,还可以考虑使用redis设置锁,直接set就行了,但是因为环境不支持redis,此方案弃用

最终版 动态channel

出现以上问题的根本原因是消费太快,其实完全可以把每个ws连接的数据都写到各自的channel里,同时设置每个channel都累积10条再消费,当然还需要一个处理机制,如果超过10s也消费一次。

启动"生产者"、“消费者”

在当前环境中,生产者就是每次从ws中读到数据往动态channel中写入,消费者就是不断获取有哪些channel,以及从channel中读数据,在ws写入时的处理逻辑大概可以简化为如下demo:

package testimport ("context""encoding/json""github.com/gin-gonic/gin""github.com/gorilla/websocket"log "github.com/sirupsen/logrus""net/http""sync"
)// RequestTemplate 请求模板
type RequestTemplate struct {Op   string               `json:"op"`   // 操作Id   int                  `json:"id"`   // 唯一id标识Time string               `json:"time"` // 时间,用秒级时间戳,字符串包裹Data *RequestTemplateData `json:"data"` // 请求数据Code int                  `json:"code"` // 状态码
}// RequestTemplateData 请求中data包含的部分,实际这里是很复杂的结构,之前超时/死锁也是因为这里处理逻辑比较复杂,但是这篇博客的演示重点不是这个,因此简略为id和请求ip
type RequestTemplateData struct {ConnIp string `json:"conn_ip"` // 请求ipId     int    `json:"id"`      // 唯一id标识
}// ConnInfo 具体的连接信息
type ConnInfo struct {Conn      *websocket.Conn    `json:"conn"`   // websocket连接Ctx       context.Context    `json:"ctx"`    // 连接上下文CtxCancel context.CancelFunc `json:"cancel"` // 连接上下文cancel functionIp        string             `json:"ip"`     // 连接的手机端ipId        int                `json:"id"`     // 唯一id标识
}var AllConns = make(map[string]*ConnInfo) //创建字典集合存储连接信息// Start 启动
func Start() {//处理ws的连接http.HandleFunc("/ws", HandleMsg)// //监听7001端口号,作为websocket连接的服务log.Info("Server started on :7001")log.Fatal(http.ListenAndServe(":7001", nil))
}// ChannelStorage channel数据
type ChannelStorage struct {sync.RWMutexchannels map[string]chan *RequestTemplateData
}var ConnRequestData map[int]*RequestTemplateDatavar upgrader = websocket.Upgrader{CheckOrigin: func(r *http.Request) bool {return true},
}// HandleMsg 处理ws连接,每来一个新客户端请求就建立一个新连接
func HandleMsg(w http.ResponseWriter, r *http.Request) {conn, err := upgrader.Upgrade(w, r, nil) // 协议升级,这里也可以直连if err != nil {log.Error(err)return}//获取连接ip,这里是为了区分每个连接connIp := conn.RemoteAddr().String()// 这里是为了后续关闭channelrootCtx := context.Background()ctx, cancel := context.WithCancel(rootCtx)//加入连接AllConns[connIp] = &ConnInfo{Conn:      conn,   // 客户端ws链接对象Ctx:       ctx,    // 连接上下文CtxCancel: cancel, // 取消连接上下文}defer func() {// 如果断开连接,删除数据if AllConns[connIp] != nil {AllConns[connIp].CtxCancel()delete(ConnRequestData, AllConns[connIp].Id)go SetDoneData(AllConns[connIp].Id, conn) // 这里对结束做处理}delete(AllConns, conn.RemoteAddr().String())err = conn.Close()if err != nil {return}log.Error("HandleMsg异常,开始defer处理:", err)if err := recover(); err != nil {log.Error("websocket连接异常,已断开:", err)}}()log.WithFields(log.Fields{"connIp": connIp,}).Info("沙箱已连接")reqCh := &ChannelStorage{}go reqCh.ResultConsumer(ctx) // 这里是消费者//循环读取ws客户端的消息for {// 读取消息_, msg, err := conn.ReadMessage()if err != nil {log.WithFields(log.Fields{"connIp": connIp,}).WithError(err).Error("读取websocket的消息失败")if AllConns[connIp] != nil {delete(ConnRequestData, AllConns[connIp].Id)go SetDoneData(AllConns[connIp].Id, conn) // 连接断开设置状态为结束}// 断开ws连接conn.Close()delete(AllConns, conn.RemoteAddr().String())return}//msg []byte转stringmsgStr := string(msg)log.Info("收到消息为:", msgStr)//反序列化消息为结构体requestData := RequestTemplate{}if err := json.Unmarshal(msg, &requestData); err != nil {conn.WriteJSON(gin.H{"id": "未知", "op": "未知", "error": "cmd通信的请求参数有误,无法json decode"})log.Error("json_decode cmd命令的请求参数时出错:", err)continue}dataInfo := requestData.Data// 这里实际上有很多操作,简写为两种if requestData.Op != "" {switch requestData.Op {// 收到报告case "report":go reqCh.Produce(dataInfo) // "生产者",发送一条消息// 已完成case "done":go CheckDone(dataInfo, conn) // 做完成的处理default:log.Error("未识别的命令:", msgStr)}}}
}

有一个for循环在持续监听ws消息,消费者只启动一次,这里重点就是生产和消费如何实现

“生产者”

“生产者”要做的事就是:
1 每当收到ws消息后,解析,拿到唯一id(这个唯一是指这个连接下的所有上报消息的id都是相同的)
2 判断这个“唯一id”是否已经创建了channel,若创建了则不需要创建,直接写入channel,若未创建则新建channel
以下是生产者的demo:

// GetChannel 获取通道
func (cs *ChannelStorage) GetChannel(key string) chan *RequestTemplateData {cs.RLock()defer cs.RUnlock()return cs.channels[key]
}// CreateChannel 创建通道并存储到 map 中
func (cs *ChannelStorage) CreateChannel(key string) chan *RequestTemplateData {cs.Lock()defer cs.Unlock()if cs.channels == nil {cs.channels = make(map[string]chan *RequestTemplateData, 800)}ch := make(chan *RequestTemplateData, 10)cs.channels[key] = chreturn ch
}// Produce 往上报channel中写数据
func (cs *ChannelStorage) Produce(requestData *RequestTemplateData) {defer func() {if err := recover(); err != nil {log.Info("_____________recover CaseResultAdd error________: ", err)}}()// 创建存储通道的结构体实例chanelKey := strconv.Itoa(requestData.Id)channel := cs.GetChannel(chanelKey)if channel == nil {channel = cs.CreateChannel(chanelKey)}// 直接往channel里面塞if channel != nil {channel <- requestData}
}
消费者

消费者由于只启动一次,但后续可能会有新的channel,因此需要增加一个获取所有连接的方法:
消费者demo:

func (cs *ChannelStorage) ResultConsumer(ctx context.Context) {defer func() {if err := recover(); err != nil {log.Info("_____________recover CaseResultConsumer error________: ", err)}}()for {select {case <-ctx.Done():log.Info("websocket断开连接,消费者协程退出...")returndefault:cs.processAllChannels(ctx)  // 传入 context.Contexttime.Sleep(2 * time.Second) // 控制处理频率}}
}// processAllChannels 获取所有channel
func (cs *ChannelStorage) processAllChannels(ctx context.Context) {cs.RLock()defer cs.RUnlock()var wg sync.WaitGroup // 用于等待所有通道处理完毕for chName, channel := range cs.channels {wg.Add(1)go func(chName string, channel chan *RequestTemplateData) {defer wg.Done()cs.processChannel(chName, channel, ctx)}(chName, channel)}wg.Wait() // 等待所有通道处理完毕
}
func (cs *ChannelStorage) processChannel(chName string, channel chan *RequestTemplateData, ctx context.Context) {const batchSize = 10 // 每次处理的数据量var messages []*RequestTemplateDatatargetMsgOverTime := 10 * time.Second // 超时时间for {select {case caseMsg := <-channel:messages = append(messages, caseMsg) // 将接收到的消息放入 messages 切片中if len(messages) == batchSize {tmpMessages := messagesmessages = nilprocessMessages(tmpMessages)}case <-time.After(targetMsgOverTime):log.Info("Timeout reached. Processing...")if len(messages) > 0 {tmpMessages := messagesmessages = nillog.Info("Processing remaining messages for channel:", chName)processMessages(tmpMessages)}case <-ctx.Done(): // 如果收到上下文取消信号,退出函数log.Info("______________________error__________cancel______")return}}
}func processMessages(messages []*RequestTemplateData) {// 在这里处理消息就是批量的了
}

http://www.mrgr.cn/p/50070516

相关文章

【GPT调用】本地使用python调用GPT接口

python调用GPT接口 环境变量设置主调用方法执行结果 环境变量设置 .env文件中配置GPT环境变量 api_key"你的GPT-API-KEY" urlhttps://ai-proxy.ksord.com/wps.openai.azure.com/openai/deployments/gpt-4-32k/chat/completions?api-version2023-09-01-preview主调…

Jmeter-线程组下篇

线程组 线程组作为JMeter测试计划的核心组件之一,对于模拟并发用户的行为至关重要。线程组元件是整个测试计划的入口,所有的取样器和控制器必须放置在线程组下。 可以将线程组视为一个虚拟用户池,其中每个线程可被理解为一个虚拟用户,多个虚拟用户同时执行相同的一批任务。…

stable diffusion 之云端部署攻略

本文主要介绍stable diffusion云端产品以及使用步骤 ℹ️整合安装包、模型资源见文末~ megaease cloud&#xff08;强烈推荐&#xff09; 优点&#xff1a; 集成了常用大模型和插件、VAE3080显卡配置&#xff0c;费用大概0.48元/小时&#xff0c;可随时暂停&#xff0c;暂停…

【自然语言处理】seq2seq模型——机器翻译

seq2seq模型——机器翻译 1 任务目标 1.1 案例简介 seq2seq是神经机器翻译的主流框架&#xff0c;如今的商用机器翻译系统大多都基于其构建&#xff0c;在本案例中&#xff0c;我们将使用由NIST提供的中英文本数据训练一个简单的中英翻译系统&#xff0c;在实践中学习seq2se…

腾讯游戏海外扩张,增持芬兰游戏开发商股份持股比例增至14.8%

易采游戏网5月8日消息&#xff0c;近日腾讯再次出手&#xff0c;大幅增持了芬兰知名游戏开发商Remedy Entertainment的股份&#xff0c;持股比例猛增至14.8%。这一举动引起了业界和投资者的广泛关注。 据了解&#xff0c;腾讯此次增持是在2024年4月24日完成的。根据芬兰法律规…

Ubuntu下halcon软件的下载安装

由于工作需求&#xff0c;点云配准需要使用halcon进行实现&#xff0c;并且将该功能放入QT界面中 1.下载halcon 进入halcon官网进行下载 官网链接&#xff1a;https://www.mvtec.com/products/halcon/ 注意&#xff1a;要注册登陆之后才能进行下载 接着点击Downloads->H…

Axure实现菜单抽屉效果

Axure是怎么实现如下效果的&#xff1f; 菜单打开和收起侧边栏菜单抽屉效果 实现效果 两级菜单&#xff0c;点击菜单收起其他菜单&#xff0c;打开当前菜单。 实现原理 单击一级菜单时&#xff0c;1&#xff09;切换当下二季菜单的显示/隐藏状态 2&#xff09;隐藏其他菜单…

网络安全之ACL

ACL&#xff1a;访问控制列表——控制列表&#xff08;策略列表&#xff09;&#xff0c;是一个控制工具。 功能&#xff1a;&#xff01;、定义感兴趣路由&#xff08;控制层面&#xff09;。2、定义感兴趣流量&#xff08;数据层面&#xff09;。 例如&#xff1a; 假设在该…

【MsSQL】数据库基础 库的基本操作

目录 一&#xff0c;数据库基础 1&#xff0c;什么是数据库 2&#xff0c;主流的数据库 3&#xff0c;连接服务器 4&#xff0c;服务器&#xff0c;数据库&#xff0c;表关系 5&#xff0c;使用案例 二&#xff0c;库的操作 1&#xff0c;创建数据库 2&#xff0c;创建…

【攻防技术系列+Python】-- 用 Python 控制系统进程

用 Python 控制系统进程 由于注册表几乎可以决定整个操作系统的运行,因此它成为安全工具与恶意软件对抗的主要战场之一。除了注册表之外,对系统进程的控制也是安全工具和恶意软件的必争之地。这里我们首先要了解程序和进程的区别。程序是静态的,进程是动态的。进程可以分为系…

两个手机在一起ip地址一样吗?两个手机是不是两个ip地址

在数字时代的浩瀚海洋中&#xff0c;手机已经成为我们生活中不可或缺的一部分。随着移动互联网的飞速发展&#xff0c;IP地址成为了连接手机与互联网的桥梁。那么&#xff0c;两个手机在一起IP地址一样吗&#xff1f;两个手机是不是两个IP地址&#xff1f;本文将带您一探究竟&a…

【快速入门Linux】10_Linux命令—Vi编辑器

文章目录 一、vi 简介1.1 vi1.2 vim1.3查询软连接命令&#xff08;知道&#xff09; 二、打开和新建文件&#xff08;重点&#xff09;2.1 打开文件并且定位行2.2 异常处理 三、vi三种工作模式&#xff08;重点&#xff09;3.1 末行模式-命令 四、常用命令4.0 命令线路图4.1 移…

图像涂哪就动哪!Gen-2新功能“神笔马良”爆火,网友:急急急

AI搞视频生成&#xff0c;已经进化到这个程度了&#xff1f;&#xff01; 对着一张照片随手一刷&#xff0c;就能让被选中的目标动起来&#xff01; 明明是一辆静止的卡车&#xff0c;一刷就跑了起来&#xff0c;连光影都完美还原&#xff1a; 原本只是一张火灾照片&#xff0…

STM32快速入门(串口传输之USART)

STM32快速入门&#xff08;串口传输之USART&#xff09; 前言 USART串口传输能实现信息在设备之间的点对点传输&#xff0c;支持单工、半双工、全全双工&#xff0c;一般是有三个引脚&#xff1a;TX、RX、SW_RX&#xff08;共地&#xff09;。不需要一根线来同步时钟。最大优…

Hadoop3:集群搭建及常用命令与shell脚本整理(入门篇,从零开始搭建)

一、集群环境说明 1、用VMware安装3台Centos7.9虚拟机 2、虚拟机配置&#xff1a;2C&#xff0c;2G内存&#xff0c;50G存储 3、集群架构设计 从表格中&#xff0c;可以看出&#xff0c;Hadoop集群&#xff0c;主要有2个模块服务&#xff0c;一个是HDFS服务&#xff0c;一个是…

基于web的物流管理系统

文章目录 项目介绍主要功能截图&#xff1a;部分代码展示设计总结项目获取方式 &#x1f345; 作者主页&#xff1a;超级无敌暴龙战士塔塔开 &#x1f345; 简介&#xff1a;Java领域优质创作者&#x1f3c6;、 简历模板、学习资料、面试题库【关注我&#xff0c;都给你】 &…

特斯拉CEO马斯克访华,或加速FSD技术在中国的落地

特斯拉首席执行官埃隆马斯克于4月底进行了中国之旅&#xff0c;这一访问被业内人士认为可能加速特斯拉FSD&#xff08;Full Self-Drive&#xff0c;完全自动驾驶&#xff09;技术在中国的应用。业内专家指出&#xff0c;马斯克的此番到访可能会对中国自动驾驶市场产生深远影响&…

VMware如何将虚拟机的端口服务映射出去

我们有时候在VMware起了一个服务,想要局域网的朋友同事访问 这时候就需要i端口映射 选择NAT模式 VMnet8点击 NAT设置 然后点击添加然后映射传入端口对话框 红色部分是 你主机本机,也就是你在用的电脑的空闲端口(可以打开cmd 输入命令 : netstat -ano 查看已用端口都有哪些…

c++多线程2小时速成

简介 c多线程基础需要掌握这三个标准库的使用&#xff1a;std::thread,std::mutex, andstd::async。 1. Hello, world #include <iostream> #include <thread>void hello() { std::cout << "Hello Concurrent World!\n"; }int main() {std::th…

CSS 伪类、伪元素的应用实例:电池充电、高能进度条

一、目的 本文通过 CSS 伪类、伪元素&#xff0c;结合动画 animation 和 Vue 动态样式属性&#xff08;通过 CSS 变量&#xff09;的写法&#xff0c;来实现电池充电、高能进度条的效果&#xff0c;如下图所示。 二、基础知识 1、CSS 伪类、伪元素 简单概括成以下 4 点&#x…