当前位置: 首页 > news >正文

SpringBoot集成kafka接收对象消息

SpringBoot集成kafka接收对象消息

  • 1、生产者
  • 2、消费者
  • 3、工具类
  • 4、消息实体对象
  • 5、配置文件
  • 6、启动类
  • 7、测试类
  • 8、测试结果

在这里插入图片描述

1、生产者

package com.power.producer;import com.power.model.User;
import com.power.util.JSONUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.Date;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,Object> kafkaTemplate;public void sendEvent2(){User user = User.builder().id(10001).phone("15676767676").birthday(new Date()).build();String userJson = JSONUtils.toJSON(user);kafkaTemplate.send("helloTopic",userJson);}}

2、消费者

package com.power.consumer;import com.power.model.User;
import com.power.util.JSONUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;import java.util.function.Consumer;@Component
public class EventConsumer {//采用监听的方式接收事件(消息,数据)@KafkaListener(topics = {"helloTopic"},groupId="helloGroup")public void onEvent(String userJson,@Header(value=KafkaHeaders.RECEIVED_TOPIC) String topic,@Header(value=KafkaHeaders.RECEIVED_PARTITION_ID) String partition,ConsumerRecord<String,String> record){User user =JSONUtils.toBean(userJson,User.class);System.out.println("读取/消费到的事件,user:"+user+",topic:"+topic+",partition:"+partition);System.out.println("读取/消费到的事件:"+record.toString());}}

3、工具类

package com.power.util;import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;public class JSONUtils {private static final ObjectMapper OBJECTMAPPER = new ObjectMapper();public static String toJSON(Object object){try {return OBJECTMAPPER.writeValueAsString(object);} catch (JsonProcessingException e) {throw new RuntimeException(e);}}public static <T> T toBean(String json,Class<T> clazz){try {return OBJECTMAPPER.readValue(json,clazz);} catch (JsonProcessingException e) {throw new RuntimeException(e);}}
}

4、消息实体对象

package com.power.model;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.util.Date;@Builder
@AllArgsConstructor
@NoArgsConstructor
@Data
public class User {private Integer id;private String phone;private Date birthday;}

5、配置文件

spring:application:#应用名称name: spring-boot-02-kafka-base#kafka连接地址(ip+port)kafka:bootstrap-servers: <你的kafka服务器IP>:9092

6、启动类

package com.power;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class KafkaApplication {public static void main(String[] args) {SpringApplication.run(KafkaApplication.class, args);System.out.println("启动成功--------------------------");}
}

7、测试类

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot02KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid sendEvent2(){eventProducer.sendEvent2();}}

8、测试结果

先启动消费者
在启动生产者测试类
已接收到消息对象数据:

在这里插入图片描述


http://www.mrgr.cn/news/14487.html

相关文章:

  • 全程云OA UploadEditorFile接口存在任意文件上传漏洞 附POC
  • 【蓝桥杯青少组】第十五届省赛python(2024)
  • 排序题目:颜色分类
  • Android Webview 详解
  • 如何利用命令模式实现一个手游后端架构|命令模式|手游后端|架构设计
  • 最新国内Docker 安装
  • Linux系统ubuntu20.04 无人机PX4 开发环境搭建(失败率很低)
  • nginx访问控制,用户认证,https
  • 苹果9月10将招开发布会:iPhone 16搭配AI将颠覆你的数码生活
  • OpenAI remove key access while using AAD authentication
  • 并行程序设计基础——MPI不连续数据发送(2)
  • 软件设计模式 - 汇总
  • KESSIL A360维修
  • 计算机基础知识复习8.29
  • 设计模式相关
  • Spring八股文
  • 手机三网状态实时查询分享
  • git 如何在切换分支时不丢弃本地修改
  • Android活动(activity)与服务(service)进行通信
  • 计算机毕业设计选题推荐-游戏比赛网上售票系统-Java/Python项目实战