当前位置: 首页 > news >正文

2. springboot集成kafka入门使用教程

 项目demo地址 : https://mp.weixin.qq.com/s?__biz=MzkzODQyNzE3

1. 项目结构

 ─src├─main│  ├─java│  │  └─org│  │      └─example│  │          │  KafkaApplication.java│  │          ││  │          └─demo│  │                  KafkaConsumerListener.java (监听消息类)│  ││  └─resources│          application.yml│└─test└─java└─org└─example└─demoKafkaProducerTest.java (发送消息测试类)

2. kafka依赖

<!--kafka-->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>${kafka.version}</version>
</dependency>

3. 消息的发送

3.1 同步发送

同步发送是指发送消息后等待Kafka的响应,确认消息已成功发送。这个方式的优点在于可靠性高,但缺点是会阻塞当前线程,影响系统的响应速度。

    /*** 同步发送*/@Testpublic void synchronizeSend() {Map<String, Object> map = new HashMap<>();map.put("say", "你好, kafka........");ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC, JSONObject.toJSONString(map));try {SendResult<String, String> result = future.get();log.info("获取同步消息结果:{}", result.getRecordMetadata().topic());} catch (Exception e) {throw new RuntimeException(e);}}
3.2 异步发送

异步发送是指发送消息后立即返回,不等待Kafka的响应,而是通过回调函数处理发送结果。这种方式不会阻塞线程,更适合高并发的场景。

@Test
public void asynchronousSend() {Map<String, Object> map = new HashMap<>();map.put("say", "你好, kafka........");kafkaTemplate.send(TOPIC, JSONObject.toJSONString(map)).addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onFailure(Throwable ex) {log.error("发送失败:{}", ex.getMessage());}@Overridepublic void onSuccess(SendResult<String, String> result) {log.info("异步消息结果:{}", result.getRecordMetadata().topic());}});
}
3.3 kafka事务

Kafka事务确保一组消息要么全部成功,要么全部失败,用于实现消息的原子性。适用于需要保证一致性的场景,例如订单处理。

@Test
public void transaction() {kafkaTemplate.executeInTransaction(t -> {t.send(TOPIC, "kafka事务消息...");if (true) {throw new RuntimeException("发生异常kafka回滚事务");}t.send(TOPIC, "你好, kafka........");return true;});
}

注意 :

如果想使用kafka事务需要在配置文件中开启事务

# 开启事务
transaction-id-prefix: tx_

开启事务后, 同步/异步发送消息都需要加@Transactional注解

4. 消息的接收

Kafka消息的接收是通过监听器实现的。监听器会自动接收指定主题的消息,并处理接收到的消息。以下是一个简单的示例:

/*** 监听kafka数据*/
@KafkaListener(topics = {"test-topic"})
public void consumer(ConsumerRecord<?, ?> consumerRecord) {log.info("监听kafka消息>>>>>>>>>>>>>>>>>主题topic={}, 分区offset={}, 信息message={}", consumerRecord.topic(), consumerRecord.offset(), consumerRecord.value());// 收到监听数据后面可以进行入库等业务操作
}

原文地址(阅读体验更佳) : 2. springboot集成kafka入门使用教程 (yuque.com)


http://www.mrgr.cn/news/5669.html

相关文章:

  • 已解决:java.lang.ClassNotFoundException: com.mongodb.test.test 异常的正确解决方法,亲测有效!!!
  • 进阶岛 - MindSearch(CPU版)部署到github codespace
  • 软件测试-测试分类
  • 独立ip为何高级又安全?
  • Scratch与AI:开启少儿编程的智能之旅
  • 编译原理(极速版)
  • 【简历】25届青岛某一本JAVA简历:中厂不要强调算法,面试官听不懂
  • Nginx(项目管理和LINUX)
  • 新手起步:探索AWS新账户的服务器部署能力与限制
  • CSP-J-链表
  • 创客匠人老蒋在「IP私域发售六脉神剑落地班」现场金句频出
  • C语言与Python的区别
  • OW-VISCap——开放世界视频实例分割方法研究
  • ChatGPT快速完成论文大纲创作【附完整示例】
  • springboot调用sap接口传输数据,RFC协议接口调用,包含linux,windows部署
  • 湖北省各市各地两化融合贯标、3A级认定申报奖补补助、申报条件材料、流程指南
  • css设置input单选radio多选checkbox样式
  • 数据结构(6.2_4)——图的基本操作
  • 一文彻底搞懂Transformer - Training(模型训练)
  • 【分布式】简述CAP理论