当前位置: 首页 > news >正文

SpringBoot集成kafka-消费者批量消费消息

SpringBoot集成kafka-消费者批量消费消息

  • 1、消费者
  • 2、生产者
  • 3、application.yml配置文件
  • 4、实体类
  • 5、生产者发送消息测试类
  • 6、测试
    • 6.1、测试启动生产者
    • 6.2、测试启动消费者

在这里插入图片描述

1、消费者

设置批量接收消息

package com.power.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;import java.util.List;@Component
public class EventConsumer {@KafkaListener(topics = {"batchTopic"},groupId="batchGroup")public void onEvent(List<ConsumerRecord<String,String>> records){System.out.println("批量消费:records.size() = "+records.size()+", records = "+records);}}

2、生产者

package com.power.producer;import com.power.model.User;
import com.power.util.JSONUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.Date;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,Object> kafkaTemplate;public void sendEvent(){for (int i = 0; i < 125; i++) {User user = User.builder().id(i).phone("1567676767"+i).birthday(new Date()).build();String userJson = JSONUtils.toJSON(user);kafkaTemplate.send("batchTopic","k"+i, userJson);}}}

3、application.yml配置文件

spring:application:#应用名称name: spring-boot-03-kafka-base#kafka连接地址(ip+port)kafka:bootstrap-servers: <你的kafka服务器IP>:9092#配置消息监听器listener:#设置批量消费消息,默认是单个消息消费(single)type: batch#设置批量消费每次最多取多少条数据consumer:max-poll-records: 20#从第一条消息开始接收auto-offset-reset: earliest

4、实体类

package com.power.model;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.util.Date;@Builder
@AllArgsConstructor
@NoArgsConstructor
@Data
public class User {private Integer id;private String phone;private Date birthday;}

5、生产者发送消息测试类

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot03KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid sendEvent3() {eventProducer.sendEvent();}}

6、测试

6.1、测试启动生产者

在这里插入图片描述

6.2、测试启动消费者

每次接收20条消息

在这里插入图片描述


http://www.mrgr.cn/news/7686.html

相关文章:

  • svn使用教程学习
  • 内网横向移动常用方法
  • 粘包和抓包工具
  • FFmpeg源码:read_packet_wrapper、fill_buffer函数分析
  • 闲鱼IP属地地址:去外地会自动变化吗?解析实时更新机制
  • 河南萌新联赛2024第(六)场:郑州大学
  • 鸿蒙Harmony实战开发知识:“UIAbility组件的3种启动模式”
  • 博弈论总结
  • C# --- 异常处理(Exception Handling)
  • leetcode135:分发糖果
  • Niushop商城第三方插件cps联盟_同城配送_上门预约上手教程配置方法适合单商户和多商户以及V6哈
  • 【安全靶场】-DC-8
  • 学习前端面试知识(13)
  • 探索Ruby的自然语言处理宝库:文本魔法的艺术
  • 稚晖君智元机器人远程机器人系列发布:引领具身智能新高度
  • 数据仓库中的表设计模式:全量表、增量表与拉链表
  • 自编码器(Autoencoder, AE):深入理解与应用
  • BEV学习---LSS-1:论文原理及代码串讲
  • 深入探讨视频美颜SDK:直播美颜工具的核心技术与实现
  • 网络设备net_device数据结构之ifindex