Python 全栈系列236 rabbit_agent搭建

news/2024/5/13 9:06:46

说明

通过rabbit_agent, 以接口方式实现对队列的标准操作,将pika包在微服务内,而不必在太多地方重复的去写。至少在服务端发布消息时,不必再去考虑这些问题。

在分布式任务的情况下,客户端本身会启动一个持续监听队列的客户端服务,这些应该是容易通过简单的配置来实现的。

在未来的应用上,我计划使用rabbitmq作为公网的消息队列,满足分布式计算的要求。例如,我部署了n个大模型,希望它们可以处理接口请求。很显然,一台服务器放不下n个大模型,但是用户可以把请求先发到消息队列,然后在不同的机器上启动大模型,分别接受来自队列的消息,处理后返回。
这样,只要在服务端有一个前端,可以转发、收集消息展示在应用前端,那么就可行了。

内容

模式1:简单队列,连通性测试

适合简单缓存

有P(Produce)和C(Consumer)两端。
在这里插入图片描述
生产端在建立连接后,声明队列,然后往里面发消息。
Connection -> Channel -> Queue -> Message

生产

生产者:将消息发送到队列。
模式:只有在有新的消息要发布时才连接队列。(然后就可以释放连接)

import pika
credentials = pika.PlainCredentials('xxx', 'xxx')
import time
with pika.BlockingConnection(pika.ConnectionParameters('HOST', 11111, '/', credentials)) as connection:channel = connection.channel()channel.queue_declare(queue='hello')# 方式一:基本队列for i in range(100):time.sleep(0.1)channel.basic_publish(exchange='',routing_key='hello',body='Hello World!')print(" [x] Sent 'Hello World!'")
消费

消费者:将消息提取出来并打印。
模式:一直处于监听状态,所以连接需要一直保持。

def callback(ch, method, properties, body):print(f" [x] Received {body}")connection = pika.BlockingConnection(pika.ConnectionParameters('HOST', 11111, '/', credentials))
channel = connection.channel()
channel.queue_declare(queue='hello')
# 方式一:基本队列
channel.basic_consume(queue='hello',auto_ack=True,on_message_callback=callback)print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

模式2:工作队列,区分消费者

适合分布式任务

在这里插入图片描述

在这里插入图片描述
这个模式下,稍微有点复杂。

简单模式生产者:

import sys
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',routing_key='hello',body=message)
print(f" [x] Sent {message}")

如果这时候rabbitmq挂了,那么数据就会丢失,这时候要在声明队列时声明持久化的。但这要求队列在一开始就声明是持久化的。如果队列一开始没声明,再次声明会报错。
同时在发布消息的时候,也要声明持久化

channel.queue_declare(queue='hello', durable=True)
channel.basic_publish(exchange='',routing_key="hello1",body=message,properties=pika.BasicProperties(delivery_mode = pika.DeliveryMode.Persistent))

配置完重启服务测试

docker restart rabbitmq_24091_24092

在这里插入图片描述
ok,生产端的消息被持久化了,即使重启消息也没有丢失。

接下来就是客户端。也就是worker。
考虑到worker同样存在不可靠的情况,有可能消息消费到一半,然后worker挂了。所以这里主要是消息的应答机制。

默认情况下,worker采用自动应答机制。即获取消息就认为被正常消费。这适用于产品的稳定性很高,或者消息的重要性很低的情况(允许漏消息)。

def callback(ch, method, properties, body):print(f" [x] Received {body.decode()}")time.sleep(body.count(b'.'))print(" [x] Done")
# 方式一:基本队列
channel.basic_consume(queue='hello',on_message_callback=callback, auto_ack =True)    

如果要做更可靠的确认,可以采取这种手工应答的机制。即消费时声明不自动确认,然后在callback内部确认。

# 手动确认
def callback(ch, method, properties, body):print(f" [x] Received {body.decode()}")time.sleep(body.count(b'.'))print(" [x] Done")ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(queue='hello',on_message_callback=callback, auto_ack =False)    print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

最后是负载均衡,在这里,通过消费者声明自己的预取数量来完成。

