当前位置: 首页 > news >正文

第十四章 RabbitMQ延迟消息之延迟队列

目录

一、引言

二、死信队列

三、核心代码实现

四、运行效果 

五、总结


一、引言

什么是延迟消息?

发送者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间后收到消息。

什么是延迟任务?

设置在一定时间之后才执行的任务。

延迟消息使用场景

我们在实际项目中经常会有一些场景,需要延迟指定时间后发送消息,比如在电商或者外卖平台中订单10分钟后自动取消功能等。

对于上述延迟消息的场景,我们该怎么实现呢?

RabbitMQ 官方并没有直接内置延迟消息的功能,但是可以通过 TTL(Time-To-Live)和死信队列(Dead Letter Exchanges)的组合来实现延迟消息的效果,另外RabbitMQ 也可以通过安装延迟消息插件的方式来实现。

二、死信队列

电商购物中,针对用户下单扣减库存的服务逻辑,我们希望删除10分钟后状态为未支付的订单。在过去的项目中,我们可能第一时间会想到通过定时任务定期查询未支付的订单并做删除来实现:

定时任务会有两个问题:

1. 当针对订单量特别大的电商项目而言,定时任务间断性地查询整个订单数据会极大增加订单服务的压力。

2. 定时任务存在时间上的滞后性。

通过使用RabbitMQ延迟消息,我们可以在完成需求的同时,有效的避免上述问题。如下图所示,用户通过交易服务下单(状态为未支付),随后交易服务调用商品服务扣减库存。 用户在调用交易服务的同时发送一个延迟消息到RabbitMQ,10分钟后交易服务收到消息,此时如果订单还是未支付状态,则取消订单。

RabbitMQ中的死信队列,就是一种可以实现延迟消息的方式。当一个队列中的消息满足下列情况之一时,就会成为死信(dead letter):

1. 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false

2. 消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费

3. 要投递的队列消息堆积满了,最早的消息可能成为死信

如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)。

三、核心代码实现

package com.example.consumer;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 常规的RabbitMQ 交换机/队列绑定配置类*/
@Configuration
public class RabbitMQConfig {@BeanQueue normalQueue() {// 使用 QueueBuilder 创建一个持久化队列return QueueBuilder.durable("normal.queue").deadLetterExchange("dead.direct").deadLetterRoutingKey("dead").build();}@BeanDirectExchange normalDirect() {return ExchangeBuilder.directExchange("normal.direct").build();}@BeanBinding bindingNormal(Queue normalQueue, DirectExchange normalDirect) {return BindingBuilder.bind(normalQueue).to(normalDirect).with("normal");}@BeanQueue deadQueue() {// 使用 QueueBuilder 创建一个持久化队列return QueueBuilder.durable("dead.queue").build();}@BeanDirectExchange deadDirect() {return ExchangeBuilder.directExchange("dead.direct").build();}@BeanBinding bindingDead(Queue deadQueue, DirectExchange deadDirect) {return BindingBuilder.bind(deadQueue).to(deadDirect).with("dead");}
}
package com.example.publisher;import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;import java.nio.charset.StandardCharsets;/*** 生产者*/
@Slf4j
@SpringBootTest
class PublisherApplicationTests {@Resourceprivate RabbitTemplate rabbitTemplate;@Testvoid test() {String content = "生活不易,所以保持足够的努力,对自己要有信心,积极地去面对工作生活的挑战!";Message message = MessageBuilder.withBody(content.getBytes(StandardCharsets.UTF_8)).setExpiration("10000").build();rabbitTemplate.convertAndSend("normal.direct","normal", message);}
}
package com.example.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.nio.charset.StandardCharsets;/*** 消费者*/
@Slf4j
@Component
public class SimpleListener {@RabbitListener(queues = "dead.queue")public void listener1(Message message) throws Exception {String msg = new String(message.getBody(), StandardCharsets.UTF_8); ;System.out.println("消费者1:人生是个不断攀登的过程【" + msg + "】");}
}

四、运行效果 

五、总结

虽然我们通过RabbitMQ的死信队列能够实现延迟消息的功能,但是通过代码我们可以看到,这种实现方式相对来说比较繁琐。而且关键是RabbitMQ提供死信队列的初衷并不是让我们用来发送延迟消息的,而是为了作为兜底方案,来接收没有消费的死信的,便于定位问题。因此,后续章节会给大家讲解更优的解决方案,即延迟插件。


http://www.mrgr.cn/news/48456.html

相关文章:

  • 详细学习 pandas 和 xlrd:从零开始
  • 实测9款AI文件助手!原来最好用的并不是全网称赞的谷歌NotebookLM...
  • 京准:时间频率(北斗授时设备)助力广电网络
  • Unity的Compute Shader如何进行同步?
  • JVS低代码轻应用是什么?是如何拼装的?这篇文章讲的非常详细
  • Docker 教程一(简介)
  • NVM 切换Node.js版本工具
  • 家政行业怎么运营
  • 【从零开始的LeetCode-算法】3158.求出出现两次数字的 XOR 值
  • 2024骨传导耳机推荐:五款耐用跑步骨传导耳机!贴心附带选购技巧~
  • 2024双11有哪些值得入手的好物?2024年双十一好物推荐
  • 2024年重要的AI内容总结与梳理,多模型思维链及模型,重要文章及AI学术总结
  • 单服务器基于 Nginx 负载均衡 + Docker Compose 提高并发量
  • 界面控件DevExpress中文教程 - 如何拓展具有AI功能的文本编辑器(二)
  • modelsim 软件的使用.
  • 利用FnOS搭建虚拟云桌面,并搭建前端开发环境(二)
  • oracle实例宕机,虚拟机磁盘精简配置模式,磁盘无法扩展
  • Java 函数式编程
  • Python编程学习第一篇——Python零基础快速入门(六)(7)模块
  • 深兰科技|“武汉市AI心理热线医工交叉研发合作基地”正式揭牌