【消息队列】RabbitMQ五种消息模式

news/2024/5/18 23:59:40

RabbitMQ

  • RabbitMQ
    • RabbitMQ安装
  • 常见的消息模型
    • 基本消息队列
    • SpringAMQP
    • WorkQueue
    • 消息预取
    • 发布订阅模式
      • Fanout Exchange
      • DirectExchange
      • TopicExchange
    • 消息转换器

RabbitMQ

RabbitMQ是基于Erlang语言开发的开源消息通信中间件
官网地址:https://www.rabbitmq.com/

RabbitMQ安装

我们在Centos虚拟机中使用Docker来安装

  1. 下载镜像,在线拉取
    docker pull rabbitmq
  2. 安装MQ
docker run\
--env RABBITMQ_DEFAULT_USER=itcast \  # 设置环境变量用户名
--env RABBITMQ_DEFAULT_PASS= \  # 设置环境变量密码
--name mq \   # 队列名称
--hostname mq1 \  #配置主机名
-p 15672:15672 \  # MQ管理端口
-p 5672:5672 \   #MQ消息传输端口
-d \   # 后台运行
rabbitmq

在这里插入图片描述
在这里插入图片描述

交换机的创建与消息的发送由虚拟主机来完成,每个用户的虚拟主机是相互隔离的

在RabbitMQ中:
channel:操作MQ的工具
exchange:路由消息到队列中
queue:缓存消息
virtual host:虚拟主机,是对queue,exchange等资源的逻辑分组

常见的消息模型

  1. 基本消息队列
  2. 工作消息队列

这两种并没有用到交换机,而是直接到达队列

  1. 发布订阅(Publish,Subscribe),根据交换机类型不同分为三种:
    Fanout Exchange:广播
    Direct Exchange:路由
    Topic Exchange:主题

基本消息队列

publisher:消息发布者,将消息发送到队列queue
queue:消息队列,负责接收并缓存消息
consumer:订阅队列,处理队列中的消息

java模型(消息发布者)

@Test
public void test() throws IOException,TimeoutException{//1.建立连接,与消息队列进行连接ConnetionFactory factory =new ConnetionFactory();//设置连接参数,主机名,端口号,vhost,用户名,密码factory.setHost(192.168.75.136);factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("itcast");factory.setPassword("");//建立连接Connection connection =factory.newConnection();//创建通道Channel,就可以向队列发送消息了Channel channel =connection.createChannel();//创建队列String queuename="hlh";channel.queueDeclare(queuename,false,false,false,null);//发送消息String message="hello";channel.basicPublish("",queuename,null,message.getBytes());//关闭通道和连接channel.close();connection.close();
}

java模型(消息消费者)

    //1.建立连接,与消息队列进行连接ConnetionFactory factory =new ConnetionFactory();//设置连接参数,主机名,端口号,vhost,用户名,密码factory.setHost(192.168.75.136);factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("itcast");factory.setPassword("");//建立连接Connection connection =factory.newConnection();//创建通道Channel,就可以向队列发送消息了Channel channel =connection.createChannel();//创建队列String queuename="hlh";channel.queueDeclare(queuename,false,false,false,null);//订阅消息channel.basicConsume(queuename,true,new DefaultConsumer(channel){@Override//处理消息的代码,绑定函数,有了消息才执行public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException{//处理消息String message=new String(body);             }})

注意:上边生产者消费者都创建了队列:

这是为了防止消息队列中的队列不存在,在进行消息队列初始化的时候不知道是先建立消费者,还是先建立生产者,所以都执行创建函数,但是创建的队列只有一个不会重复

SpringAMQP

  • AMQP

是用于在应用程序或之间传递业务消息的开放标准,该协议与语言和平台无关,更符合微服务中的独立性的要求

  • Spring AMQP

Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息,包含两部分,其中Spring-amqp是基础抽象,spring-rabbit是底层的默认实现

  • 特征:
  1. 监听器容器,用于异步处理入站消息
  2. 用于发送和接收消息的RabbitTemplate
  3. Rabbitadmin用于自动声明队列,交换和绑定
  • 使用:
  1. 引入spring-amqp的依赖
    在这里插入图片描述
    在yml中配置mq连接信息:
spring: rabbitmq:host: 192.168.75.136 #主机名port: 5672 #端口virtual-host: / #虚拟主机username: itcast #用户名password:   #密码
  1. 在生产者服务中利用RabbitTemplate发送消息到hlh.queue这个队列