channel.basic_qos(prefetch_count=3)
channel.basic_consume(queue='hello1',on_message_callback=callback, auto_ack =False)    

3 广播

我认为在复杂决策场景下可以用到。
在这里插入图片描述

emit_log.py

channel = connection.channel()channel.exchange_declare(exchange='logs', exchange_type='fanout')message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(f" [x] Sent {message}")
connection.close()'''
如您所见,建立连接后我们声明了交换。此步骤是必要的,因为禁止发布到不存在的交易所。如果还没有队列绑定到交换器,消息将会丢失,但这对我们来说没关系;如果还没有消费者在监听,我们可以安全地丢弃该消息。python3 emit_log.py First message.
'''

receive_logs1.py

channel = connection.channel()channel.exchange_declare(exchange='logs', exchange_type='fanout')result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queuechannel.queue_bind(exchange='logs', queue=queue_name)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(f" [x] {body}")channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)channel.start_consuming()'''python3 receive_logs1.py >> logs_from_rabbit1.log
python3 receive_logs2.py >> logs_from_rabbit2.log
'''

在两个终端分别执行

python3 receive_logs1.py >> logs_from_rabbit1.log
python3 receive_logs2.py >> logs_from_rabbit2.log

在这里插入图片描述
当关闭后,数据被写入日志
在这里插入图片描述
队列被自动删除
在这里插入图片描述

4 路由

fanout是无意识转发,direct可以通过不同的路由键值决定队列分发,或者消息丢弃(如严重程度低的)。这种过滤和转发是通过路由键来确定的 routing_key
在这里插入图片描述

5 主题

有点像正则,实现更复杂的过滤。
在这里插入图片描述

5 微服务

命名为rabbit_agent_24098,第一步先实现模式二(worker)和模式三(subscribe)

先获取到基本包,并安装,能省不少事

wget  Basefuncs-1.2-py3-none-any.whl 
pip install Basefuncs-1.2-py3-none-any.whl

然后是简单的server_funcs.py,在server_funcs里定义了两个基础文件夹(现在看来也不是特别需要)

# 【创建tornado所需问文件夹】
import os
# 如果路径不存在则创建
def create_folder_if_notexist(somepath):if not os.path.exists(somepath):os.makedirs(somepath)return Truem_static = os.path.join(os.getcwd(),'m_static')
m_template = os.path.join(os.getcwd(),'m_template')create_folder_if_notexist(m_static)
create_folder_if_notexist(m_template)settings = {
'static_path':m_static,
'template_path':m_template
}# 如果需要序列化含np的内容
import json
from json import JSONEncoder
class MyEncoder(JSONEncoder):def default(self, obj):if isinstance(obj, np.integer):return int(obj)elif isinstance(obj, np.floating):return float(obj)elif isinstance(obj, np.ndarray):return obj.tolist()if isinstance(obj, datetime):return obj.__str__()if isinstance(obj, dd.timedelta):return obj.__str__()else:return super(MyEncoder, self).default(obj)# json.dumps(foo, cls=MyJsonEncoder)from Basefuncs import * 
# 读取配置
conf_dict = get_conf_dict('configs.conf')

服务端:

