当前位置: 首页 > news >正文

使用kafka改造分布式事务

文章目录

  • 1、kafka确保消息不丢失?
    • 1.1、生产者端确保消息不丢失
    • 1.2、kafka服务端确保消息不丢失
    • 1.3、消费者确保正确无误的消费
  • 2、生产者发送消息 KafkaService
  • 3、UserInfoServiceImpl -> login()
  • 4、service-account - > AccountListener.java

1、kafka确保消息不丢失?

1.1、生产者端确保消息不丢失

  1. 发送模式:发后即忘、同步阻塞确认、异步非阻塞确认
  2. 生产者acks模式:props.put(“acks”, “all”)、acks: all(-1)
  3. 配置重试:props.put(“retries”, 3)、retries: 3

1.2、kafka服务端确保消息不丢失

  1. kafka是文件型的消息中间件,不会单纯的因为服务器宕机导致消息丢失
  2. 消息的log日志文件损坏:搭建kafka集群(副本)

1.3、消费者确保正确无误的消费

  1. 偏移量提交
     自动提交:enable-auto-commit: true
     手动提交:ack-mode: manual_immediate:同步提交 异步提交(推荐)
  2. 偏移量重置:
     auto-offset-reset: earliest -> 如果有偏移量则继续消费,如果偏移量没了,从头重新进行消费,可能会存在幂等性问题
     auto-offset-reset: latest -> 如果有偏移量则继续消费,如果偏移量不存在,只消费新消息,旧消息没消费完就丢掉了
     auto-offset-reset: none -> 如果有偏移量则继续消费,如果偏移量不存在,抛出异常
  3. 消费者重试:重试主题和死信主题, @RetryableTopic()

2、生产者发送消息 KafkaService

package com.atguigu.tingshu.common.service;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;import java.util.concurrent.CompletableFuture;@Service
public class KafkaService {private static final Logger logger = LoggerFactory.getLogger(KafkaService.class);@Autowiredprivate KafkaTemplate kafkaTemplate;/*** 向指定主题发送消息* 此方法通过调用重载的sendMsg方法,向指定主题发送消息,使用默认的消息标签和消息键** @param topic 发送消息的主题* @param msg   需要发送的消息内容*/public void sendMsg(String topic, String msg){// 调用重载的sendMsg方法,传入默认值以简化调用this.sendMsg(topic, null, null, msg);}/*** 发送消息到指定的Kafka主题** @param topic 消息主题* @param partition 分区编号* @param key 消息键值* @param msg 消息内容*/public void sendMsg(String topic, Integer partition, String key, String msg){// 发生消息并返回异步结果CompletableFuture<SendResult> future = this.kafkaTemplate.send(topic, partition, key, msg);// 异步处理发送结果future.whenCompleteAsync((result, ex) -> {if (ex != null){// 如果发送过程中出现异常logger.error("生产者发送消息失败!原因:{}", ex.getMessage());}});}}
  • whenCompleteAsync:异步完成时的处理、当异步操作完成时
    在这里插入图片描述

3、UserInfoServiceImpl -> login()

