当前位置: 首页 > news >正文

RabbitMQ 2025/3/5

高性能异步通信组件。

同步调用

以支付为例:

可见容易发生雪崩。

异步调用

以支付为例:

支付服务当甩手掌柜了,不管后面的几个服务的结果。只管库库发,后面那几个服务想取的时候就取,因为消息代理里可以一直装,缓存消息。

消息代理(英文Broker)

消息代理相关的技术MQ技术

Erlang面向并发的语言

 

RabbitMQ安装部署

可直接采用Docker,方便。

RabbitMQ消息流转的过程(整体架构)

消费者监听队列,发送者不直接发给队列,而是发给exchange交换机,交换机会根据规则把消息路由给不同队列。

因为RabbitMQ的性能很强,每秒钟可以达到数万的并发,所以企业有多个项目的话,往往部署一套RabbitMQ就够了,多个项目可以共享RabbitMQ服务。但是大家一起的话,很可能交换机出现冲突,所以引出RabbitMQ的新概念virtual-host(虚拟主机),类似与MySQL里的database(众所周知,MySQL里可以创建多个database,每个database的表和其他database的表直接是相互隔离的),每个项目都创一个自己的virtual-host,就可以相互隔离开了。

例子:MQ入门-06.RabbitMQ-快速入门_哔哩哔哩_bilibili

交换机不存只负责转发。交换机和队列必须有一个关系,才能给队列发消息。binding绑定关系。

数据隔离

虚拟主机是实现了数据隔离。

不同的项目创建不同的用户。为新建用户建一个虚拟主机。

视频举例:MQ入门-07.RabbitMQ-数据隔离_哔哩哔哩_bilibili

RabbitMQ的java客户端

这里我们不采用Rabbit官方提供的java客户端,而是Spring AMQP,它是基于AMQP协议(消息收发与语言和平台无关),官方提供的java客户端使用起来繁琐,所以使用Spring AMQP。

4:00  MQ入门-08.Java客户端-快速入门_哔哩哔哩_bilibili

因为是简单入门案例,可以省去交换机这个步骤。

控制台创建队列->pom里引入spring-amqp依赖->yaml里配置RabbitMQ服务端信息(如地址主机名、端口、虚拟主机名、用户、密码),这样微服务才能连接到RabbitMQ->发送消息(SpringAMQP提供了RabbitTemplate工具类,方便我们发送消息)->接受消息

接收者接受消息的代码:加一个@Comonent把它注册成Spring的一个Bean,这个类的内部要有一个方法,这个方法要加上@RabbitListener(也就是消费者的监听者)的注解后面再带上队列名,现在只要队列有消息,方法就可以拿到了。方法参数自己设。

WorkQueues

任务模型,让多个消费者绑定到一个队列,共同消费队列中的消息。

那么队列中的消息会被哪个消费者收到呢?

模拟WorkQueue(实现一个队列绑定多个消费者)

MQ入门-09.Java客户端-WorkQueue_哔哩哔哩_bilibili

在RabbitMQ控制台建立一个队列->一个发消息的->2个消费者

下图两个消费者,一个发送者。

 

队列同一个消息只能被一个消费者处理,很多条消息的话均匀分配(默认轮询,1人1条)。(上图2个消费者结果有奇偶规律)

把很多消息平均分给消费者,可以加快消息处理速度。

每个消费者能力不一样(通过sleep(ms)修改),均匀分配消息(默认)肯定是不合理的,只需要修改application.yml,设置preFetch值为1,实现能者多劳。

Fanout交换机(广播交换机)

再看一下以前学的这个发消息的过程。带有交换机的完整模型。

交换机的作用是接受发送者发送的消息,并将消息路由到与其绑定的队列。

那么什么是Fanout交换器?

特征:把接收到的消息路由到每一个和他绑定的队列。(队列中的消息只能被一个消费者处理,有了Fanout交换机,发的消息就可以被多个消费者处理了。我们完全可以给每个微服务创建一个队列,然后队列绑到交换机上,fanout交换机想广播一样,给每个队列群发/复制消息。)

案例步骤:声明2个队列,一个交换机exhanges,然后banding绑定,第一种方法可以直接用控制台发消息然后查看,还有一种方法java代码,2段接受代码,1段发送代码(这回调3个参数的api,exchange,null,message,之前写的例子用到的都是两个参数,队列名和消息)。

Direct交换机(定向)

MQ入门-11.Java客户端-Direct交换机_哔哩哔哩_bilibili

和原来差不多,这回交换机选direct,队列设置bindingkey,发送时发送者的参数分别设好exchange,routingkey值,message。其余的步骤和原来一样。

所以两个key值一样,direct交换机也能实现Fanout交换机的功能。一个queue可以设多个bindingkey。

Topic交换机(话题)

 队列bindingkey可以通过通配符简易设置。

topic和direct比除了多了一个统配符,功能差不多。

声明队列交换机

之前队列和交换机的创建都是依靠控制台,这次学习用代码声明队列交换机,这样项目一启动就会自己创建队列和交换机了。

可以用new的方式。(更简单)

也可以用builder方式。