from server_funcs import *
import tornado.httpserver  # http服务器
import tornado.ioloop  # ?
import tornado.options  # 指定服务端口和路径解析
import tornado.web  # web模块
from tornado.options import define, options
import os.path  # 获取和生成template文件路径import pika
import json
# 全局配置文件
# rabbit01 = conf_dict['rabbit01']# 应用列表
app_list = []IndexHandler_path = r'/'
class IndexHandler(tornado.web.RequestHandler):def get(self):self.write('【GET】This is Website for Internal API System')self.write('Please Refer to API document')print('Get got a request test')# print(buffer_dict)def post(self):request_body = self.request.bodyprint('Trying Decode Json')some_dict = json.loads(request_body)print(some_dict)msg_dict = {}msg_dict['info'] = '【POST】This is Website for Internal API System'msg_dict['input_dict'] = some_dictself.write(json.dumps(msg_dict))print('Post got a request test')
IndexHandler_tuple = (IndexHandler_path,IndexHandler)
app_list.append(IndexHandler_tuple)# 发布消息:传入对应的队列服务器名称,获取对应的服务器配置,然后进行连接并发布消息
## 模式二:Work Queues
WorkQMessageHandler_path = r'/send_workq_message/'
class WorkQMessageHandler(tornado.web.RequestHandler):def post(self):request_body = self.request.bodysome_dict = json.loads(request_body)# 1 第一层rabbit = some_dict['rabbit']exchange = some_dict.get('exchange') or ''exchange_type = some_dict.get('exchange_type') or ''# queue不可缺少queue = some_dict['queue']durable = some_dict.get('durable') if durable is None:durable = True routing_key = some_dict.get('routing_key') or ''message_list = some_dict['message_list']print(some_dict)the_rabbit_conf_dict = conf_dict[rabbit]credentials = pika.PlainCredentials(the_rabbit_conf_dict['user'], the_rabbit_conf_dict['pwd'])msg_dict = {}with pika.BlockingConnection(pika.ConnectionParameters(the_rabbit_conf_dict['host'], the_rabbit_conf_dict['port'], '/', credentials)) as connection:channel = connection.channel()if len(exchange.strip())>1:channel.exchange_declare(exchange=exchange, exchange_type=exchange_type)# 队列的持久化与否要一开始设置好if durable is True:channel.queue_declare(queue=queue, durable=True)for message in message_list:print('a :',message )channel.basic_publish(exchange= exchange, routing_key=routing_key, body=json.dumps(message),properties=pika.BasicProperties(delivery_mode = pika.DeliveryMode.Persistent))msg_dict['durable'] = Truemsg_dict['status'] = Trueelse:channel.queue_declare(queue=queue)for message in message_list:print('b :',message )channel.basic_publish(exchange= exchange, routing_key=routing_key, body=json.dumps(message) )msg_dict['durable'] = Falsemsg_dict['status'] = Truemsg_dict['messages'] = len(message_list)self.write(json.dumps(msg_dict))
WorkQMessageHandler_tuple = (WorkQMessageHandler_path,WorkQMessageHandler)
app_list.append(WorkQMessageHandler_tuple)## 模式三:Publish/Subscribe
## 模式二:Work Queues
SubscribeMessageHandler_path = r'/send_subscribe_message/'
class SubscribeMessageHandler(tornado.web.RequestHandler):def post(self):request_body = self.request.bodysome_dict = json.loads(request_body)# 1 第一层rabbit = some_dict['rabbit']exchange = some_dict.get('exchange') or ''exchange_type = some_dict.get('exchange_type') or ''routing_key = some_dict.get('routing_key') or ''message_list = some_dict['message_list']print(some_dict)the_rabbit_conf_dict = conf_dict[rabbit]credentials = pika.PlainCredentials(the_rabbit_conf_dict['user'], the_rabbit_conf_dict['pwd'])msg_dict = {}with pika.BlockingConnection(pika.ConnectionParameters(the_rabbit_conf_dict['host'], the_rabbit_conf_dict['port'], '/', credentials)) as connection:channel = connection.channel()if len(exchange.strip())>1:channel.exchange_declare(exchange=exchange, exchange_type=exchange_type)for message in message_list:channel.basic_publish(exchange= exchange, routing_key=routing_key, body=json.dumps(message))            SubscribeMessageHandler_tuple = (SubscribeMessageHandler_path,SubscribeMessageHandler)
app_list.append(SubscribeMessageHandler_tuple)if __name__ == '__main__':#tornado.options.parse_command_line()apps = tornado.web.Application(app_list, **settings)http_server = tornado.httpserver.HTTPServer(apps)define('port', default=8000, help='run on the given port', type=int)http_server.listen(options.port)# 单核# 多核打开注释# 0 是全部核# http_server.start(num_processes=10) # tornado将按照cpu核数来fork进程# ---启动print('Server Started')tornado.ioloop.IOLoop.instance().start()

本地开发测试完之后,发布为镜像,然后启动服务。