  • 此时 service-user 是生产者 发送消息

在这里插入图片描述

@Slf4j
@Service
@SuppressWarnings({"unchecked", "rawtypes"})
public class UserInfoServiceImpl extends ServiceImpl<UserInfoMapper, UserInfo> implements UserInfoService {@Autowiredprivate WxMaService wxMaService;@Autowiredprivate RedisTemplate redisTemplate;@Autowiredprivate UserAccountFeignClient userAccountFeignClient;@Autowiredprivate KafkaService kafkaService;/*** 根据微信返回的code进行用户登录* @param code 微信登录凭证* @return 返回包含登录令牌的Map对象*///@GlobalTransactional//@Transactional@Overridepublic Map<String, Object> login(String code) {// 创建一个HashMap对象用于存放返回的数据HashMap<String, Object> map = new HashMap<>();try {// 通过微信服务获取用户的会话信息WxMaJscode2SessionResult sessionInfo = this.wxMaService.getUserService().getSessionInfo(code);// 获取用户的openidString openid = sessionInfo.getOpenid();// 查询数据库中是否存在该openid对应的用户信息UserInfo userInfo = this.getOne(new LambdaQueryWrapper<UserInfo>().eq(UserInfo::getWxOpenId, openid));if (userInfo == null) {// 如果用户不存在,则创建一个新的UserInfo对象userInfo = new UserInfo();// 设置用户的openiduserInfo.setWxOpenId(openid);// 设置用户的昵称,其中包含一个随机生成的IDuserInfo.setNickname("这家伙太懒"+ IdWorker.getIdStr());// 设置用户的头像URLuserInfo.setAvatarUrl("https://img0.baidu.com/it/u=1633409170,3159960019&fm=253&fmt=auto&app=138&f=JPEG?w=500&h=500");// 保存用户信息到数据库this.save(userInfo);// 初始化用户账号信息//userAccountFeignClient.initAccount(userInfo.getId());this.kafkaService.sendMsg(KafkaConstant.QUEUE_USER_REGISTER,userInfo.getId().toString());//int i = 1 / 0;}// 生成一个随机的登录令牌String token = UUID.randomUUID().toString();// 创建一个UserInfoVo对象,用于存放用户信息UserInfoVo userInfoVo = new UserInfoVo();// 将UserInfo对象的属性复制到UserInfoVo对象中BeanUtils.copyProperties(userInfo, userInfoVo);// 将用户信息存储到Redis中,设置过期时间为30分钟this.redisTemplate.opsForValue().set(RedisConstant.USER_LOGIN_KEY_PREFIX + token, userInfoVo,RedisConstant.USER_LOGIN_KEY_TIMEOUT, TimeUnit.SECONDS);// 将生成的登录令牌放入Map对象中map.put("token", token);// 返回包含登录令牌的Map对象return map;} catch (WxErrorException e) {// 如果发生微信错误异常,抛出自定义的异常throw new GuiguException(ResultCodeEnum.LOGIN_AUTH);}}}

在这里插入图片描述

4、service-account - > AccountListener.java

  • 此时 service-account 是消费者 接收消息

在这里插入图片描述

@Slf4j
@Component
public class AccountListener {@Autowiredprivate UserAccountService userAccountService;@RetryableTopic(backoff = @Backoff(2000))@KafkaListener(topics = KafkaConstant.QUEUE_USER_REGISTER)public void listen(String userId, Acknowledgment ack){// 如果是空消息直接确认掉,后续不用再执行if (StringUtils.isBlank(userId)) {ack.acknowledge();return;}// 初始化账户this.userAccountService.saveAccount(Long.valueOf(userId));ack.acknowledge();// 手动确认}
}

在这里插入图片描述


http://www.mrgr.cn/news/10437.html

相关文章:

  • 一文搞定MybatisPlus
  • asio之task_io_service多线程
  • [Linux]如何在虚拟机安装Ubuntu?(小白向)
  • Java获取List实体交集差集
  • docker inspect输出内容详解,推测容器运行命令
  • 【案例59】WebSphere类加载跟踪开启方法
  • 吴恩达谈AI未来:Agentic Workflow、推理成本下降与开源的优势
  • 【Kotlin设计模式】Kotlin实现工厂模式
  • Python-断点续传的方式下载GPM降水数据
  • 企业高性能web服务器知识点合集
  • [指南]微软发布Windows-Linux双系统无法启动的完整修复方案
  • 可变参数模板(C++11)
  • 深度学习设计模式之策略模式
  • 【HTML】DOCTYPE作用
  • 【clickhouse】访问clickhouse数据库,并且插入数据
  • vue3+elementPlus:无法清空问题,清空表单没效果
  • Clearpool 推出 Ozean:专注 RWA 的高性能创新区块链
  • 使用 QML 类型系统注册 C++ 类型
  • Threejs三要素及demo
  • 高标准农田灌区对农业发展的支撑作用