当前位置: 首页 > news >正文

Go语言实现长连接并发框架 - 请求分发器

文章目录

  • 前言
  • 接口
  • 结构体
  • 接口实现
  • 项目地址
  • 最后

前言

你好,我是醉墨居士,我们上篇博客实现了任务管理器的功能,接下来这篇博客我们将要实现请求分发模块的开发

接口

trait/dispatcher.go

type Dispatcher interface {Start()Dispatch(connQueue chan Connection)BatchDispatch(conn Connection) errorSetHeaderDeadline(deadline time.Time)SetBodyDeadline(deadline time.Time)ChooseQueue(conn Connection) chan <- ConnectionCommit(conn Connection)
}

结构体

gcore/dispatcher.go

// Dispatcher 请求分发模块,负责读取客户端连接的数据,并对数据进行拆包转换成消息格式,然后分发给下游的任务处理模块对消息进行业务处理
type Dispatcher struct {headerDeadline time.TimebodyDeadline time.TimeconnQueue []chan trait.ConnectiontaskMgr trait.TaskMgr
}// NewDispatcher 创建一个请求分发器
func NewDispatcher(taskMgr trait.TaskMgr) *Dispatcher {connQueue := make([]chan trait.Connection, global.Config.DispatcherQueues)for i := 0; i < len(connQueue); i++ {connQueue[i] = make(chan trait.Connection, global.Config.DispatcherQueueLen)}return &Dispatcher{connQueue: connQueue,taskMgr: taskMgr,}
}

接口实现

gcore/dispatcher.go

// Start 启动请求分发模块
func (d *Dispatcher) Start() {for i := 0; i < len(d.connQueue); i++ {for j := 0; j < global.Config.DispatcherQueueLen; j++ {go d.Dispatch(d.connQueue[i])}}
}// StartDispatcher 分发连接数据
func (d *Dispatcher) Dispatch(connQueue chan trait.Connection) {// 从conn中读取数据,并将数据提交给taskMgr处理for conn := range connQueue {d.BatchDispatch(conn)}
}// BatchDispatch 批量读取连接中的数据,并封装成请求,然后分发请求
func (d *Dispatcher) BatchDispatch(conn trait.Connection) error {for time.Now().After(d.headerDeadline) {header := make([]byte, 4)// 设置header读取超时时间conn.SetReadDeadline(d.headerDeadline)_, err := io.ReadFull(conn, header)if err != nil {if netErr, ok := err.(net.Error); ok && netErr.Timeout() {// 数据包读取超时return nil}return err}// 设置body读取超时时间conn.SetReadDeadline(d.bodyDeadline)// 读取长度dataLen := binary.BigEndian.Uint16(header[2:4])// 读取数据body := make([]byte, dataLen)	_, err = io.ReadFull(conn, body)if err != nil {return err}msg := gpack.Unpack(header, body)// 提交消息,处理数据request := NewRequest(conn, msg)d.taskMgr.Submit(request)}return nil
}// SetHeaderDeadline 设置header读取超时时间
func (d *Dispatcher) SetHeaderDeadline(deadline time.Time) {d.headerDeadline = deadline
}// SetBodyDeadline 设置body读取超时时间
func (d *Dispatcher) SetBodyDeadline(deadline time.Time) {d.bodyDeadline = deadline
}// ChooseQueue 选择处理连接的队列
func (d *Dispatcher) ChooseQueue(conn trait.Connection) chan <- trait.Connection {// 负载均衡,选择队列return d.connQueue[conn.ID() % int32(len(d.connQueue))]
}// Commit 提交连接到队列
func (d *Dispatcher) Commit(conn trait.Connection) {d.ChooseQueue(conn) <- conn
}

项目地址

Github:https://github.com/zm50/gte
Giee:https://gitee.com/zm50/gte

最后

我是醉墨居士,我们完成了基本的请求分发器的开发,希望对你有所帮助,也希望你有所收获


http://www.mrgr.cn/news/42084.html

相关文章:

  • 10.3 刷题
  • 更新-Python OS
  • java版基于Spring Boot + Mybatis在线招投标|评标|竞标|单一采购|询价|邀标|在线开标|招标公告发布|评审专家|招投标采购系统源码
  • Qt 5开发步骤及实例
  • solana监听智能合约事件实践
  • 端侧大模型系列 | 端侧AI Agent任务拆解大师如何助力AI手机?(简短版)
  • cookies和session的区别?
  • kafka基本概念以及用法
  • node依赖安装失败方法
  • 项目管理-沟通管理
  • 【JNI】普通类型的基本使用
  • 2024/10/3 408数据结构大题打卡
  • Web安全 - 安全防御工具和体系构建
  • Linux学习之路 -- 线程 -- 死锁及线程安全相关问题
  • 【Nacos架构 原理】内核设计之Nacos一致性协议
  • 数据库中 级联 详解
  • 【AIGC】内容创作——AI文字、图像、音频和视频的创作流程
  • 【C++】STL--vector
  • 将模板引擎用于 Express
  • PostgreSQL常用字符串函数