docker run -d \--restart=always \--name=rabbit_agent_24098 \-v /etc/localtime:/etc/localtime  \-v /etc/timezone:/etc/timezone\-v /etc/hostname:/etc/hostname\-e "LANG=C.UTF-8" \-w /workspace\-p 24098:8000\myregistry.domain.com:24052/server.andy.rabbit_agent_24098:v100 \sh -c "python3 server.py"

模式二测试:WorkerQ

在生产端发送消息。声明了一个不持久化的队列,然后发送消息列表。注意:如果生产端声明非持久队列,那么消费端也要做同样的声明。否则会出现声明错误。另,如果消息ACK失败,RabbitMQ会在TTL之后将消息放回队列。如果消费者的通道断开连接,那么RabbitMQ也会将消息放回队列。

import requests as req 
message_list = [{'msg_id':1,'msg':'first msg'},{'msg_id':2,'msg':'second msg'}]# 1 模式2 WorkQ:服务端发送消息
para_dict = {}
para_dict['rabbit'] = 'rabbit01'
para_dict['routing_key'] = 'hello2'
para_dict['durable'] = False
para_dict['message_list'] = message_list
para_dict['queue'] = 'hello2'# res = req.post('http://127.0.0.1:8000/send_workq_message/', json = para_dict)
res = req.post('http://WAN_IP:24098/send_workq_message/', json = para_dict)

在消费端执行消费。默认的情况下,body里存放的是二进制字符串。以下采取了自动和手动方式进行消息确认。

import pika
import json
credentials = pika.PlainCredentials('xxx', 'xxx')
connection = pika.BlockingConnection(pika.ConnectionParameters('HOST', PORT, '/', credentials))channel = connection.channel()import time# 自动确认
# def callback(ch, method, properties, body):
#     print(f" [x] Received {body.decode()}")
#     time.sleep(body.count(b'.'))
#     print(" [x] Done")
# # 方式一:基本队列
# channel.basic_consume(queue='hello',
#                         on_message_callback=callback, auto_ack =True)    # 手动确认
# def callback(ch, method, properties, body):
#     input_data = json.loads(body.decode())
#     print(f" [x] Received ",input_data)
#     # time.sleep(body.count(b'.'))
#     print(" [x] Done")
#     ch.basic_ack(delivery_tag = method.delivery_tag)
def callback(ch, method, properties, body):# input_data = json.loads(body.decode())print(f" [x] Received ",body.decode())# time.sleep(body.count(b'.'))print(" [x] Done")ch.basic_ack(delivery_tag = method.delivery_tag)# channel.queue_declare(queue='hello1')
channel.queue_declare(queue='hello1',durable=True)
channel.basic_qos(prefetch_count=3)
channel.basic_consume(queue='hello1',on_message_callback=callback, auto_ack =False)    print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

模式三测试:订阅模式

para_dict = {}
para_dict['rabbit'] = 'rabbit01'
para_dict['routing_key'] = None
para_dict['durable'] = False
para_dict['message_list'] = message_list
para_dict['exchange'] = 'logs'
para_dict['exchange_type'] = 'fanout'res = req.post('http://127.0.0.1:8000/send_subscribe_message/', json = para_dict)

订阅的worker用了另一种形式:使用系统分配的默认队列,使用完之后自动删除

#!/usr/bin/env python
import pika
credentials = pika.PlainCredentials(xxx, xxx)
connection = pika.BlockingConnection(pika.ConnectionParameters(WAN_IP, PORT, '/', credentials))channel = connection.channel()channel.exchange_declare(exchange='logs', exchange_type='fanout')result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queuechannel.queue_bind(exchange='logs', queue=queue_name)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):input_data = json.loads(body.decode())print(f" [x] ",input_data)channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)channel.start_consuming()

6 结束

到这里第一版就算完成了,可以开始先用RabbitMQ做一些应用。

目前能想到的是用于分布式任务,队列中存放任务的元信息。Worker可以通过直接或者间接方式取数。
直接方式是指worker直接发起数据库的拉取动作,获得数据然后执行。
间接方式则是worker向指定队列反馈消息,由另一个服务来分发数据文件(针对租用算力机没有额外端口的情况)