public class springamqptest{@Autowiredprivate RabbitTemplate rabbittemplate;@Testpublic void test(){String queuename="hlh.queue";String message="hello";rabbittemplate.convertAndSend(queuename,message);}
}
  1. 在消费者服务端编写消费逻辑,绑定到hlh.queue这个队列中
@Component
public class SpringrabbitListener {@RabbitListener(queues="hlh.queue")public void listenSimple(String msg) throws InterruptedException{//消费逻辑代码}
}

注意:消息一旦消费就会从队列中删除,rabbitmq没有消息回溯功能

WorkQueue

Work queue,工作队列。可以提高消息处理速度,避免队列消息堆积

一个消息队列绑定多个消费者

假设现在生产者每秒循环发送50条消息,此时的消费者怎么处理:

@Component
public class SpringrabbitListener {@RabbitListener(queues="hlh.queue")public void listenSimple(String msg) throws InterruptedException{//消费逻辑代码}@RabbitListener(queues="hlh.queue")public void listenSimple2(String msg) throws InterruptedException{//消费逻辑代码}
}

通过定义多个消费者进行消费,追上生产者生产的速度,同一个消息只能被一个消费者消费,一旦消费完就会在队列中删除

消息预取

指的每个消费者每次取多少条消息:
可以通过配置进行配置:

spring:rabbitmq:host: 192.168.75.136port: 5672virtual-host: /username: itcastpassword: listener:simple:prefecth: 1 #每次只能获取一条消息,处理完才能获得下一个消息

发布订阅模式

发布订阅可以使得同一个消息发送给多个消费者,实现方式是加入了exchange(交换机)
在这里插入图片描述

注意:exchange负责消息路由,而不是存储,路由失败则消息丢失

交换机的作用:

  1. 接收生产者的消息,将消息按照规则路由到与之绑定的队列
  2. 不能缓存消息,路由失败,消息丢失
  3. FanoutExchange的会将消息路由到每个绑定的队列

SpringAMQP提过了声明交换机,队列,绑定关系的API:
在这里插入图片描述

Fanout Exchange

Fanout Exchange 会将所有的消息路由到每一个跟其绑定的queue
在创建配置类,在配置类中进行消息队列绑定交换机

@Configuration
public class FanoutConfig{// 声明FanoutExchange交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("itcast.fanout");}//声明一个队列@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}// 绑定队列跟交换机@Beanpublic Binding bindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}
}

此时的生产者如何发送消息:

public void test(){//给出交换机名称String exchangeName="itcast.fanout";String message="hello";//发送消息rabbitTemplate.convertAndSend(exchangeName,"",message);
}

监听者如何收到消息

@RabbitListener(queues="fanout.queue1")
public void listener(String msg){//处理得到的消息
}

DirectExchange

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此成为路由模式(routes)

每一个Queue都与Exchange设置一个BindingKey

发布者发送消息时,指定消息的RoutingKey,Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

一个队列可以指定多个Key

我们可以通过 @RabbitListener声明Exchange,Queue,RoutingKey
在消费者方法上注解

@RabbitListener(bindings=@QueueBinding(value=@Queue(name="direct.queue1"),exchange=@Exchange(name="itcast.direct",type=ExchangeTypes.DIRECT),key={"red","blue"}))
public void Listener(String msg){//进行消息的处理
}

在生产者生产时:

public void test(){//给出交换机名称String exchangeName="itcast.fanout";String message="hello";//发送消息rabbitTemplate.convertAndSend(exchangeName,"blue",message);
}

TopicExchange

TopicExchange与路由模式类似,区别在于routingKey必须是多个单词的列表,并且以.分隔
Queue与Exchange指定BindingKey时可以使用通配符:
#:代指0个或多个单词
*:代指一个单词
同样也是使用 @RabbitListener进行声明

@RabbitListener(bindings=@QueueBinding(value=@Queue(name="direct.queue1"),exchange=@Exchange(name="itcast.direct",type=ExchangeTypes.DIRECT),key="hi.#"))
public void Listener(String msg){//进行消息的处理
}

生产者生产消息:

public void test(){//给出交换机名称String exchangeName="itcast.fanout";String message="hello";//发送消息rabbitTemplate.convertAndSend(exchangeName,"hi.now",message);
}

消息转换器

在SpringAMQP的发送方法中,接收消息的类型是Object,也就是我们可以发送任意对象类型的消息,SpringAMQP会帮助我们序列化为字节后发送

Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的,而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化

如果要修改只需定义一个MessageConverter 类型的Bean即可,推荐使用JSON方式完成序列化

