当前位置: 首页 > news >正文

基于RabbitMQ的消息监听器

1. 背景

机构的新增、更新、删除在微服务A中已经完成了(微服务A已经部署,不能修改代码),如果在微服务A中对机构进行新增、更新、删除操作后,需要同步到自己的微服务B中,这里采用MQ消息通知的方式实现。

微服务A中配置如下:

消息发往的交换机为:itcast-auth,交换机的类型为:topic

发送消息的规则如下:

● 消息为json字符串○ 如:{"type":"ORG","content":[{"managerId":"1","parentId":"0","name":"测试组织","id":"973902113476182273","status":true}],"operation":"UPDATE"}
● type表示变更的对象,比如组织:ORG 
● content为更改对象列表
● operation类型列表○ 新增-ADD○ 修改-UPDATE○ 删除-DEL

2. 消息监听器

/*** 对于微服务A消息的处理*/
@Slf4j
@Component
public class AuthMQListener {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = Constants.MQ.Queues.AUTH_TRANSPORT),exchange = @Exchange(name = "${rabbitmq.exchange}", type = ExchangeTypes.TOPIC),key = "#"))public void listenAgencyMsg(String msg) {//{"type":"ORG","operation":"ADD","content":[{"id":"977263044792942657","name":"55","parentId":"0","managerId":null,"status":true}]}log.info("接收到消息 -> {}", msg);JSONObject jsonObject = JSONUtil.parseObj(msg);String type = jsonObject.getStr("type");if (!StrUtil.equalsIgnoreCase(type, "ORG")) {//非机构消息return;}String operation = jsonObject.getStr("operation");JSONObject content = (JSONObject) jsonObject.getJSONArray("content").getObj(0);String name = content.getStr("name");Long parentId = content.getLong("parentId");// 。。。消息处理。。。}}

2.1 标记监听器

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = Constants.MQ.Queues.AUTH_TRANSPORT),exchange = @Exchange(name = "${rabbitmq.exchange}", type = ExchangeTypes.TOPIC),key = "#"
))
public void listenAgencyMsg(String msg) {
  • @RabbitListener:标记该方法为RabbitMQ的消息监听器。它会监听指定的队列并处理收到的消息。
  • @QueueBinding:将队列与交换机绑定。
    • @Queue(name = Constants.MQ.Queues.AUTH_TRANSPORT):指定要监听的队列的名称,这里在常量类里定义了。
    • @Exchange(name = "${rabbitmq.exchange}", type = ExchangeTypes.TOPIC):指定交换机的名称和类型(Topic),这里在常量类里定义了,与微服务A中配置相同。
    • key = "#":路由键,这里#表示匹配所有路由键。
  • listenAgencyMsg(String msg):当消息队列接收到消息时,会调用这个方法,并将消息内容传递进来。

2.2 消息解析

log.info("接收到消息 -> {}", msg);
JSONObject jsonObject = JSONUtil.parseObj(msg);
String type = jsonObject.getStr("type");
if (!StrUtil.equalsIgnoreCase(type, "ORG")) {//非机构消息return;
}
String operation = jsonObject.getStr("operation");
JSONObject content = (JSONObject) jsonObject.getJSONArray("content").getObj(0);
String name = content.getStr("name");
Long parentId = content.getLong("parentId");
  • log.info("接收到消息 -> {}", msg):记录接收到的消息日志。
  • JSONObject jsonObject = JSONUtil.parseObj(msg):将消息字符串解析为JSON对象。
  • String type = jsonObject.getStr("type"):从消息中提取type字段。
  • if (!StrUtil.equalsIgnoreCase(type, "ORG")) { return; }:判断消息类型是否为“ORG”,如果不是,直接返回不做处理。
  • 提取operation字段:操作类型(如ADD、UPDATE、DEL)。
  • 提取content内容:content字段是一个数组,这里取第一个对象。
  • 提取name字段:表示机构的名称。
  • 提取parentId字段:表示父机构的ID。

3. RabbitMQ介绍

RabbitMQ是一种广泛使用的消息队列(Message Queue)系统,它基于AMQP(Advanced Message Queuing Protocol)协议,用于在不同的系统或组件之间传递消息。通过消息队列,系统可以实现解耦、异步处理、负载均衡等特性,从而提高系统的可扩展性和可靠性。

3.1 RabbitMQ的核心概念

  1. 生产者(Producer)

    • 生产者是消息的发送方。它负责将消息发送到RabbitMQ的交换机中。
  2. 消费者(Consumer)

    • 消费者是消息的接收方。它从RabbitMQ的队列中获取并处理消息。
  3. 队列(Queue)

    • 队列是RabbitMQ内部存储消息的地方。消息从生产者发送到队列中,消费者从队列中获取消息。队列类似于一个消息的存储池。
  4. 交换机(Exchange)

    • 交换机负责接收生产者发送的消息,并根据一定的路由规则将消息路由到一个或多个队列。交换机有不同的类型,常见的有:
      • Direct Exchange:直接交换机,根据消息的路由键精确匹配队列。
      • Fanout Exchange:扇出交换机,不考虑路由键,直接将消息广播到所有绑定的队列中。
      • Topic Exchange:主题交换机,根据路由键的模式匹配(使用通配符)将消息路由到一个或多个队列。
      • Headers Exchange:头交换机,通过消息的头部属性来路由消息。
  5. 路由键(Routing Key)

    • 路由键是生产者在将消息发送到交换机时指定的一个字符串。交换机会根据这个字符串决定将消息路由到哪个队列。
  6. 绑定(Binding)

    • 绑定是交换机和队列之间的连接关系。通过绑定,可以将交换机和队列关联起来,并通过路由键决定消息的流向。

3.2 消息的生命周期

  1. 生产者发送消息

    • 生产者将消息发送到交换机,并指定一个路由键。
  2. 交换机路由消息

    • 交换机根据路由键和绑定规则,将消息路由到一个或多个队列中。
  3. 消费者接收消息

    • 消费者从队列中取出消息并进行处理。处理完成后,消费者可以向RabbitMQ发送一个确认消息(ACK),告知RabbitMQ该消息已成功处理。
  4. 消息确认与重试

    • 如果消费者处理消息失败,可以选择不发送确认消息,RabbitMQ会将消息重新放回队列,等待其他消费者处理,或进行重试。

3.3 RabbitMQ的常见使用场景

  1. 解耦

    • 在复杂系统中,各个组件之间可能有很强的依赖性。通过消息队列,生产者和消费者可以实现解耦,生产者只需将消息发送到队列,不需要关心谁会处理这些消息。
  2. 异步处理

    • 有些任务可能是耗时操作,例如生成报告、图片处理等。通过消息队列,系统可以将这些耗时操作异步处理,不会阻塞主流程。
  3. 负载均衡

    • RabbitMQ可以将消息分发给多个消费者,从而实现负载均衡。即使流量高峰期,消息处理也不会成为系统瓶颈。
  4. 消息广播

    • 通过Fanout Exchange,可以实现消息广播,将同一消息同时发送给多个队列,让多个系统或服务同时收到消息并处理。

4. 总结

这段代码的主要作用是通过监听RabbitMQ消息队列,处理微服务A中与机构相关的消息。在微服务B中通过解析消息内容,动态确定消息的类型和需要执行的操作,并调用相应的服务处理该消息。这种设计可以有效地处理异步消息,并将业务逻辑与消息队列解耦。


http://www.mrgr.cn/news/1039.html

相关文章:

  • Xshell中弹出“ssh服务器拒绝了密码请再试一次”时,如何解决
  • 目标检测之数据增强
  • 腾讯云 AI 代码助手四大基础功能介绍
  • K8S 探针
  • 归并排序算法及优化(java)
  • 如何保证数据不丢失?(死信队列)
  • API安全
  • Postman接口Mock Servier服务器
  • 用Python实现生信分析——序列搜索和比对工具详解
  • 每天一个数据分析题(四百八十七)- 非监督学习
  • 提升独立站搜索引擎排名:关键词研究策略
  • 华为od(D卷) 环中最长子串/字符成环找偶数LOX
  • 《AI视频类工具之七——​ 有言》
  • Linux中防火墙实战之Web服务器和ssh远程服务配置指南
  • json 库的下载与使用
  • 第10章 无持久存储的文件系统 (3)
  • 如何一键删除iPhone相册所有照片
  • Java中等题-丑数(力扣)
  • leetcode322. 零钱兑换,完全背包最值问题,附背包问题模板
  • DNSmasq多个高危漏洞修复_CentOS Linux release 7.9.2009 升级 DNSmasq 到2.89