发送者只管发,什么也不关心,所以通常我们在消费者这一端,声明队列交换机及绑定关系。

步骤(以Fanout交换机为例):创建FanoutConfiguration类,声明交换机,队列,绑定关系。可以new(比较简单),也可以builder构建(比较专业)。

上述基于JavaBean绑定太麻烦了,这回学习注解@RatbbitListener。

基于JavaBean还是基于注解,完全个人喜好了。(但是如果基于javabean,声明direct交换机好像没法写很多bindingKey)

消息转换器

以前我们发消息一直用到rabbitTemplate的convertAndSend方法。我们发消息是可以传任何java对象作为消息,网络传输其实是以字节传的,直接对象不行,因此传的时候要把java对象转换成字节。这个转换就是由消息转换器转的。

java里有一种JDK自带的序列化的一个工具(能把任一java对象序列化成字节的形式),所以这个消息转换的过程就是采用JDK自带的序列化方式。

ObjectOutputStream:对象流,jdk自带的序列化工具,能把任意java对象序列化成字节。

但是推荐使用JSON的消息转换器。

发送方和接收方一定要用相同的消息转换器。

pom配置好之后到MAVEN里刷新一下。

给发送方和接收方都配置一个消息转换类。

对比图(下边的JSON的消息转换器)

业务改造

把业务从OpenFeign的同步调用改成基于MQ的异步调用。

OpenFeign:02-基本概念_哔哩哔哩_bilibili

nacos可以抽取共享配置,不用重复进行同样的配置。

共同代码可以写到common里

消息可靠性

发送者的可靠性

发送者重连

yml

发送者确认

路由失败的两种原因:

exchange2交换机没连队列,路由失败。返回ACK(因为消息确实发出去了)

routingkey和bandingkey没有匹配上的。

如果返回NACK要重发消息。

none是关闭(默认)

simple是同步阻塞

correlated异步的

至此,ConfirmCallback和ReturnCallback全部写完了。

唯一,uuid,最简单的随机算法

MQ的可靠性

消息传到MQ也不一样可靠,因为MQ本身也可能把消息弄丢。

MQ内存储存的方式,可能弄丢消息,还有可能导致MQ性能下降和阻塞。

为此,产生两个方案:数据持久化(把数据提前持久化存到磁盘,提前的意思:不是等到满了再存到磁盘,提前就开始往磁盘存了)、LazyQueue

数据持久化

持久化的一种方法:写出到磁盘,这样就永久保存了。

交换机的持久化(默认都是持久的交换机、durable属性)(Spring的AMQP代码生成的交换机默认也是持久化的)

队列持久化(默认持久化durable属性)(Spring的AMQP代码生成的默认也是持久化的)

消息持久化(手动设置为Persistent)

LazyQueue

上节课写了数据持久化,当我们把交换机、队列、消息持久化了以后,就不用再担心MQ宕机而导致消息丢失了,不仅如此,RabbitMQ也不会再因为消息堆积配置out而出现阻塞了。但是数据持久化后,不仅要在内存里写,还要再磁盘里写一份,这样每条消息处理的耗时就增加了,这也就导致它的整体并发能力有点下降。为了解决这个问题,引入了一种新的队列模式,lazy Queue(消息直接写磁盘里)。它不仅仅具有数据持久化的优势,同时还解决了并发能力下降的问题。

那么怎样去设置一个队列变成Lazy Queue模式呢

第一种方法,控制台添加

第二种方法,代码添加(声明bean的方式或者注解都可以)

消费者的可靠性

消费者确认机制

三种状态(ack,nack,reject):不管哪种状态,都不能在刚收到消息的时候就返回,我们应该根据处理结果去做判断,也就是consumer处理完的时候。

注意:这回配置的是消费者下的application文件。

失败重试机制

延迟消息


http://www.mrgr.cn/news/93317.html

相关文章:

  • sqli-lab靶场学习(七)——Less23-25(关键字被过滤、二次注入)
  • 基于Matlab的多目标粒子群优化
  • c++ 纯虚函数
  • c++ 接口/多态
  • 国内支持Stable Diffusion模型的平台
  • CSS—属性继承与预处理器:2分钟掌握预处理器
  • 机器学习--特征选择
  • 自然语言模型(NLP)介绍
  • 鬼泣:项目前置设置杂项
  • 【零基础C语言】第四节 数组
  • QT5 GPU使用
  • vLLM代码推理Qwen2-VL多模态大模型(远程服务器解决方案,无需UI)
  • 机器学习数学基础:40.结构方程模型(SEM)中卡方值与卡方自由度比
  • 人工智能神经网络基本原理
  • cursor使用经验分享(java后端服务开发向)
  • Dify+DeepSeek | Excel数据一键可视化(创建步骤案例)(echarts助手.yml)(文档表格转图表、根据表格绘制图表、Excel绘制图表)
  • 启智平台华为昇腾910B 运行DeepSeek Janus-Pro-7/1B
  • Dify 开源大语言模型应用开发平台使用(一)
  • 【Oracle学习笔记】1.数据库组成对象
  • mybatis日期格式与字符串不匹配bug