当前位置: 首页 > news >正文

SpringBoot集成kafka-获取生产者发送的消息(阻塞式和非阻塞式获取)

在这里插入图片描述
在这里插入图片描述在这里插入图片描述

说明

CompletableFuture对象需要的SpringBoot版本为3.X.X以上,需要的kafka依赖版本为3.X.X以上,需要的jdk版本17以上。

1、阻塞式(等待式)获取生产者发送的消息

生产者:

package com.power.producer;import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;public void getResult(){//Integer partition, Long timestamp, K key, @Nullable V dataCompletableFuture<SendResult<String, String>> result =kafkaTemplate.sendDefault(0, System.currentTimeMillis(), "k3", "hello-kafka");//怎么拿结果,通过ListenableFuture类拿结果try {//1、阻塞式等待拿结果SendResult<String, String> sendResult = result.get();if(null!=sendResult.getRecordMetadata()){//kafka服务器确认已经拿到了消息System.out.println("消息发送成功:"+sendResult.getRecordMetadata().toString());}System.out.println("producerRecord:"+sendResult.getProducerRecord());} catch (Exception e) {e.printStackTrace();}}
}

测试类:

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid getResult(){eventProducer.getResult();}
}

测试结果:
在这里插入图片描述

消息发送成功:default-topic-0@1
2024-08-22 22:18:51.344  INFO 8976 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-hello-group-1, groupId=hello-group] Adding newly assigned partitions: hello-topic-0
producerRecord:ProducerRecord(topic=default-topic, partition=0, headers=RecordHeaders(headers = [], isReadOnly = true), key=k3, value=hello-kafka, timestamp=1724336330821)

2、非阻塞式(非等待式)获取生产者发送的消息

生产者:

package com.power.producer;import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;public void getResult2(){//Integer partition, Long timestamp, K key, @Nullable V dataCompletableFuture<SendResult<String, String>> result =kafkaTemplate.sendDefault(0, System.currentTimeMillis(), "k3", "hello-kafka");//怎么拿结果,通过CompletableFuture类拿结果try {//2、非阻塞式等待拿结果result.thenAccept((sendResult)->{if(null!=sendResult.getRecordMetadata()){//kafka服务器确认已经拿到了消息System.out.println("消息发送成功:"+sendResult.getRecordMetadata().toString());}System.out.println("producerRecord:"+sendResult.getProducerRecord());}).exceptionally((e)->{e.printStackTrace();//做消息发送失败的处理System.out.println("消息发送失败");return null;});} catch (Exception e) {e.printStackTrace();}}}

测试类:

@Test
void getResult2(){eventProducer.getResult2();
}

http://www.mrgr.cn/news/6217.html

相关文章:

  • Spring 中的AnnotationConfigWebApplicationContext
  • SQLite 创建表
  • 微服务基础知识
  • 微信小程序没有历史记录的跳转
  • 在Mac电脑安装Homebrew并且配置环境变量
  • LangGPT结构化提示词编写实践
  • 【前端面试】浏览器原理解读
  • Dinky教程--Flink CDC pipline整库同步Doris
  • Clickhouse中bitmap使用指南(ck位图)
  • 【QT从入门helloworld到进阶QT(C++)部署YOLOV5】
  • redis 主从复制方案
  • Python酷库之旅-第三方库Pandas(094)
  • Ollama - Llama3 docker版本安装部署使用
  • 物联网(IoT)设备渗透文章二:智能家居中控系统的渗透与利用
  • clickhouse中使用ReplicatedMergeTree表引擎数据去重问题
  • POSIX线程库
  • 详解MBR分区结构以及GPT分区结构
  • 【Vue】生命周期函数
  • 推荐系统三十六式学习笔记:产品篇36 | 组建推荐团队及工程师的学习路径
  • flutter事件与消息通知