当前位置: 首页 > news >正文

rust操作rabbitmq

Rust 操作 Rabbitmq

使用docker快速部署rabbitmq

docker pull rabbitmq:management
# 15672为rabbitmq 管理员端口,默认账号密码为guest(账号密码相同)
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management

rust 添加amqp库lapin

cargo add lapin

1. 连接到rabbitmq

let conn=lapin::Connection::connect("amqp://localhost:5672",lapin::ConnectionProperties::default(),).await?;
let chan=conn.create_channel().await?;

2. 交换机创建和队列创建

//创建一个名为itest的交换机,模式为话题模式
chan.exchange_declare("itest",lapin::ExchangeKind::Topic,lapin::options::ExchangeDeclareOptions::default(),lapin::types::FieldTable::default(),
)
.await?;
//创建一个名为queue1的队列
chan.queue_declare("queue1",lapin::options::QueueDeclareOptions::default(),lapin::types::FieldTable::default(),
)
.await?;
//绑定队列到交换机,将名为队列queue1绑到交换机itest,并设置路由名为/queue1
chan.queue_bind("queue1","itest","/queue1",lapin::options::QueueBindOptions::default(),lapin::types::FieldTable::default(),
).await?;

3. 生产者发布消息

// 发送给itest交换机,交换机会把消息交给路由/queue1
chan.basic_publish("itest","/queue1",lapin::options::BasicPublishOptions::default(),"hello".as_bytes(),lapin::BasicProperties::default(),
).await.expect("publish message failed");

4. 消费者订阅消息

let consumer = chan.basic_consume("queue1","",lapin::options::BasicConsumeOptions::default(),lapin::types::FieldTable::default(),).await?;
consumer.set_delegate(|d: lapin::message::DeliveryResult| async move {match d {Err(err) => eprintln!("subscribe message error {err}"),Ok(data) => {if let Some(data) = data {let raw = data.data.clone();let f = data.ack(lapin::options::BasicAckOptions::default());println!("accept msg {}",String::from_utf8(raw).expect("parse msg failed"));if let Err(err) = f.await {eprintln!("ack failed {err}");}}}}
});

最终demo

#[cfg(test)]
mod mq{#[tokio::test]async fn rabbitmq() -> Result<(), Box<dyn std::error::Error>> {//连接到rabbitmqlet conn = lapin::Connection::connect("amqp://localhost:5672",lapin::ConnectionProperties::default(),).await?;let chan = conn.create_channel().await?;//初始化queue和exchangechan.queue_declare("queue1",lapin::options::QueueDeclareOptions::default(),lapin::types::FieldTable::default(),).await?;chan.exchange_declare("itest",lapin::ExchangeKind::Topic,lapin::options::ExchangeDeclareOptions::default(),lapin::types::FieldTable::default(),).await?;chan.queue_bind("queue1","itest","/queue1",lapin::options::QueueBindOptions::default(),lapin::types::FieldTable::default(),).await?;//发送消息tokio::spawn(async move {chan.basic_publish("itest","/queue1",lapin::options::BasicPublishOptions::default(),"hello".as_bytes(),lapin::BasicProperties::default(),).await.expect("publish message failed");});let chan = conn.create_channel().await?;let consumer = chan.basic_consume("queue1","",lapin::options::BasicConsumeOptions::default(),lapin::types::FieldTable::default(),).await?;//使用回调来触发接受到新消息时的操作,使用futures_lite 中StreamExt 可以不使用回调consumer.set_delegate(|d: lapin::message::DeliveryResult| async move {match d {Err(err) => eprintln!("subscribe message error {err}"),Ok(data) => {if let Some(data) = data {let raw = data.data.clone();let f = data.ack(lapin::options::BasicAckOptions::default());println!("accept msg {}",String::from_utf8(raw).expect("parse msg failed"));if let Err(err) = f.await {eprintln!("ack failed {err}");}}}}});tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;Ok(())}
}

结果展示

请添加图片描述
rabbitmq 管理后台页面可以看到我们创建的itest交换机和queue1队列向绑定,queue1的路由地址为/queue1
在这里插入图片描述

简言

amqp 包其实无论是rust 的lapin还是golang的streadway/amqp,操作手法整体都是一样的,rabbitmq其它几种模式可以参考我goalng 的rabbitmq几种模式下操作方式来类推


http://www.mrgr.cn/news/4758.html

相关文章:

  • Notion使用详解
  • EmguCV学习笔记 VB.Net 2.S 特别示例
  • 【Python】Pydantic:快速上手
  • 【与C++的邂逅】--- 类和对象(中)
  • Web客户端软件测试
  • 通过剪枝与知识蒸馏优化大型语言模型:NVIDIA在Llama 3.1模型上的实践与创新
  • 供应链技术
  • 动态规划篇-代码随想录算法训练营第三十六天l 279.完全平方数,139.单词拆分,多重背包问题
  • 博物馆地图导览:利用GIS与蓝牙定位技术,融合语音解说功能
  • 软考-软件设计师(程序设计语言习题)
  • C# 程序运行内存优化的点
  • 【iOS】——响应者链和事件传递链
  • ES6解构赋值详解;全面掌握:JavaScript解构赋值的终极指南
  • Ubuntu 下 通过 Docker 部署 Vaultwarden 服务器
  • pytorch,半精度判断,半精度和全精度之间的转化。
  • 2024前端面试题-网络篇
  • 因为嫌吵,在自己家也用上了远程控制电脑
  • 波导阵列天线单元 学习笔记3 基于空气填充双模馈网的双圆极化膜片天线阵列
  • 记录一次SQL 查询 LEFT JOIN 相关优化
  • 合宙LuatOS生成毫秒级时间戳