当前位置: 首页 > news >正文

RocketMQ 与 Spring Cloud Stream之事务消息配置

1 引言

RocketMQ的事务消息设计是为了解决分布式系统中数据一致性的问题。在分布式系统中,由于数据可能分布在不同的服务或节点上,因此需要一种机制来确保数据的最终一致性。事务消息通过引入本地事务和消息状态的关联,确保了消息的发送与本地事务的执行结果紧密相关,从而避免了数据不一致的问题。

2 事务消息步骤

  1. 生产者将半事务消息发送至 RocketMQ Broker。
  2. RocketMQ Broker 将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息暂不能投递,为半事务消息。
  3. 生产者开始执行本地事务逻辑。
  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
    4.1 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
    4.2 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
    5.1 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
    5.2 生产者根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。
  6. 注意:服务端仅仅会按照参数尝试指定次数,超过次数后事务会强制回滚,因此未决事务的回查时效性非常关键,需要按照业务的实际风险来设置。

3 项目结构

本章内容以Spring Cloud Alibaba 快速学习之 RocketMQ中的项目为基础稍作修改。下面列出修改的文件。

3.1 项目rocketmq-producer

在这里插入图片描述

  • application.properties
    这里添加了事务相关的配置
 #spring应用程序监听的端口号
server.port=8080
#spring应用程序的名称
spring.application.name=rocketmq-producer
#spring当前激活的配置文件
spring.profiles.active=dev#rocketmq 服务地址
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876
#rocketmq 通道分组
spring.cloud.stream.bindings.testchannel-out-0.producer.group=test-group
#rocketmq 通道目标
spring.cloud.stream.bindings.testchannel-out-0.destination=test-destination
#rocketmq 通道消息类型
spring.cloud.stream.bindings.testchannel-out-0.content-type=application/json#rocketmq 事务消息配置
spring.cloud.stream.rocketmq.bindings.testchannel-out-0.producer.producerType=Trans
#rocketmq 事务消息分组
spring.cloud.stream.rocketmq.bindings.testchannel-out-0.producer.group=test-group
#rocketmq 事务消息监听
spring.cloud.stream.rocketmq.bindings.testchannel-out-0.producer.transactionListener=RocketMQTransactionListener
  • rocketmq-producer/src/main/java/org/example/controller/TestController.java
 package org.example.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.HashMap;
import java.util.Map;@RestController
@RequestMapping("/test")
public class TestController {@Autowiredprivate StreamBridge streamBridge;@GetMapping("/send")public String send() {for (int i = 0; i < 5; i++) {Map<String, String> map = new HashMap<>();String id = i + "";map.put("id", id);map.put("msg", "测试消息");MessageBuilder<Map<String, String>> builder = MessageBuilder.withPayload(map);streamBridge.send("testchannel-out-0", builder.build());}return "消息发送成功!";}}
  • rocketmq-producer/src/main/java/org/example/conf/RocketMQTransactionListener.java
    这里添加了事务监听器,注意@Component名称与配置文件中对应
package org.example.conf;import com.alibaba.fastjson2.JSONObject;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;@Component("RocketMQTransactionListener")
public class RocketMQTransactionListener implements TransactionListener {@Overridepublic LocalTransactionState executeLocalTransaction(Message message, Object o) {String msg = new String(message.getBody());System.out.println("execute:" + msg);JSONObject jsonObject = JSONObject.parseObject(msg);if (jsonObject.getIntValue("id") == 0) {return LocalTransactionState.COMMIT_MESSAGE;}if (jsonObject.getIntValue("id") == 1) {return LocalTransactionState.UNKNOW;}return LocalTransactionState.ROLLBACK_MESSAGE;}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt messageExt) {System.out.println("check:" + new String(messageExt.getBody()));return LocalTransactionState.COMMIT_MESSAGE;}
}

3.2 项目rocketmq-consumer-b

在这里插入图片描述

  • application.properties
    这里注释了接收广播消息
#spring应用程序监听的端口号
server.port=8082
#spring应用程序的名称
spring.application.name=rocketmq-consumer-b
#spring当前激活的配置文件
spring.profiles.active=dev#rocketmq 服务地址
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876
#rocketmq 接受消息的方法名,须保持与通道名一致
spring.cloud.stream.function.definition=testchannel
# 通道接收广播消息
#spring.cloud.stream.rocketmq.bindings.testchannel-in-0.consumer.messageModel=BROADCASTING#rocketmq 通道分组
spring.cloud.stream.bindings.testchannel-in-0.group=test-group
#rocketmq 通道目标
spring.cloud.stream.bindings.testchannel-in-0.destination=test-destination
#rocketmq 通道消息类型
spring.cloud.stream.bindings.testchannel-in-0.content-type=application/json

3.3 项目rocketmq-consumer-a

在这里插入图片描述

  • application.properties
    这里也是注释了接收广播消息
#spring应用程序监听的端口号
server.port=8081
#spring应用程序的名称
spring.application.name=rocketmq-consumer-b
#spring当前激活的配置文件
spring.profiles.active=dev#rocketmq 服务地址
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876
#rocketmq 接受消息的方法名,须保持与通道名一致
spring.cloud.stream.function.definition=testchannel
# 通道接收广播消息
#spring.cloud.stream.rocketmq.bindings.testchannel-in-0.consumer.messageModel=BROADCASTING#rocketmq 通道分组
spring.cloud.stream.bindings.testchannel-in-0.group=test-group
#rocketmq 通道目标
spring.cloud.stream.bindings.testchannel-in-0.destination=test-destination
#rocketmq 通道消息类型
spring.cloud.stream.bindings.testchannel-in-0.content-type=application/json

4 测试

4.1 同时启动三个子项目

在这里插入图片描述

4.2 发送消息

  • 打开浏览器访问:http://localhost:8080/test/send,可以看到5条消息都进入了executeLocalTransaction方法。

在这里插入图片描述 - 其中消息id为0二次确认结果为Commit,被consumerA正常接收,这与监听器中代码功能一致。
在这里插入图片描述- 其中消息id为1二次确认结果为Unknown,触发回查,在回查中正常Commit,被consumerB正常接收,这与监听器中代码功能一致。
在这里插入图片描述在这里插入图片描述- 其他消息二次确认结果为Rollback,服务端将回滚事务,不会将半事务消息投递给消费者,这与监听器中代码功能一致。

5 完整代码

Gitee代码链接


http://www.mrgr.cn/news/8772.html

相关文章:

  • 【Vue】计算属性和监听属性
  • springdatajpa解决postgresql数据库字段驼峰命名问题
  • C++系列-多态的基本语法
  • repo的patch转换成git am能打的patch
  • 数据结构:(OJ题力扣 20). 有效的括号
  • 怎样写好提示词(Prompt) 一
  • CyberScraper-2077+simple-one-api:使用大模型爬虫
  • Xv6驱动(一):PLIC
  • 51单片机——数码管控制
  • linux驱动:(16)在设备树添加自定义节点
  • 23次8.7(mysql主从脚本与mysql详细语句介绍)
  • Linux 终端显示 Git 当前所在分支
  • RabbitMQ安装 docker
  • 【Redis】Redis 持久化 -- RDB AOF
  • 层次分析法
  • 【设计模式】模板方法模式和迭代器模式
  • 单片机外部中断+定时器实现红外遥控NEC协议解码
  • LEAP模型在能源环境发展、碳排放建模预测及分析中实践应用
  • java操作zookeeper
  • 【话题】关于工厂模式和策略模式