在应用上,可以

  • 1 为任务搭建具有前端的微服务,数据量不大,可以通过RabbitMQ直接传数据
  • 2 接受来自量化程序的交易消息

http://www.mrgr.cn/p/33226677

相关文章

Oracle重做日志文件clear logfile与clear unarchived logfile浅析

首先,从v$log动态视图中观察到ARC和STATUS两个字段STATUS:分为CURRENT、ACTIVE和INACTIVE三种,当数据库进程DBWn进行一次写入,脏数据从内存刷写到redo logfile中,这时承载数据写入的redo logfile状态即为CURRENT;而数据从redo logfile拷贝到归档目录下时处于ACTIVE状态,…

第十篇【传奇开心果系列】Python自动化办公库技术点案例示例:深度解读Python自动化操作Excel

传奇开心果博文系列 系列博文目录Python自动化办公库技术点案例示例系列博文目录 前言一、重要作用解说二、Python操作Excel的常用库介绍三、数据处理和分析示例代码四、自动化报表生成示例代码五、数据导入和导出示例代码六、数据可视化示例代码八、数据校验和清洗示例代码九、…

Qt实现简易的多线程TCP服务器(附源码)

目录 一.UI界面的设计 二.服务器的启动 三.实现自定义的TcpServer类 1.在widget中声明自定义TcpServer类的成员变量 2.在TcpServer的构造函数中对于我们声明的m_widget进行初始化,m_widget我们用于后续的显示消息等,说白了就是主界面的更新显示等 …

实验二 数据描述

c语言程序设计——实验报告二 实验项目名称:实验二 数据描述 实验项目类型:验证性 实验日期:2023年3月21日 一、实验目的 1、掌握C语言数据类型,熟悉如何定义一个整型、字符型和实型的变量,以及对它们赋值的方法。 2、掌握不同数据类型之间赋值的规律。 3、学会使用C的有…

三级数据库技术考点(详解!!)

1、 答疑:【解析】分布式数据库系统按不同层次提供的分布透明性有:分片透明性;②位置透明性;③局部映像透明性,位置透明性是指数据分片的分配位置对用户是透明的,用户编写程序时只需 要考虑数据分片情况,不需要了解各分片在各个场地的分配情…

实例、构造函数、原型、原型对象、prototype、__proto__、原型链……

