当前位置: 首页 > news >正文

kafka快速上手

一、kafka介绍

Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息传递系统。

kafka官网:http://kafka.apach e.org/

 

二、kafka入门

  • 生产者发送消息,多个消费者只能有一个消费者接收到消息
  • 生产者发送消息,多个消费者都可以接收到消息

步骤如下: 

(1)创建kafka-demo项目,导入依赖 

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId></dependency>

(2)生产者发送消息

public class ProducerQuickStart {public static void main(String[] args) throws ExecutionException, InterruptedException {//1.kafka链接配置信息Properties prop = new Properties();//kafka链接地址prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//key和value的序列化prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//2.创建kafka生产者对象KafkaProducer<String,String> producer = new KafkaProducer<String,String>(prop);//3.发送消息/*** 第一个参数 :topic* 第二个参数:消息的key* 第三个参数:消息的value*/ProducerRecord<String,String> kvProducerRecord = new ProducerRecord<String,String>("topic-first","key-001","hello kafka");//同步发送消息producer.send(kvProducerRecord);//4.关闭消息通道  必须要关闭,否则消息发送不成功producer.close();}
}

  (3)消费者接收消息

public class ConsumerQuickStart {public static void main(String[] args) {//1.kafka的配置信息Properties prop = new Properties();//链接地址prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");//key和value的反序列化器prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//设置消费者组prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");//2.创建消费者对象KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);//3.订阅主题consumer.subscribe(Collections.singletonList("topic-first"));//4.拉取消息while (true) {// 读取数据,读取超时时间为100ms ,即每个1000ms拉取一次ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord.key());System.out.println(consumerRecord.value());}}}
}

使用情景:

生产者发送消息,多个消费者订阅同一个主题(多个消费者都是一个组)只能有一个消费者收到消息 (一对一)

生产者发送消息,多个消费者订阅同一个主题(多个消费者不是一个组)所有消费者都能收到消息 (一对多) 

三、springboot集成kafka

1.导入spring-kafka依赖信息

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- kafkfa --><!-- Spring Kafka的依赖,排除了kafka-clients --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><!-- 显式声明kafka-clients的依赖 --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId></dependency>
</dependencies>

2.在resources下创建文件application.yml

server:port: 9991
spring:application:name: kafka-demokafka:bootstrap-servers: localhost:9092producer:retries: 10 #重试的次数key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: ${spring.application.name}-testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3.消息生产者

@RestController
public class HelloController {@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;@GetMapping("/hello")public String hello(){kafkaTemplate.send("test-topic","HelloWorld");
//        User user=new User();
//        user.setUsername("lili");
//        user.setAge(18);
//        kafkaTemplate.send("test-topic", JSON.toJSONString(user));return "ok";}
}

4.消息消费者

@Component
public class HelloListener {@KafkaListener(topics = "test-topic")public void onMessage(String message){if (!StringUtils.isEmpty(message)){System.out.println(message);
//            User user = JSON.parseObject(message, User.class);
//            System.out.println(user);}}
}

传递消息为对象:

目前springboot整合后的kafka,因为序列化器是StringSerializer,可以把要传递的对象进行转json字符串,接收消息后再转为对象即可


http://www.mrgr.cn/news/18064.html

相关文章:

  • NFTScan | 08.26~09.02 NFT 市场热点汇总
  • 第十三节:学习Springboot整合mybatis——完整篇(自学Spring boot 3.x的第三天)
  • 人工智能在现代科技中的应用和未来发展趋势。
  • 如何找回误删的文件?4个常用文件恢复方法!
  • P2147 [SDOI2008] 洞穴勘测(LCT)
  • Harbor部署docker私人仓库
  • springboot篇
  • 【原创】edge-tts与基于mpv的edge-playback,使命令行和Python的Text To Speech唾手可得
  • stm32f103VET6和stm32f103C8T6有什么区别?
  • 半导体是什么?
  • C# 7个方法比较两个对象是否相等
  • 【Spring Boot-IDEA创建spring boot项目方法】
  • 学懂C++(五十):深入详解 C++ 陷阱:对象切片(Object Slicing)
  • 用 BigQuery ML 和 Google Sheets 数据预测电商网站访客趋势
  • 8个考完PMP后的发展方向,建议收藏
  • python脚本处理---(不同文件夹中的文件对比、移动,提取指定类型文件、中文文件名转英文)
  • 无代码搭建网站zion
  • QSlider禁止点击 和精准点击跳转
  • FFmpeg源码:avpriv_set_pts_info函数分析
  • 快速了解开源RAG-UI工具“kotaemon”