当前位置: 首页 > news >正文

flink读写案例合集

文章目录

  • 前言
  • 一、flink 写kafka
    • 1.第一种使用FlinkKafkaProducer API
    • 2.第二种使用自定义序列化器
    • 3.第三种使用FlinkKafkaProducer011 API
    • 4.使用Kafka的Avro序列化 (没有使用过,感觉比较复杂)
    • 5.第五种使用 (强烈推荐使用)
  • 二、Flink读kafka
  • 三、Flink写其他外部系统


前言

提示:这里主要总结在工作中使用到的和遇到到的问题:Java flink版本1.15+

一、flink 写kafka

1.第一种使用FlinkKafkaProducer API

// 假设有一个DataStream<String> named text  DataStream<String> text = env.fromElements("Hello", "World", "Flink", "Kafka");  Properties props = new Properties();  props.put("bootstrap.servers", "localhost:9092");  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");  FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(  "my-topic",                // 目标Kafka topic  new SimpleStringSchema(),  // 序列化schema  props,                     // 生产者配置  FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // 语义保证  // 添加sink  text.addSink(myProducer);

此FlinkKafkaConsumer或FlinkKafkaProducer API 在flink1.15版本后,已经被弃用。推出了新的消费kafka 的 API KafkaSource和KafkaSink。
在这里插入图片描述

2.第二种使用自定义序列化器

import org.apache.flink.api.common.serialization.SerializationSchema;  
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;  public class CustomKafkaSerializationSchema implements KafkaSerializationSchema<JSONObject> {private static final long serialVersionUID = 8497940668660042203L;private String topic;public CustomKafkaSerializationSchema(final String topic) {this.topic = topic;}@Overridepublic ProducerRecord<byte[], byte[]> serialize(final JSONObject element, final Long timestamp) {return new ProducerRecord<byte[], byte[]>(topic, element.toJSONString().getBytes());}
}

在低版本的flink-connector-kafka中,不支持KafkaSerializationSchema

3.第三种使用FlinkKafkaProducer011 API

       // 假设有一个DataStream<String>  DataStream<String> text = env.fromElements("Hello", "World", "Flink", "Kafka");// Kafka 生产者配置  Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 使用 FlinkKafkaProducer011 写入 Kafka  FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>("my-topic",          // 目标 Kafka topic  new SimpleStringSchema(),  // 序列化 schema  props,                     // 生产者配置  FlinkKafkaProducer011.Semantic.EXACTLY_ONCE); // 语义保证  // 添加 sink  text.addSink(myProducer);

使用FlinkKafkaProducer011有问题,由于flink1.15+版本FlinkKafkaProducer


http://www.mrgr.cn/news/9112.html

相关文章:

  • 【KivyMD 应用程序 1.1.1】MDBottomNavigation TabbedPanelBas选项卡底座
  • 【C++ Primer Plus习题】5.8
  • huggingface下载model
  • Go语言操作文件上传和下载应用教程
  • 微服务基础与Spring Cloud框架
  • LlamaIndex 实现 RAG(三)- 向量数据
  • PostgreSQL16.1(Linux版本离线下载)
  • OpenAI 重回巅峰:ChatGPT-4O 最新模型超越谷歌 Gemini 1.5,多项测试夺冠!
  • RongCallKit iOS 端本地私有 pod 方案
  • docker手动部署django项目Dockerfile编排-后端发布
  • HTML静态网页成品作业(HTML+CSS)——家乡漳州介绍设计制作(1个页面)
  • 如何知道当前网卡连接的下位机的IP,通过工具实现
  • python异步编程-channels使用,创建websocket服务
  • 尚品汇-选中状态缓存变更、删除缓存购物车(三十八)
  • 案例分享—医疗行业国外优秀界面设计案例
  • 趣味算法------煤球数目
  • 关闭Chrome快捷键
  • 回归分析系列11—时间序列数据中的回归
  • 海思SD3403/SS928V100开发(16)Tsensor驱动开发
  • 两种Python进行cpu并行运算的方式