学习原型链和原型对象,不需要说太多话,只需要给你看看几张图,你自然就懂了。 prototype 表示原型对象__proto__ 表示原型 实例、构造函数和原型对象 以 error 举例 图中的 error 表示 axios 抛出的一个错误对象(实例&#xff0…

hiveserver2拒绝连接

一、报错内容 二、解决办法 基本都是core-site.xml文件中没做好代理导致的。 在文件中添加如下配置<property><name>hadoop.proxyuser.xxx.hosts</name><value>*</value></property><property><name>hadoop.proxyuser.xxx.gro…

SpringCloud微服务集成Dubbo

1、Dubbo介绍 Apache Dubbo 是一款易用、高性能的 WEB 和 RPC 框架,同时为构建企业级微服务提供服务发现、流量治理、可观测、认证鉴权等能力、工具与最佳实践。用于解决微服务架构下的服务治理与通信问题,官方提供了 Java、Golang 等多语言 SDK 实现。使用 Dubbo 开发的微服…

修改 RabbitMQ 默认超时时间

MQ客户端正常运行&#xff0c;突然就报连接错误&#xff0c; 错误信息写的很明确&#xff0c;是客户端连接超时。 不过很疑虑&#xff0c;为什么会出现连接超时呢&#xff1f;代码没动过&#xff0c;网络也ok&#xff0c;也设置了心跳和重连机制。 最终在官网中找到了答案&am…

vs2022安装和使用教程(详细)

vs2022和vs2019一样强大&#xff0c;C/C&#xff0c;Python&#xff0c;F#&#xff0c;ios&#xff0c;Android&#xff0c;Web&#xff0c;Node.js&#xff0c;Azure&#xff0c;Unity&#xff0c;HTML&#xff0c;JavaScript等开发都可以执行&#xff0c;大家快来使用它吧~ 如…

《自动机理论、语言和计算导论》阅读笔记:p49-p67

《自动机理论、语言和计算导论》学习第4天,p49-p67总结,总计19页。 一、技术总结 1.Deterministic Finite Automata(DFA) vs Nondeterministic Finite Automata(NFA) (1)DFA定义(2)NFA定义A "nonedeterministic" finite automata has the power to be in several s…

HTTPS:原理、使用方法及安全威胁

文章目录 一、HTTPS技术原理1.1 主要技术原理1.2 HTTPS的工作过程1.2.1 握手阶段1.2.2 数据传输阶段 1.3 CA证书的签发流程1.4 HTTPS的安全性 二、HTTPS使用方法三、HTTPS安全威胁四、总结 HTTPS&#xff08;全称&#xff1a;Hyper Text Transfer Protocol over Secure Socket …

如何开始定制你自己的大型语言模型

2023年的大型语言模型领域经历了许多快速的发展和创新,发展出了更大的模型规模并且获得了更好的性能,那么我们普通用户是否可以定制我们需要的大型语言模型呢? 首先你需要有硬件的资源,对于硬件来说有2个路径可以选。高性能和低性能,这里的区别就是是功率,因为精度和消息…

探索跨海大桥新境界:3D可视化技术的魔力

跨海大桥3D可视化,不仅仅是一场技术的革新,更是桥梁建设领域的一次划时代飞跃。跨海大桥3D可视化,不仅仅是一场技术的革新,更是桥梁建设领域的一次划时代飞跃。让我们先来想象一个场景:站在海岸边,望着眼前辽阔的海面,一座雄伟的跨海大桥如巨龙般蜿蜒伸展,连接着两岸。…

cesium加载.tif格式文件

最近项目中有需要直接加载三方给的后缀名tif格式的文件 <script src"https://cdn.jsdelivr.net/npm/geotiff"></script> 或者 yarn add geotiff npm install geotiff 新建tifs.js import GeoTIFF, { fromBlob, fromUrl, fromArrayBuffer } from geotif…

macOS 编译 openssl + libcurl

libcurl库 但是不支持 https 协议 现在加上openssl 来支持https 首先下载 openssl 源码https://www.openssl.org/source我这边下载的是3.0.13 编译openssl 参考这个https://zhuanlan.zhihu.com/p/628437266 主要命令 ./Configure darwin64-x86_64-cc --prefix="/Users/a…

C++ 控制语句(一)

一 顺序结构 程序的基本结构有三种&#xff1a; 顺序结构、分支结构、循环结构 大量的实际问题需要通过各种控制流程来解决。 1.1 顺序结构 1.2 简单语句和复合语句 二 循环 2.1 for循环 语句流程图 注意&#xff1a;使用for语句的灵活性 三 while语句 四 do while语句

天翼云发布边缘安全加速平台AccessOne,四大产品能力助力企业安全高速发展

AccessOne依托中国电信分布式边缘资源,基于CDN底座,将云原生安全能力注入分布式边缘节点,实现网络底层对性能、安全、算力原子能力编排,并以“安全与加速、零信任、边缘接入、开发者平台”四大产品能力,一站式响应客户加速与防护需求,助力政企轻松管理自身业务。本文分享…

Alfred使用AppleScript来实现一键隐藏功能(老板键)

set appNames to {"WeChat","QQ"} -- 将要隐藏的进程名称放入数组中tell application "System Events"repeat with appName in appNamesset appProcess to first process whose name is appNameset appId to id of appProcesstell process appNa…

玩转云端|演唱会一票难求?快用天翼云边缘安全加速平台AccessOne!

天翼云AccessOne基于覆盖全球的海量边缘节点,能够智能分离动静态内容,通过智能负载均衡技术,让静态内容在边缘节点进行缓存,保障用户就近接入边缘节点获取资源;对于动态内容,可通过智能选路、协议优化等技术,选择最/佳链路回源,同时提供协议优化、链路优化等多项优化技…