  1. 引入jackson的依赖
    在这里插入图片描述
  2. 声明MessageConverter:
@Bean
public MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();
}

这样发送的消息就会使用自定义的转换类型


http://www.mrgr.cn/p/66375260

相关文章

腾讯云IM即时通信引入(React Web端组件式)

开发环境要求 React ≥ v18.0 (17.x 版本不支持) TypeScript node(12.13.0 ≤ node 版本 ≤ 17.0.0, 推荐使用 Node.js 官方 LTS 版本 16.17.0) npm(版本请与 node 版本匹配) chat-uikit-react 集成 …

Unity 编辑器工具 - 资源引用查找器

在Unity项目开发过程中,管理和维护资源之间的引用关系是至关重要的。当然我们项目也是需要这个功能 毕竟项目大了之后查找资源引用还是交给 资源引用查找器 比较好。 功能概述 资源引用查找器允许开发者选择一个目标资源,并在整个项目中查找引用了该资…

vue3--element-plus-抽屉文件上传和富文本编辑器

一、封装组件 article/components/ArticleEdit.vue <script setup> import { ref } from vue const visibleDrawer ref(false)const open (row) > {visibleDrawer.value trueconsole.log(row) }defineExpose({open }) </script><template><!-- 抽…

读天才与算法:人脑与AI的数学思维笔记18_心流机

读天才与算法:人脑与AI的数学思维笔记18_心流机1. 心流机 1.1. 在音乐中你会期盼旋律从不稳定解决到稳定,最终实现某种张力的解决 1.2. 将马尔可夫链系统中的自由与约束条件结合起来,从而形成一种更具结构化的组合 1.3. 美籍匈牙利心理学家米哈里契克森米哈赖(Mihaly Csiks…

Jmeter05:配置环境变量

1 Jmeter 环境 1.1 什么是环境变量&#xff1f;path什么用&#xff1f; 系统设置之一&#xff0c;通过设置PATH&#xff0c;可以让程序在DOS命令行直接启动 1.2 path怎么用 如果想让一个程序可以在DOS直接启动&#xff0c;需要将该程序目录配置进PATH 1.3 PATH和我们的关系…

(四)小程序学习笔记——自定义组件

1、组件注册——usingComponents &#xff08;1&#xff09;全局注册&#xff1a;在app.json文件中配置 usingComponents进行注册&#xff0c;注册后可以在任意页面使用。 &#xff08;2&#xff09;局部注册&#xff0c;在页面的json文件中配置suingComponents进行注册&#…

【Linux】awk命令学习

最近用的比较多&#xff0c;学习总结一下。 文档地址&#xff1a;https://www.gnu.org/software/gawk/manual/gawk.html 一、awk介绍二、语句结构1.条件控制语句1&#xff09;if2&#xff09;for3&#xff09;while4&#xff09;break&continue&next&exit 2.比较运…

uniapp 微信开发工具上访问正常,真机调试一直跨域报错

微信小程序真机调试时&#xff0c;出现跨域问题&#xff0c;需要同时在后端设置多种允许跨域的设置&#xff1a; // 指定允许其他域名访问 header(Access-Control-Allow-Origin:*); // 响应类型 header(Access-Control-Allow-Methods:GET,POST,OPTION); // 响应头设置 header(…

Mysql报错红温集锦(一)(ipynb配置、pymysql登录、密码带@、to_sql如何加速、触发器SIGNAL阻止插入数据)

一、jupyter notebook无法使用%sql来添加sql代码 可能原因&#xff1a; 1、没装jupyter和notebook库、没装ipython-sql库 pip install jupyter notebook ipython-sql 另外如果是vscode的话还需要安装一些相关的插件 2、没load_ext %load_ext sql 3、没正确的登录到mysql…

扫雷实现详解【递归展开+首次必展开+标记雷+取消标记雷】

扫雷 一.扫雷设计思路二.扫雷代码逐步实现1.创建游戏菜单2.初始化棋盘3.打印棋盘4.随机布置雷5.统计周围雷的个数6.递归展开棋盘7.标记雷8.删除雷的标记9.保证第一次排雷的安全性棋盘必定展开10.排查雷11.判断输赢 三.扫雷总代码四.截图 一.扫雷设计思路 1.创建游戏菜单。  2.…

深度学习500问——Chapter08:目标检测(7)

文章目录 8.3.8 RFBNet 8.3.9 M2Det 8.3.8 RFBNet RFBNet有哪些创新点 1. 提出RF block&#xff08;RFB&#xff09;模块 RFBNet主要想利用一些技巧使得轻量级模型在速度和精度上达到很好的trade-off的检测器。灵感来自人类视觉的感受野结构Receptive Fields&#xff08;RFs…

分布式websocket IM即时通讯聊天开源项目如何启动

前言 自己之前分享了分布式websocket的视频有同学去fork项目了&#xff0c;自己启动一下更方便理解项目嘛。然后把项目启动需要的东西全部梳理出来。支持群聊单聊,表情包以及发送图片。 支持消息可靠&#xff0c;消息防重&#xff0c;消息有序。同时基础架构有分布式权限&…

axios.get请求 重复键问题??

封装的接口方法&#xff1a; 数据&#xff1a; 多选框多选后 能得到对应的数组 但是请求的载荷却是这样的,导致会请求不到数据 departmentChecks 的格式看起来是一个数组&#xff0c;但是通常 HTTP 请求的查询参数不支持使用相同的键&#xff08;key&#xff09;名多次。如…

分享一个网站实现永久免费HTTPS访问的方法

免费SSL证书作为一种基础的网络安全工具&#xff0c;以其零成本的优势吸引了不少网站管理员的青睐。要实现免费HTTPS访问&#xff0c;您可以按照以下步骤操作&#xff1a; 一、 选择免费SSL证书提供商 选择一个提供免费SSL证书的服务商。如JoySSL&#xff0c;他们是国内为数不…

JVM知识总汇(JVM面试题篇5.1)

个人理解&#xff0c;所学有限&#xff0c;若有不当&#xff0c;还请指出 1.JVM是由哪些部分组成&#xff0c;运行流程是什么&#xff1f; JVM为java虚拟机&#xff0c;是java程序的运行环境&#xff08;其实是java字节码文件的运行环境&#xff09;&#xff0c;能够实现一次编…

电路板/硬件---器件

电阻 电阻作用 电阻在电路中扮演着重要的角色&#xff0c;其作用包括&#xff1a; 限制电流&#xff1a;电阻通过阻碍电子流动的自由而限制电流。这是电阻最基本的功能之一。根据欧姆定律&#xff0c;电流与电阻成正比&#xff0c;电阻越大&#xff0c;通过电阻的电流就越小。…

【副本向】Lua副本逻辑

副本生命周期 OnCopySceneTick() 子线程每次心跳调用 --副本心跳 function x3323_OnCopySceneTick(elapse)if x3323_g_IsPlayerEnter 0 thenreturn; -- 如果没人进入&#xff0c;则函数直接返回endif x3323_g_GameOver 1 thenif x3323_g_EndTick > 0 thenx3323_CountDown…

如何低成本创建个人网站?

目录 前言 网站源代码 虚拟主机或服务器 域名注册或免费二级域名 域名解析 上传源代码压缩包 添加刚刚的域名 成功搭建 失败的解决方案 结语 前言 很多小白都非常想拥有自己的网站&#xff0c;但很多人虽然有了自己的源代码但苦于不知道怎么将其变成所有人都能够访…

Jenkins流水线部署springboot项目

文章目录 Jenkins流水线任务介绍Jenkins流水线任务构建Jenkins流水线任务Groovy脚本Jenkinsfile实现 Jenkins流水线任务实现参数化构建拉取Git代码构建代码制作自定义镜像并发布 Jenkins流水线任务介绍 之前采用Jenkins的自由风格构建的项目&#xff0c;每个步骤流程都要通过不…

GPU虚拟化和算力隔离探讨

1. 术语介绍 术语 全称 说明 GPU Graphics Processing Unit 显卡 CUDA Compute Unified Device Architecture 英伟达2006年推出的计算API VT/VT-x/VT-d Intel Virtualization Technology -x表示x86 CPU&#xff0c;-d表示Device SVM AMD Secure Virtual Machine …