当前位置: 首页 > news >正文

RabbitMq实现延迟队列功能

1、rabbitmq服务端打开延迟插件 (超过 4294967295毫秒 ≈ 1193 小时 ≈ 49.7 天  这个时间会立即触发)

注意:只有RabbitMQ 3.6.x以上才支持

在下载好之后,解压得到.ez结尾的插件包,将其复制到RabbitMQ安装目录下的plugins文件夹。

然后通过命令行启用该插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

该插件在通过上述命令启用后就可以直接使用,不需要重启。

2、添加依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

3、配置交换机和队列

@Configuration
public class DelayedConfig {public static String EXCHANGE_NAME = "delayed_exchange";public static String QUEUE_NAME = "delayed_queue";public static String KEY_NAME = "delayed_key";/*** 基于插件实现的交换机,必须是CustomExchange类型,标识这是一个延时类型的交换机*/@Bean()public CustomExchange delayedExchange(){Map<String,Object> params = new HashMap<>();params.put("x-delayed-type","direct");//参数1:交换机名字,参数2:交换机的类型,参数3:是否持久化,参数4:是否自动删除队列,参数5:交换机的额外参数设置return new CustomExchange(EXCHANGE_NAME,"x-delayed-message",true,false,params);}@Bean()public Queue delayedQueue(){return new Queue(QUEUE_NAME);}@Beanpublic Binding delayedBinding(){return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(KEY_NAME).noargs();}}

 4、发送和接收消息

@GetMapping("/t5")public void t5(){Date date = new Date();System.out.println("发送时间:" + date.toString());//发送消息rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME,DelayedConfig.KEY_NAME,"1延迟消息wxm",msg->{msg.getMessageProperties().setHeader(MessageProperties.X_DELAY, 15552000000L);msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return msg;});//发送消息rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME,DelayedConfig.KEY_NAME,"延迟消息wxm",msg->{msg.getMessageProperties().setDelay(10000);msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return msg;});log.info("发送成功");}@RabbitHandler@RabbitListener(queues = "delayed_queue")public void getDelayed(Message message, Channel channel) throws Exception{Date date = new Date();String rightNow = date.toString();String msg = new String(message.getBody());// 手动应答System.out.println(message.getMessageProperties().getDeliveryTag());channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);System.out.println("接受成功:"+msg+rightNow);}


http://www.mrgr.cn/news/4943.html

相关文章:

  • 继承(下)【C++】
  • redis基本工具类编写
  • 【机器学习】(基础篇五) —— 逻辑回归
  • JAVA_8
  • Selenium + Python 自动化测试18(数据驱动实现测试)
  • 哈希表--字母异位词分组
  • uni-app--》打造个性化壁纸预览应用平台(二)
  • 代码随想录算法训练营第九天| 151.翻转字符串里的单词、卡码网55.右旋转字符串 、28. 找出字符串中第一个匹配项的下标、459. 重复的子字符串
  • 如何使用Pytest进行自动化测试
  • 流苏马兜铃Aristolochia fimbriata参考基因组
  • 世上最简单的安装jenkins
  • 组合模式 详解
  • 力扣: 两数之和 梦开始的地方
  • Markdown与Word中插入图片的方法及比较
  • 为啥https比http慢
  • stm32单片机学习 - 参考手册和数据手册
  • 【vue3|第24期】深入了解useRouter:方法、属性与使用示例
  • Redis做消息队列
  • CentOS7使用Rpm方式离线安装MySQL-5.7
  • sql server 截断日志的操作