【RabbitMQ】可靠性策略(幂等,消息持久化)

news/2024/5/18 18:59:50

MQ可靠性策略

  • 发送者的可靠性问题
    • 生产者的重连
    • 生产者确认
  • MQ的可靠性
    • 数据持久化
    • Lazy Queue
  • 消费者的可靠性问题
    • 消费者确认机制
    • 消息失败处理
  • 业务幂等性
  • 简答问题

发送者的可靠性问题

生产者的重连

可能存在由于网络波动,出现的客户端连接MQ失败,我们可以通过配置文件配置解决

spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled:  true #开启超时重试机制initial-interval: 1000ms #失败后的初始等待时间multiplier: 1 #失败后下次的等待时长倍数,下次等待时间= initial-interval * multipliermax-attempts: 3 #最大重试次数

生产者确认

RabbitMQ提供了Publisher Confirm和Publisher Return两种确认机制,开机确认机制后,在MQ成功收到消息后会返回确认消息给生产者,返回的结果有以下几种情况:

  1. 消息投递到了MO,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功
  2. 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
  3. 持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功
  4. 其他情况都会返回NACK,告知投递失败

通过配置文件配置生产者的消息类型:

spring:rabbitmq:publisher-confirm-type: correlated #开启publisher confirm机制,并设置confirm类型publisher-returns: true # 开启publisher return机制

这里的publisher-confirm-type 有三种模式可选:
none:关闭confirm机制
simple: 同步阻塞等待MQ的回执消息
correlated:MQ异步调用方式返回回执消息

异步回调方式:

我们完成一个任务将消息交由消息队列中,就进行别的任务了,当消息队列返回异常问题,在过来进行对应的处理

我们需要调用ReturnCallback函数完成消息失败后的操作:
在使用之前需要配置ReturnCallback,每个RabbitTemplate只能配置一个ReturnCallback

