当前位置: 首页 > news >正文

SpringBoot集成kafka接收消息

SpringBoot集成kafka接收消息

  • 1、SpringBoot集成kafka接收消息
  • 2、@Payload注解接收消息体内容
  • 3、@Header注解接收消息头内容
  • 4、接收消息所有内容

在这里插入图片描述

1、SpringBoot集成kafka接收消息

生产者

package com.power.producer;import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;public void sendEvent(){kafkaTemplate.send("helloTopic","hello kafka");}}

application.yml配置文件

spring:application:#应用名称name: spring-boot-02-kafka-base#kafka连接地址(ip+port)kafka:bootstrap-servers: 47.116.35.15:9092#配置生产者(24个配置)
#    producer:#配置消费者(24个配置)
#    consumer:

测试类

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot02KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid sendInterceptor(){eventProducer.sendEvent();}}

2、@Payload注解接收消息体内容

消费者:

package com.power.consumer;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;@Component
public class EventConsumer {//采用监听的方式接收事件(消息,数据)@KafkaListener(topics = {"helloTopic"},groupId="helloGroup")public void onEvent(@Payload String event){System.out.println("读取/消费到的事件:"+event);}
}

测试结果:
在这里插入图片描述

3、@Header注解接收消息头内容

注意,不太版本kafak使用@Header注解读取partition时不一样:

  • kafka3.0以下版本使用KafkaHeaders.RECEIVED_PARTITION_ID获取分区
  • kafka3.0以上版本使用KafkaHeaders.RECEIVED_PARTITION获取分区

消费者:

package com.power.consumer;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;@Component
public class EventConsumer {//采用监听的方式接收事件(消息,数据)@KafkaListener(topics = {"helloTopic"},groupId="helloGroup")public void onEvent(@Payload String event,@Header(value=KafkaHeaders.RECEIVED_TOPIC) String topic,@Header(value=KafkaHeaders.RECEIVED_PARTITION_ID) String partition){System.out.println("读取/消费到的事件:"+event+",topic:"+topic+",partition:"+partition);}
}

测试结果:
在这里插入图片描述

4、接收消息所有内容

在这里插入图片描述
在这里插入图片描述

消费者:

package com.power.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;import java.util.function.Consumer;@Component
public class EventConsumer {//采用监听的方式接收事件(消息,数据)@KafkaListener(topics = {"helloTopic"},groupId="helloGroup")public void onEvent(@Payload String event,@Header(value=KafkaHeaders.RECEIVED_TOPIC) String topic,@Header(value=KafkaHeaders.RECEIVED_PARTITION_ID) String partition,ConsumerRecord<String,String> record){System.out.println("读取/消费到的事件:"+event+",topic:"+topic+",partition:"+partition);System.out.println("读取/消费到的事件:"+record.toString());}
}

测试打印所有消息:
在这里插入图片描述


http://www.mrgr.cn/news/8417.html

相关文章:

  • 在网易云音乐服务器故障事件中提升应急处理能力的探讨
  • 分发糖果
  • jom.exe 是一个并行构建工具,专门为使用 Microsoft Visual C++ 编译器的 Qt 项目加速编译过程
  • 【数据结构篇】~二叉树(堆)
  • 使用Python恢复Windows、Linux、MacOS回收站中的文件和目录
  • Windows与Linux的和谐共处时代!
  • 目标 CDC实例数据库更改密码,预定启动报错SQL 错误代码为“-30082”。SQL 状态为:08001。
  • docker配置国内镜像加速
  • Vue项目中根据电脑的分辨率使用了Zoom缩放导致日期控件和下拉框控件位置偏移;
  • 【C语言】内存函数
  • 如何使用ssm实现基于HTML5的出租车管理系统
  • NLP位置编码
  • 基于Springboot2 + vue3酒店客房预订管理系统
  • rust 日志记录与跟踪
  • 游戏开发设计模式之责任链模式
  • Gameplay Ability System(通过表配置不同等级的伤害)
  • Ruby跨平台移动应用开发的新篇章
  • 国货之光|暴雨机推出面向大模型训练的AI服务器
  • 【Node】【2】创建node应用
  • 汇编语句中的 jmp 与 call 指令