当前位置: 首页 > news >正文

Kafka 实战演练:创建、配置与测试 Kafka全面教程

文章目录

  • 1.配置文件
  • 2.消费者
    • 1.注解方式
    • 2.KafkaConsumer
  • 3.依赖
    • 1.注解依赖
    • 2.KafkaConsumer依赖

本文档只是为了留档方便以后工作运维,或者给同事分享文档内容比较简陋命令也不是特别全,不适合小白观看,如有不懂可以私信,上班期间都是在得

1.配置文件

  1. Yml配置
spring:kafka:bootstrap-servers: consumer:group-id: iot-testaaaaaaaaaa11aaaaaaaaaauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:security:protocol: SASL_SSLsasl:mechanism: PLAINjaas:config: org.apache.kafka.common.security.plain.PlainLoginModule required username="" password="";ssl:truststore:type: JKSlocation: src/main/resources/client.truststore.jkspassword: endpoint.identification.algorithm:

yaml配置

2.消费者

1.注解方式

    @KafkaListener(topics = {"abcd"})public void listen(ConsumerRecord<?, ?> record){Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {Object message = kafkaMessage.get();System.out.println("---->"+record);System.out.println("---->"+message);}}

注解方式

2.KafkaConsumer

/*** @author XHao*/
public class MqsConsumer {public static final String CONFIG_CONSUMER_FILE_NAME = "mqs.sdk.consumer.properties";private KafkaConsumer<Object, Object> consumer;MqsConsumer(String path) {Properties props = new Properties();try {InputStream in = new BufferedInputStream(new FileInputStream(path));props.load(in);} catch (IOException e) {e.printStackTrace();return;}consumer = new KafkaConsumer<Object, Object>(props);}public MqsConsumer() {Properties props = new Properties();try {props = loadFromClasspath(CONFIG_CONSUMER_FILE_NAME);} catch (IOException e) {e.printStackTrace();return;}consumer = new KafkaConsumer<Object, Object>(props);}public void consume(List topics) {consumer.subscribe(topics);}public ConsumerRecords<Object, Object> poll(long timeout) {return consumer.poll(timeout);}public void close() {consumer.close();}/*** get classloader from thread context if no classloader found in thread* context return the classloader which has loaded this class** @return classloader*/public static ClassLoader getCurrentClassLoader() {ClassLoader classLoader = Thread.currentThread().getContextClassLoader();if (classLoader == null) {classLoader = MqsConsumer.class.getClassLoader();}return classLoader;}/*** 从classpath 加载配置信息** @param configFileName 配置文件名称* @return 配置信息* @throws IOException*/public static Properties loadFromClasspath(String configFileName) throws IOException {ClassLoader classLoader = getCurrentClassLoader();Properties config = new Properties();List<URL> properties = new ArrayList<URL>();Enumeration<URL> propertyResources = classLoader.getResources(configFileName);while (propertyResources.hasMoreElements()) {properties.add(propertyResources.nextElement());}for (URL url : properties) {InputStream is = null;try {is = url.openStream();config.load(is);} finally {if (is != null) {is.close();is = null;}}}return config;}
}

在这里插入图片描述

3.依赖

1.注解依赖

      <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

2.KafkaConsumer依赖

        <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>1.1.0</version></dependency>

可视化大屏项目经常用到消息转换,实时状态等 记录一下吧

如果点赞多,评论多会更新详细教程,待补充。


http://www.mrgr.cn/news/25956.html

相关文章:

  • 最好用的 Redis 可视化工具,不愧是官方出品,功能确实强大(带私活源码)
  • element select + tree
  • 跨境电商,一人搞定?自从有了这个AI工具,赚遍全球市场
  • Linux云计算 |【第三阶段】PROJECT1-DAY2
  • “xi” 和 “dbscan” 在OPTICS聚类中是什么意思
  • 提升LLM能力表现的四种AI代理策略
  • JavaScript控制语句和函数的使用
  • 2024年道路运输安全员考试题库及答案
  • SRT3D: A Sparse Region-Based 3D Object Tracking Approach for the Real World
  • C++速通LeetCode第6题-环形链表
  • 无人机建模详解!!!
  • 【机器学习】结构学习的基本概念以及基于约束的结构学习和基于评分的结构学习
  • EE trade:沙金是黄金吗
  • [网络]http/https的简单认识
  • 外贸获客:主动开发有用吗?
  • 零基础5分钟上手亚马逊云科技-为网站服务器配置DNS域名
  • 如何为Google RSA安排广告定制器 [2024]
  • 31. 如何在MyBatis中使用自定义拦截器?有哪些常见应用场景?
  • 显示屏芯片ST7920测试
  • 7-6 列出连通集