flink读写案例合集
文章目录
- 前言
- 一、flink 写kafka
- 1.第一种使用FlinkKafkaProducer API
- 2.第二种使用自定义序列化器
- 3.第三种使用FlinkKafkaProducer011 API
- 4.使用Kafka的Avro序列化 (没有使用过,感觉比较复杂)
- 5.第五种使用 (强烈推荐使用)
- 二、Flink读kafka
- 三、Flink写其他外部系统
前言
提示:这里主要总结在工作中使用到的和遇到到的问题:Java flink版本1.15+
一、flink 写kafka
1.第一种使用FlinkKafkaProducer API
// 假设有一个DataStream<String> named text DataStream<String> text = env.fromElements("Hello", "World", "Flink", "Kafka"); Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>( "my-topic", // 目标Kafka topic new SimpleStringSchema(), // 序列化schema props, // 生产者配置 FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // 语义保证 // 添加sink text.addSink(myProducer);
此FlinkKafkaConsumer或FlinkKafkaProducer API 在flink1.15版本后,已经被弃用。推出了新的消费kafka 的 API KafkaSource和KafkaSink。

2.第二种使用自定义序列化器
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema; public class CustomKafkaSerializationSchema implements KafkaSerializationSchema<JSONObject> {private static final long serialVersionUID = 8497940668660042203L;private String topic;public CustomKafkaSerializationSchema(final String topic) {this.topic = topic;}@Overridepublic ProducerRecord<byte[], byte[]> serialize(final JSONObject element, final Long timestamp) {return new ProducerRecord<byte[], byte[]>(topic, element.toJSONString().getBytes());}
}
在低版本的flink-connector-kafka中,不支持KafkaSerializationSchema
3.第三种使用FlinkKafkaProducer011 API
// 假设有一个DataStream<String> DataStream<String> text = env.fromElements("Hello", "World", "Flink", "Kafka");// Kafka 生产者配置 Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 使用 FlinkKafkaProducer011 写入 Kafka FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>("my-topic", // 目标 Kafka topic new SimpleStringSchema(), // 序列化 schema props, // 生产者配置 FlinkKafkaProducer011.Semantic.EXACTLY_ONCE); // 语义保证 // 添加 sink text.addSink(myProducer);
使用FlinkKafkaProducer011有问题,由于flink1.15+版本FlinkKafkaProducer
