当前位置: 首页 > news >正文

消息队列基础概念及选型,常见解决方案包括消息可靠性、消息有序、消息堆积、重复消费、事务消息

前言

是时候总结下消息队列相关知识点啦!我搓搓搓搓

本文包括消息队列基础概念介绍,常见解决方案包括消息可靠性、消息有序、消息堆积、重复消费、事务消息

参考资料:

Kafka常见问题总结 | JavaGuide

RocketMQ常见问题总结 | JavaGuide

【原创】消息队列的消费语义和投递语义 - 孤独烟 - 博客园

Kafka事务是怎么实现的?Kafka事务消息原理详解(文末送书)-CSDN博客

Kafka/RocketMQ事务消息对比 - 简书

1. 什么是消息队列

消息队列可以看作是存放消息的容器,常用于分布式系统中,在消息的生产者和消费者之间引入一个缓冲区

它的作用主要由三点概括,分别是

  1. 解耦:生产者和消费者之间没有直接的调用关系,新增或者修改模块对其它模块的影响较小
  2. 异步:生产者无需等待消息消费完成就即刻返回,减少用户请求的响应时间
  3. 削峰:在高并发场景,消息队列可以缓存消息,平滑高峰流量,防止系统过载

2. 消息队列有什么常见应用场景

日志处理:将日志发送到消息队列,由日志处理系统进行消费,进行实时分析和监控
电商系统:订单的创建、支付、发货等步骤可以由消息队列进行异步处理和解耦
任务调度:任务调度系统可以将任务发布在消息队列中,由多个不同的节点并行处理
数据同步:消息队列可以将变更的数据同步到不同的存储系统中

3. 常见的消息队列如何选型

常见的消息队列有几种:RabbitMQ、Kafka、RocketMQ

延时来看,RabbitMQ是基于Erlang开发的,延时最低,能达到微秒级别,其它都是毫秒级别

吞吐量来看,Kafka和RocketMQ吞吐量最高,达十万、百万级,RabbitMQ只有万级

定制开发难度来看,RocketMQ是由Java开发的,对于大型公司可以有人手进行定制化开发,而能对RabbitMQ进行定制化开发的较少,但是RabbitMQ社区活跃,能够解决开发上的bug

业务场景来看,RocketMQ基本上经受住了大型企业场景的考验比如双十一,Kafka适用于大数据场景实时分析和日志采集等业务

总结,如果是数据量没有那么大,可以选择功能完备、社区活跃的RabbitMQ,如果有大数据量的金融互联网场景,对可靠性和吞吐量要求都很高,推荐选择RocketMQ,如果是日志采集和大数据实时分析的场景,则推荐使用Kafka

4. 消息队列有什么模型

JMS是一种Java消息服务的API规范,它定义了两种消息模型:队列模型和发布/订阅模型

队列模型又叫点对点模型,指的是生产者往队列里发送消息,一个消息只能被一个消费者消费,一个队列的多个消费者之间是竞争关系

发布/订阅模型又叫主题模型,指生产者往Topic发送消息,所有订阅了该Topic的消费者都能消费到

Kafka和RocketMQ就是基于发布/订阅模型实现的

AMQP是另一种消息服务的协议,它定义了几种消息模型,通过引入exchange完成对消息的路由,支持direct exchange、fanout exchange、topic exchange

RabbitMQ就是基于AMQP协议实现的

5. 如何保证消息不丢失

消息不丢失需要从生产消息、存储消息、消费消息三个方面去保证

5.1 生产消息

在生产者侧,需要处理好生产消息的异常处理,如果写入失败需要有重试、告警等机制

5.2 存储消息

在存储消息上,Broker需要在刷盘之后再给生产者应答

在RocketMQ中,可以配置同步刷盘,主从架构下配置同步复制(也叫同步双写)

在Kafka中,默认acks=1,表示消息被leader副本接受后就会返回成功,设置acks=all,那么在所有的ISR(In-Sync Replicas)接受后才算成功,相关的其它参数有:
replication.factor:分区的副本数,可设置为>=3
min.insync.replicas:消息发送成功需要至少写入的副本数,设置为>1
unclean.leader.election.enable:leader副本故障后的选举机制,false表示不从非ISR节点中选leader,设置为false

5.3 消费消息

消费者在真正完成业务逻辑时再返回成功给Broker

