当前位置: 首页 > news >正文

大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例

Flink SQL

Flink SQL 是 Apache Flink 提供的一种高层次的查询语言接口,它基于 SQL 标准,为开发者提供了处理流式数据和批处理数据的能力。Flink SQL 允许用户使用标准 SQL 查询语言在数据流和数据表上执行复杂的操作,适用于多种应用场景,如实时分析、数据流处理、机器学习等。下面是 Flink SQL 的一些重要概念和功能:

流与批统一的查询模式

Flink SQL 的一大特点是流处理和批处理的统一性。通过同一套 SQL 语法,用户可以同时处理静态数据(批处理)和动态数据(流处理)。这使得应用程序的开发更加简化,因为可以用相同的逻辑编写实时流数据处理和历史数据的查询。

动态表 (Dynamic Tables)

Flink SQL 通过动态表的概念将流数据建模为不断变化的表。这种动态表随着时间推移不断更新,数据的每个变化(插入、更新、删除)都会影响表的状态。通过动态表的概念,Flink 可以使用 SQL 查询连续的流数据,并在查询执行时获得不断更新的结果。

窗口操作 (Windowing)

在流式数据处理场景中,窗口操作非常重要。Flink SQL 提供了多种类型的窗口操作,包括:

  • 滚动窗口 (Tumbling Window):将数据按照固定长度分割成不重叠的窗口。
  • 滑动窗口 (Sliding Window):窗口之间存在重叠,数据可能被分配到多个窗口。
  • 会话窗口 (Session Window):窗口由活动间隔定义,不同的事件可能会聚合在一个窗口中。

连接操作 (Joins)

Flink SQL 支持多种连接操作:

  • 流与流的连接:允许用户将多个流结合在一起,基于时间或键进行匹配。
  • 流与表的连接:将静态表与流数据进行匹配,从而使流式数据处理能够结合历史数据或参考数据。
  • 时态表连接 (Temporal Table Join):用于将流数据与一个时态表进行连接,时态表会随着时间不断更新。

内置函数和自定义函数

Flink SQL 提供了丰富的内置函数,涵盖了字符串操作、数学运算、时间日期处理、聚合操作等。此外,Flink SQL 还支持用户自定义函数(UDF、UDTF、UDAF),用户可以根据具体需求扩展 SQL 的功能。

Table API 与 SQL API 的互操作性

Flink 提供了两种高级数据处理 API:

  • Table API:一种与关系代数类似的编程接口,支持链式调用,功能类似于 SQL。
  • SQL API:用户可以直接使用标准 SQL 语句进行数据处理。

Table API 和 SQL API 具有很高的互操作性,用户可以在同一个程序中混合使用这两者。例如,可以先用 Table API 进行表定义和部分操作,再通过 SQL 语句执行复杂的查询。

支持多种数据源和数据接收器

Flink SQL 支持连接多种数据源和数据接收器,如 Kafka、文件系统、数据库(如 MySQL、PostgreSQL)、Hive、HBase 等。通过 SQL 语法,用户可以轻松地将流数据写入这些外部系统,也可以从这些系统中读取数据进行处理。

状态管理与容错机制

Flink SQL 继承了 Flink 强大的状态管理和容错机制。在流处理任务中,Flink SQL 能够有效地处理有状态的计算,并保证在失败时自动恢复。基于 Flink 的检查点(Checkpointing)和保存点(Savepoint)机制,Flink SQL 提供了 Exactly-Once 的状态一致性保障。

实时分析与 ETL

Flink SQL 可以用于实时数据的分析与处理,常用于构建实时 ETL (Extract, Transform, Load) 流程。例如,用户可以通过 SQL 查询对从 Kafka、数据库等数据源接收到的流数据进行清洗、过滤、转换,并将结果写入到其他系统中(如 Elasticsearch、HDFS、JDBC)。

HelloWorld

添加依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table</artifactId><type>pom</type><version>${flink.version}</version>
</dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>${flink.version}</version><scope>provided</scope>
</dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>${flink.version}</version><scope>provided</scope>
</dependency>

依赖说明:

  • flink-table-api-java-bridge_2.12:桥接器,主要负责 TableAPI 和 DataStream/DataSetAPI 的连接支持,按照语言分Java和Scala。
  • flink-table-planner-blink_2.12:计划期,是TableAPI最主要的部分,提供了运行时环境和生成程序执行计划的Planner。
  • 如果是生产环境,则已经有 planner,就只需要有bridge就可以了
  • flink-table:基础依赖

编写代码

package icu.wzk;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.$;public class TableApiDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);DataStreamSource<Tuple2<String, Integer>> data = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {@Overridepublic void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {while (true) {ctx.collect(new Tuple2<>("name", 10));Thread.sleep(1000);}}@Overridepublic void cancel() {}});// =======================// Table 方式Table table = tableEnvironment.fromDataStream(data, $("name"), $("age"));// 对Table的数据查询Table name = table.select($("name"));// 将数据输出到控制台DataStream<Tuple2<Boolean, Row>> result = tableEnvironment.toRetractStream(name, Row.class);result.print();System.out.println("=========================");// =======================// SQL 方式tableEnvironment.createTemporaryView("users",data, $("name"), $("age"));String sql = "select name from users";table = tableEnvironment.sqlQuery(sql);result = tableEnvironment.toRetractStream(table, Row.class);result.print();System.out.println("=========================");env.execute("TableApiDemo");}}

运行代码

控制台会一直不间断的输出如下的内容:

=========================
=========================
1> (true,name)
6> (true,name)
2> (true,name)
7> (true,name)
3> (true,name)
8> (true,name)
4> (true,name)
1> (true,name)
5> (true,name)
2> (true,name)
6> (true,name)
3> (true,name)

控制台的运行结果如下所示:
在这里插入图片描述


http://www.mrgr.cn/news/25157.html

相关文章:

  • 亲测好用,ChatGPT 3.5/4.0新手使用手册~
  • UEFI学习笔记(四):inf、dec和dsc
  • tekton pipeline resources
  • 项目启动 | 盘古信息携手晶捷电子,开启数字化生产管理新纪元
  • GDB的基本使用方法(之一)
  • linux驱动之模块化编程
  • 【数据结构】线性表的定义和基本操作
  • Tita的OKR:公司级 OKR 案例
  • 基于R语言结构方程模型分析与实践技术应用
  • Servlet(三)
  • 将独立的生成式AI系统整合为一个大脑
  • 从Apple Intelligence到IoT Intelligence,端侧生成式AI时代加速到来
  • python绘制3D瀑布图
  • 进程调度相关
  • windows.open使用
  • 低空经济 | 世界经济论坛:先进空中交通白皮书
  • JavaScript在网页设计中的应用案例
  • 图像拼接C++代码记录
  • Sqlserver常用sql
  • Python画笔案例-045 绘制渐变圆盘