@Configuration
public class CommonConfig implements ApplicationContextAware{@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException{//获取RabbitTemplateRabbitTemplate rabbittemplate =applicationContext.getBean(RabbitTemplate.class);//设置ReturnCallbackrabbitTemplate.setReturnCallback(message,replyCode,replyText,exchange,routingKey)->{//处理操作}}
}

通过ConfirmCallback来处理消息失败:
每一个消息指定一个ConfirmCallback

void test() throws InterruptedException{CorrelationData cd= new CorrelationData();cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.confirm>(){@Overridepublic void onFailure(Throwable ex){//Future 发生异常是的逻辑处理,基本不会触发}@Overridepublic void onSuccess(CorrelationData.confirm result){//Future接收到回执的处理逻辑,参数中的result就是回执内容if(result.isAck()){ //result.isack,boolean类型,true代表ack回执,false表示nack回执//处理逻辑}else{//异常处理}}});
}
rabbittemplate.convertAndSend("","",cd);

MQ的可靠性

在默认情况下,Rabbitmq会将接收到的数据保存在内存中以降低消息收发的延迟,这样会有问题:

  1. 一旦MQ宕机,内存的消息会丢失
  2. 内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发MQ阻塞(消息对队列将消息保存到磁盘,此时MQ阻塞)

数据持久化

RabbitMQ的数据持久化包括:

  1. 交换机持久化(Durable 永久的,Transient临时的)
  2. 队列持久化(Durable 永久的,Transient临时的)
  3. 消息持久化

消息的持久化:

void test(){Message message =MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF-8)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();
}

会将消息在过程中持久化到磁盘不会导致MQ阻塞

消息的持久化性能不是很高,可以通过Lazy Queue进行消息的持久化

Lazy Queue

惰性队列
特征:

  1. 接收到消息后直接存入磁盘而非内存(内存只保留最近消息,默认2048条)
  2. 消费者要消费消息时才会从磁盘中读取并加载到内存
  3. 支持百万条数据的消息存储

在3.12 版本后,所有的队列都是Lazy Queue模式,无法更改
在这里插入图片描述
在java中要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy:

@Bean
public Queue lazyQueue(){return QueueBuilder.durable("lazy.queue").lazy() //开启lazy模式.build();
}

通过注解也可以实现

@RabbitListener(queuesToDeclare= @Queue(name="lazy.queue",durable="true",//持久化arguments =@Argument(name="x-queue-mode",value="lazy")))
public void listenLazyQueue(String msg){//消费处理
}

消费者的可靠性问题

消费者确认机制

为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制,当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:
ack:成功处理消息,RabbitMQ从队列中删除该信息
nack:消息处理失败,RabbitMQ需要再次投递信息
reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该信息
那么如何实现呢:
SprinaAMOP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式
有三种方式:
none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活
auto:自动模式。SprinGAMOP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack.当业务出现异常时:
如果是业务异常,则会自动返回nack
如果是消息处理或校验异常,自动返回reject

spring:rabbitmq:listener:simple:prefetch: 1acknowledge-mode: none #none,关闭

消息失败处理

如果消费者返回nack,那就会重复进行,这样大大影响效率
我们可以利用Spring的retry机制,在消费者出现异常的时利用本地重试:

spring:rabbitmq:listener:simple:prefetch: 1retry:enabled: true  #开启消费者失败重试initial-interval: 1000ms #初始的失败等待时间multiplier: 1 #下次失败的等待时长的倍数max-attempts: 3 # 最大重试次数stateless: true # true无状态,false有状态,如果业务中包含事务,这里改为false

在开启重试模式之后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种实现:
RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息,默认这种方式
ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
在这里插入图片描述

将失败消息重新投递到error交换机中,可以绑定error消息队列,将来发送信息给开发人员等操作消息

将失败策略改为RepublishMessageRecoverer:

  1. 首先,定义接收失败消息的交换机,队列绑定
  2. 定义RepublishMessageRecoverer
@Bean
public MessageRecoverer test(RabbitTemplate rabbittemplate){return new RepublishMessageRecoverer(rabbitTemplate,"交换机名称","key值")
}

业务幂等性

幂等是一个数学概念,用函数表达式来描述是这样的:f(x)=f(f(x)),在程序开发中,则指同一个业务,执行一次或多次对业务状态的影响是一致的
幂等的使用场景:防止某一数据被重复进行修改
幂等业务:根据id的查询业务,根据id的删除业务等
非幂等:用户下单,扣减库存等

如何实现幂等:
方案一唯一消息id
给每一个消息都设置一个唯一id,利用id区分是否重复消息:

  1. 每一条消息都生成一个唯一id,与消息一起投递给消费者
  2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息id保存到数据库
  3. 如果下次又收到相同的消息,去数据库查询判断是否存在,存在则为重复消息放弃处理
@Bean
public MessageConverter messageConverter(){//定义消息转换器Jackson2JsonMessageConverter jjmc= new jackson2JsonMessageConverter();//配置自动创建消息id,用于识别不同消息,也可以在业务中基于id判断是否是重复消息jjmc.setCreateMessageIds(true);return jjmc;
}

但是这样会造成额外的操作冗余,比如还需要写数据库等等
方案二:结合业务逻辑,基于业务本身做判断

简答问题

如何保证支付服务与交易服务之间的订单状态一致性:

  1. 首先,支付服务会在正在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步
  2. 其次,为了保证消息的可靠性,我们采取了生产者确认机制,消费者确认,消费者失败重试等策略,确保消息投递和处理的可靠性,同时可开启了MQ的持久化,避免因服务宕机导致消息丢失
  3. 最后,我们还会交易服务更新订单状态时作业业务幂等判断,避免因消息重复导致订单异常

如果交易服务处理失败,还有什么方案:
在交易服务设置定时任务,定期查询订单生产状态,这样即使MQ通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性


http://www.mrgr.cn/p/24047633

相关文章

IDEA 选择 Maven profile 后不生效

参考:Idea select maven profile sometimes not working发现切换 profile 后没有生效。可以进入运行配置,在 Before launch 中加入 compile 目标。

Laravel-设计模式最佳实践(全)

Laravel 设计模式最佳实践(全)原文:zh.annas-archive.org/md5/c21d87a1a56234879b851abfda164e5a 译者:飞龙 协议:CC BY-NC-SA 4.0前言 本书介绍了如何使用 Laravel 4 设计模式开发不同的应用程序并解决重复出现的问题。它将引导您了解广泛使用的设计模式——生成器(管理…

Laravel-入门指南(全)

Laravel 入门指南(全)原文:zh.annas-archive.org/md5/e93ac8af650aa246eabea26267ef0d7b 译者:飞龙 协议:CC BY-NC-SA 4.0第一章:Laravel 入门 欢迎来到 Laravel 入门。本书专门为您提供了开始使用 Laravel Web 开发框架所需的所有信息。您将学习 Laravel 的基础知识,开…

web3风格的网页怎么设计?分享几个,找找感觉。

web3风格的网站是指基于区块链技术和去中心化理念的网站设计风格。这种设计风格强调开放性、透明性和用户自治&#xff0c;体现了Web3的核心价值观。 以下是一些常见的Web3风格网站设计元素&#xff1a; 去中心化标志&#xff1a;在网站的设计中使用去中心化的标志&#xff0…

JAVA自定义日期选择器

下载jar地址&#xff0c; https://toedter.com/jcalendar/ jar包下载地址 依赖包如下图所示&#xff1a; 整个项目代码已经上传到CSDN https://download.csdn.net/download/qq_30273575/89241601?ydrefereraHR0cHM6Ly9tcC5jc2RuLm5ldC9tcF9kb3dubG9hZC9tYW5hZ2UvZG93bmxvYWQ…

探讨宝塔切换php版本切换失败的原因和解决方法

宝塔切换php版本是非常简单的操作,但是有时候切换失败可能会导致系统无法正常工作,给我们带来很大的麻烦和困扰。在本文中,我们将探讨可能导致宝塔切换php版本失败的一些常见原因和解决方法。一、检查宝塔是否已成功安装PHP版本在切换PHP版本之前,请确保您已经正确地安装了…

Qt在任务栏图标和系统托盘图标上显示红点

在任务栏图标上显示红点 关键类&#xff1a;QWinTaskbarButton #include <QWinTaskbarButton>QPointer<QWinTaskbarButton> taskbarBtn nullptr; if (!taskbarBtn) {taskbarBtn new QWinTaskbarButton(window);taskbarBtn->setWindow(window->windowHand…

二叉树相关的三个常见算法题

算法题一// 计算一颗二叉树的所有节点的数量 int BinaryTree_CountNode(Tnode_t *root) {int n1, n2;if (NULL == root){return 0;}n1 = BinaryTree_CountNode(root->lchild);n2 = BinaryTree_CountNode(root->rchild);return n1 + n2 + 1; }算法题二// 计算一颗二叉树的…

多输入多输出 | Matlab实现WOA-LSSVM鲸鱼算法优化最小二乘支持向量机多输入多输出预测

多输入多输出 | Matlab实现WOA-LSSVM鲸鱼算法优化最小二乘支持向量机多输入多输出预测 目录 多输入多输出 | Matlab实现WOA-LSSVM鲸鱼算法优化最小二乘支持向量机多输入多输出预测预测效果基本介绍程序设计往期精彩参考资料 预测效果 基本介绍 Matlab实现WOA-LSSVM鲸鱼算法优化…

物联网D1——建工程,配环境,注意事项

1.STLink、JLink、USB等驱动配置keil环境配置——下载芯片对应型号的包——导入库函数源文件、Core内核文件、对应芯片系统文件。 2.学会看芯片手册 3.在STM32微控制器中&#xff0c;CRH通常指的是控制寄存器高位&#xff08;Control Register High&#xff09;。 在这种情况下…

金汇龙王战神程序智慧管家app拨号精灵下载说明

金汇战神程序App下载,龙王程序app,智慧管家下载安装 厂家售后使用说明及安装教程:金汇战神系金汇科技出品战神程序,无区域限制,高性价比高,调试安装更加快捷方便,安装时间大大缩短。添加微心 ZSMJCC 咨询索取金汇相关App下载链接 手机上安装好金汇战神小精灵app后,连接上…

202209青少年软件编程(Python) 等级考试试卷(一级)

第 1 题 【单选题】 表达式 len(“学史明理增信 , 读史终生受益”) > len(" reading history will benefit you ") 的结果是? ( ) A :0 B :True C :False D :1 正确答案:C 试题解析: 第 2 题 【单选题】 在 turtle 画图中, 常常使用 turtle.color(co…

Prometheus+Grafana多方位监控

PrometheusGrafana多方位监控 契机 ⚙ 最近发现火山引擎有托管的Prometheus,可是当前是邀测阶段。并且发现火山云的ECS是自带开机自启的exporter的。刚好需要搭建一套服务器监控&#xff0c;所以研究了一套Prometheus监控&#xff0c;包含linux主机监控nginx监控es监控rabbitM…

Flink checkpoint 源码分析- Checkpoint barrier 传递源码分析

背景 在上一篇的博客里&#xff0c;大致介绍了flink checkpoint中的触发的大体流程&#xff0c;现在介绍一下触发之后下游的算子是如何做snapshot。 上一篇的文章: Flink checkpoint 源码分析- Flink Checkpoint 触发流程分析-CSDN博客 代码分析 1. 在SubtaskCheckpointCoo…

Go协程的底层原理(图文详解)

为什么要有协程 什么是进程 操作系统“程序”的最小单位进程用来占用内存空间进程相当于厂房&#xff0c;占用工厂空间 什么是线程 进程如果比作厂房&#xff0c;线程就是厂房里面的生产线&#xff1a; 每个进程可以有多个线程线程使用系统分配给进程的内存&#xff0c;线…

【python的魅力】:教你如何用几行代码实现文本语音识别

文章目录 引言一、运行效果二、文本转换为语音2.1 使用pyttsx32.2 使用SAPI实现文本转换语音2.3 使用 SpeechLib实现文本转换语音 三、语音转换为文本3.1 使用 PocketSphinx实现语音转换文本 引言 语音识别技术&#xff0c;也被称为自动语音识别&#xff0c;目标是以电脑自动将…

电脑是组装的好还是原装的好?

组装电脑和原装电脑孰优孰劣一直备受争议。困扰着无数人的问题:组装电脑和原装电脑到底哪个更出色?下面php小编新一将为您详细介绍这两者的优缺点,帮助您做出明智的选择。继续阅读,探索组装电脑的灵活性、升级潜力和性价比,以及原装电脑的稳定性、保修和简便性。电脑是组装…

32.Docker认识

Docker介绍 Docker是一个快速交付应用&#xff0c;运行应用的技术。 1.可以将程序、依赖、运行环境一起打包为一个镜像&#xff0c;可以迁移到任意Linux操作系统。 2.运行时利用沙箱机制行程隔离容器&#xff0c;各个应用互不干扰。 3.启动、移除都可以通过一行命令完成&am…

【深耕 Python】Quantum Computing 量子计算机(1)图像绘制基础

一、绘制静止图像 使用matplotlib库绘制函数图像y sin(pi * x): import math import matplotlib.pyplot as pltx_min -2.0 x_max 2.0N 1000x1 [] y1 []for i in range(N 1):x x_min (x_max - x_min) * i / Ny math.sin(math.pi * x)x1.append(x)y1.append(y)plt.xl…