在RocketMQ中,完成消费逻辑后再返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,否则返回ConsumeConcurrentlyStatus.RECONSUME_LATER

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;public class RocketMQConsumer {public static void main(String[] args) throws Exception {// 实例化消费者,并指定消费者组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_consumer_group");// 指定NameServer地址consumer.setNamesrvAddr("localhost:9876");// 订阅一个或多个Topic,以及Tag来过滤需要消费的消息consumer.subscribe("TopicTest", "*");// 注册消息监听器consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {try {// 处理消息的业务逻辑System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));} catch (Exception e) {// 处理异常,这里可以根据业务需求进行重试或其他处理e.printStackTrace();// 消费失败,返回RECONSUME_LATER,消息会重新投递return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}// 消费成功,返回CONSUME_SUCCESSreturn ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者实例consumer.start();System.out.printf("Consumer Started.%n");}
}    

在Kafka中,需要设置enable.auto.commit=false,然后在代码中手动提交offset

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;public class KafkaConsumerExample {public static void main(String[] args) {// 配置消费者属性Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test-group");props.put("enable.auto.commit", "false");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 创建Kafka消费者实例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题String topic = "test-topic";consumer.subscribe(Collections.singletonList(topic));try {while (true) {// 拉取消息ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();for (ConsumerRecord<String, String> record : records) {try {// 处理消息的业务逻辑System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());// 记录需要提交的偏移量TopicPartition partition = new TopicPartition(record.topic(), record.partition());OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset() + 1);offsetsToCommit.put(partition, offsetAndMetadata);} catch (Exception e) {// 处理异常,这里可以根据业务需求进行重试或其他处理e.printStackTrace();}}// 手动同步提交偏移量if (!offsetsToCommit.isEmpty()) {consumer.commitSync(offsetsToCommit);}}} finally {// 关闭消费者consumer.close();}}
}    

6. 如何解决重复消费

解决重复消费比较粗鲁的办法就是保证一条消息只能被消费一次,但是如果消费者挂了未提交offset,那么为了保证消息可靠性,消息就会被重复消费

所以重点是要让消费者的处理具有幂等性,即多次处理同一条消息得到的结果是一样的

具体做法有引入全局唯一的ID,对已经消费过的消息进行去重

7. 如何保证消息的有序性

简单粗暴的方法是:单一生产者和单一消费者,即消息只由单个生产者发往单个队列,再由单个消费者消费,存在性能瓶颈

在支持分区键(Partition Key)的系统比如Kafka和RocketMQ中,可以在发送消息的时候指定key

在Kafka中在发送消息时指定key,如果需要严格顺序的话,可以指定max.in.flight.requests.per.connection=1,该参数表示在得到响应前可以发送的消息数,参数值越大吞吐量越大,设置为1会降低吞吐量

在RocketMQ中可以通过继承MessageQueueSelector实现

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;import java.util.List;public class RocketMQPartitionKeyProducer {public static void main(String[] args) throws MQClientException {DefaultMQProducer producer = new DefaultMQProducer("partition_key_producer_group");producer.setNamesrvAddr("localhost:9876");producer.start();String topic = "partition_key_topic";// 模拟不同的 Partition KeyString[] partitionKeys = {"key1", "key2", "key1", "key2"};for (int i = 0; i < partitionKeys.length; i++) {String partitionKey = partitionKeys[i];Message msg = new Message(topic, "TAG", ("Message " + i).getBytes());try {SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {String key = (String) arg;int index = Math.abs(key.hashCode()) % mqs.size();return mqs.get(index);}}, partitionKey);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();}}producer.shutdown();}
}

8. 推拉模式有什么区别,怎么选择

推模式和拉模式的选择一般出现在Broker和消费者之间,而生产者和Broker之间一般使用推模式而不是Broker去拉取消息,否则大量的生产者还需要去维护消息可靠性

推模式的好处是Broker在接收到消息后能够迅速地推送给消费者进行消费,适用于对实时性要求比较高的场景,坏处是消费者消费能力不够强时容易过载,需要Broker去维护消费者的状态去调整推送速率,对Broker要求比较高

拉模式的好处是对Broker没什么要求,可以稳定地控制消息消费的速率,防止消费者过载,并且适合批量地拉取消息,坏处是消息容易延迟,或者是长时间轮询没有消息可消费

RocketMQ和Kafka都使用了拉模式,它们都利用长轮询对拉模式做了优化,也就是在消费者拉取的时候把请求给hold住,然后等待消息到来再把消息发送出去

9. 如何解决消息堆积

消息堆积问题有两个原因,一个是生产太快,一个是消费太慢

生产太快的问题可以对生产端做限流和降级,以及只保留关键消息,丢弃非关键消息,或者延迟处理

消费太慢的问题需要去定位bug,提升消费者的消费能力,包括优化消费的逻辑,增加消费者的线程数量,对消费者进行垂直扩容(增加单个消费者的CPU/内存),或者水平扩容(增加消费者副本数)

10. Kafka的事务消息是什么,怎么实现

Kafka的事务消息保证一系列消息要么完全发送成功,要么完全发送失败,即一系列消息操作的原子性

实现的方式是,Kafka使用事务协调器负责事务的启动、提交和终止,为确保精确一次(exactly once)的投递语义,会为每个Producer维护一个pid,为<pid, topic, partition>维护一个递增的seq从而保证消息按顺序被正确接收,在消费者侧,需要设置read_committed=true保证只消费已提交的消息

具体实践如下

生产者配置acks=all确认消息成功发送、transaction_id设置事务id、enable.idempotence=true确保投递精确一次

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;public class KafkaTransactionalProducer {public static void main(String[] args) {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.ACKS_CONFIG, "all");props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");KafkaProducer<String, String> producer = new KafkaProducer<>(props);producer.initTransactions();try {producer.beginTransaction();ProducerRecord<String, String> record = new ProducerRecord<>("test_topic", "key", "value");producer.send(record);producer.commitTransaction();} catch (Exception e) {producer.abortTransaction();e.printStackTrace();} finally {producer.close();}}
}

消费者配置read_committed=true确保只消费已提交的消息,auto.offset.reset=earliest确保启动时从最早的消息开始消费

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaTransactionalConsumer {public static void main(String[] args) {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("test_topic"));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (var record : records) {System.out.printf("Received message: key = %s, value = %s%n", record.key(), record.value());}}}
}

11. RocketMQ的事务消息是什么

RocketMQ的事务消息和Kafka的不同,Kafka实现的是多条消息的原子性,RocketMQ实现的是分布式事务,也就是消息存储和本地事务处在同一个事务

RocketMQ是利用事务消息+事务回查机制实现分布式事务的

  1. 生产者发送“半消息”到MQ
  2. MQ响应
  3. 生产者执行本地事务
  4. 生产者根据本地事务执行状态向MQ发送commit/rollback,若是commit,半消息变为正式消息,消费者可见,若是rollback,则丢弃该半消息
  5. 如果因为网络抖动等原因,MQ没收到第四步,则通知生产者事务回查
  6. 生产者查询事务状态
  7. 生产者根据回查的事务状态,通知MQ进行commit/rollback

http://www.mrgr.cn/news/97326.html

相关文章:

  • 整车CAN网络和CANoe
  • C# Winform 入门(12)之制作简单的倒计时
  • WEB安全--内网渗透--LMNTLM基础
  • 计算机系统--- BIOS(基本输入输出系统)
  • JCR一区文章,壮丽细尾鹩莺算法Superb Fairy-wren Optimization-附Matlab免费代码
  • iOS APP集成Python解释器
  • 设计模式简述(十三)适配器模式
  • 高频面试题(含笔试高频算法整理)基本总结回顾65
  • Spring 中的 @Autowired 和 @Resource
  • 美国mlb与韩国mlb的关系·棒球9号位
  • 计算机系统---UEFI(统一可扩展固件接口)
  • 开源软件与自由软件:一场理念与实践的交锋
  • Spring 中有哪些设计模式?
  • QT6(9)2.4:用 cmake 构建项目:整体介绍与 cmake 语法,cmake 不支持中文,依据QT帮助为 cmake文件添加模块,ui_dialog.h 头文件的位置有变化,更改与完善代码
  • C# Winform 入门(13)之通过WebServer查询天气预报
  • 定时器的实现方案:红黑树、最小堆与时间轮
  • 自动化备份全网服务器数据平台
  • go简化版面试题
  • 蓝桥杯高频考点——经典01背包问题详解(附例题)
  • Java 常用数据结构详解