文章目录
- 1、kafka确保消息不丢失?
- 1.1、生产者端确保消息不丢失
- 1.2、kafka服务端确保消息不丢失
- 1.3、消费者确保正确无误的消费
- 2、生产者发送消息 KafkaService
- 3、UserInfoServiceImpl -> login()
- 4、service-account - > AccountListener.java
1、kafka确保消息不丢失?
1.1、生产者端确保消息不丢失
- 发送模式:发后即忘、同步阻塞确认、异步非阻塞确认
- 生产者acks模式:props.put(“acks”, “all”)、acks: all(-1)
- 配置重试:props.put(“retries”, 3)、retries: 3
1.2、kafka服务端确保消息不丢失
- kafka是文件型的消息中间件,不会单纯的因为服务器宕机导致消息丢失
- 消息的log日志文件损坏:搭建kafka集群(副本)
1.3、消费者确保正确无误的消费
- 偏移量提交
自动提交:enable-auto-commit: true
手动提交:ack-mode: manual_immediate:同步提交 异步提交(推荐) - 偏移量重置:
auto-offset-reset: earliest -> 如果有偏移量则继续消费,如果偏移量没了,从头重新进行消费,可能会存在幂等性问题
auto-offset-reset: latest -> 如果有偏移量则继续消费,如果偏移量不存在,只消费新消息,旧消息没消费完就丢掉了
auto-offset-reset: none -> 如果有偏移量则继续消费,如果偏移量不存在,抛出异常 - 消费者重试:重试主题和死信主题, @RetryableTopic()
2、生产者发送消息 KafkaService
package com.atguigu.tingshu.common.service;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;import java.util.concurrent.CompletableFuture;@Service
public class KafkaService {private static final Logger logger = LoggerFactory.getLogger(KafkaService.class);@Autowiredprivate KafkaTemplate kafkaTemplate;public void sendMsg(String topic, String msg){this.sendMsg(topic, null, null, msg);}public void sendMsg(String topic, Integer partition, String key, String msg){CompletableFuture<SendResult> future = this.kafkaTemplate.send(topic, partition, key, msg);future.whenCompleteAsync((result, ex) -> {if (ex != null){logger.error("生产者发送消息失败!原因:{}", ex.getMessage());}});}}
whenCompleteAsync:异步完成时的处理、当异步操作完成时

3、UserInfoServiceImpl -> login()
此时 service-user 是生产者 发送消息

@Slf4j
@Service
@SuppressWarnings({"unchecked", "rawtypes"})
public class UserInfoServiceImpl extends ServiceImpl<UserInfoMapper, UserInfo> implements UserInfoService {@Autowiredprivate WxMaService wxMaService;@Autowiredprivate RedisTemplate redisTemplate;@Autowiredprivate UserAccountFeignClient userAccountFeignClient;@Autowiredprivate KafkaService kafkaService;@Overridepublic Map<String, Object> login(String code) {HashMap<String, Object> map = new HashMap<>();try {WxMaJscode2SessionResult sessionInfo = this.wxMaService.getUserService().getSessionInfo(code);String openid = sessionInfo.getOpenid();UserInfo userInfo = this.getOne(new LambdaQueryWrapper<UserInfo>().eq(UserInfo::getWxOpenId, openid));if (userInfo == null) {userInfo = new UserInfo();userInfo.setWxOpenId(openid);userInfo.setNickname("这家伙太懒"+ IdWorker.getIdStr());userInfo.setAvatarUrl("https://img0.baidu.com/it/u=1633409170,3159960019&fm=253&fmt=auto&app=138&f=JPEG?w=500&h=500");this.save(userInfo);this.kafkaService.sendMsg(KafkaConstant.QUEUE_USER_REGISTER,userInfo.getId().toString());}String token = UUID.randomUUID().toString();UserInfoVo userInfoVo = new UserInfoVo();BeanUtils.copyProperties(userInfo, userInfoVo);this.redisTemplate.opsForValue().set(RedisConstant.USER_LOGIN_KEY_PREFIX + token, userInfoVo,RedisConstant.USER_LOGIN_KEY_TIMEOUT, TimeUnit.SECONDS);map.put("token", token);return map;} catch (WxErrorException e) {throw new GuiguException(ResultCodeEnum.LOGIN_AUTH);}}}

4、service-account - > AccountListener.java
此时 service-account 是消费者 接收消息

@Slf4j
@Component
public class AccountListener {@Autowiredprivate UserAccountService userAccountService;@RetryableTopic(backoff = @Backoff(2000))@KafkaListener(topics = KafkaConstant.QUEUE_USER_REGISTER)public void listen(String userId, Acknowledgment ack){if (StringUtils.isBlank(userId)) {ack.acknowledge();return;}this.userAccountService.saveAccount(Long.valueOf(userId));ack.acknowledge();}
}
