基于docker安装flink

news/2024/5/20 21:33:08

文章目录

  • 环境准备
    • Flink
      • docker-compose方式
      • 二进制部署
    • Kafka
    • Mysql
  • Flink 执行 SQL命令
    • 进入SQL客户端CLI
    • 执行SQL查询
      • 表格模式
      • 变更日志模式
      • Tableau模式
      • 窗口计算
    • 窗口计算
      • 滚动窗口demo
      • 滑动窗口
  • 踩坑

环境准备

Flink

docker-compose方式

version: "3"
services:jobmanager:image: flink:latestexpose:- "6123"ports:- "8081:8081"command: jobmanagerenvironment:- JOB_MANAGER_RPC_ADDRESS=jobmanagertaskmanager:image: flink:latestexpose:- "6121"- "6122"depends_on:- jobmanagercommand: taskmanagerlinks:- "jobmanager:jobmanager"environment:- JOB_MANAGER_RPC_ADDRESS=jobmanager

前端访问地址: http://192.168.56.112:8081/#/overview

二进制部署

wget https://archive.apache.org/dist/flink/flink-1.13.3/flink-1.13.3-bin-scala_2.11.tgzvim conf/flink-conf.yamljobmanager.rpc.address: 192.168.56.112 # 修改为本机ip./bin/start-cluster.sh

Kafka

version: '2'
services:zookeeper:image: wurstmeister/zookeeper   ## 镜像ports:- "2181:2181"                 ## 对外暴露的端口号kafka:image: wurstmeister/kafka       ## 镜像volumes:- /etc/localtime:/etc/localtime ## 挂载位置(kafka镜像和宿主机器之间时间保持一直)ports:- "9092:9092"environment:KAFKA_ADVERTISED_HOST_NAME: 192.168.56.112    ## 修改:宿主机IPKAFKA_ZOOKEEPER_CONNECT: 192.168.56.112:2181       ## 卡夫卡运行是基于zookeeper的kafka-manager:image: sheepkiller/kafka-manager                ## 镜像:开源的web管理kafka集群的界面environment:ZK_HOSTS:                    ## 修改:宿主机IPports:- "9000:9000"

Mysql

docker run -d -p3306:3306 --name=mysql57 -e MYSQL_ROOT_PASSWORD=111111 mysql:5.7

在这里插入图片描述

Flink 执行 SQL命令

进入SQL客户端CLI

docker exec  -it flink_jobmanager_1  /bin/bash./bin/sql-client.sh

在这里插入图片描述

执行SQL查询

SELECT 'Hello World';

在这里插入图片描述

表格模式

表格模式(table mode)在内存中物化结果,并将结果用规则的分页表格的形式可视化展示出来。执行如下命令启用:

SET sql-client.execution.result-mode = table;

可以使用如下查询语句查看不同模式的的运行结果:

SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;

在这里插入图片描述

变更日志模式

变更日志模式(changelog mode)不会物化结果。可视化展示由插入(+)和撤销(-)组成的持续查询结果流。

SET sql-client.execution.result-mode = changelog;

在这里插入图片描述

Tableau模式

Tableau模式(tableau mode)更接近传统的数据库,会将执行的结果以制表的形式直接打在屏幕之上。具体显示的内容取决于作业执行模式(execution.type):

SET sql-client.execution.result-mode = tableau;

在这里插入图片描述

注意:当你在流式查询上使用这种模式时,Flink 会将结果持续的打印在当前的控制台上。如果流式查询的输入是有限数据集,那么 Flink 在处理完所有的输入数据之后,作业会自动停止,同时控制台上的打印也会自动停止。如果你想提前结束这个查询,那么可以直接使用 CTRL-C 按键,这个会停止作业同时停止在控制台上的打印。

窗口计算

TUMBLE(time_attr, interval) 定义一个滚动窗口。滚动窗口把行分配到有固定持续时间( interval )的不重叠的连续窗口。比如,5 分钟的滚动窗口以 5 分钟为间隔对行进行分组。滚动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。

窗口计算

TUMBLE(time_attr, interval) 定义一个滚动窗口。滚动窗口把行分配到有固定持续时间( interval )的不重叠的连续窗口。比如,5 分钟的滚动窗口以 5 分钟为间隔对行进行分组。滚动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。

滚动窗口demo

根据订单信息使用kafka作为数据源表,JDBC作为数据结果表统计用户在5秒内的订单数量,并根据窗口的订单id和窗口开启时间作为主键,将结果实时统计到JDBC中:

  1. 在MySQL的flink数据库下创建表order_count,创建语句如下:
CREATE TABLE `flink`.`order_count` (`user_id` VARCHAR(32) NOT NULL,`window_start` TIMESTAMP NOT NULL,`window_end` TIMESTAMP NULL,`total_num` BIGINT UNSIGNED NULL,PRIMARY KEY (`user_id`, `window_start`)
)        ENGINE = InnoDBDEFAULT CHARACTER SET = utf8mb4COLLATE = utf8mb4_general_ci;
  1. 创建flink opensource sql作业,并提交运行作业
CREATE TABLE orders (order_id string,order_channel string,order_time timestamp(3),pay_amount double,real_pay double,pay_time string,user_id string,user_name string,area_id string,watermark for order_time as order_time - INTERVAL '3' SECOND
) WITH ('connector' = 'kafka','topic' = 'order_topic','properties.bootstrap.servers' = '192.168.56.112:9092','properties.group.id' = 'order_group','scan.startup.mode' = 'latest-offset','format' = 'json'
);CREATE TABLE jdbcSink (user_id string,window_start timestamp(3),window_end timestamp(3),total_num BIGINT,primary key (user_id, window_start) not enforced
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://192.168.56.112:3306/flink','table-name' = 'order_count','username' = 'root','password' = '111111','sink.buffer-flush.max-rows' = '1'
);SELECT 'WINDOW',-- window_start,window_end,group_key,record_num,create_time,SUM(record_num) OVER w AS sum_amount
FROM temp
WINDOW w AS (PARTITION BY group_keyORDER BY rowtimeRANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW)select user_id,TUMBLE_START(order_time, INTERVAL '5' SECOND),TUMBLE_END(order_time, INTERVAL '5' SECOND),COUNT(*) from ordersGROUP BY user_id, TUMBLE(order_time, INTERVAL '5' SECOND) having count(*) > 3;SELECT 'WINDOW',user_id,order_id,real_pay,order_timeCOUNT(*) OVER w AS sum_amount
FROM orders
WINDOW w AS (PARTITION BY user_idORDER BY order_timeRANGE BETWEEN INTERVAL '60' SECOND PRECEDING AND CURRENT ROW) insert into jdbcSink select user_id,TUMBLE_START(order_time, INTERVAL '5' SECOND),TUMBLE_END(order_time, INTERVAL '5' SECOND),COUNT(*) from ordersGROUP BY user_id, TUMBLE(order_time, INTERVAL '5' SECOND) having count(*) > 3;
  1. Kafka 相关操作
bin/kafka-topics.sh --zookeeper 192.168.56.112:2181 --listbin/kafka-topics.sh --zookeeper 192.168.56.112:2181 --create --replication-factor 1 --partitions 1 --topic order_topicbin/kafka-console-producer.sh --broker-list 192.168.56.112:9092 --topic order_topicbin/kafka-console-consumer.sh --bootstrap-server 192.168.56.112:9092 --topic order_topic --from-beginningbin/kafka-topics.sh --zookeeper 192.168.56.112:2181 --describe --topic order_topic bin/kafka-topics.sh --zookeeper 192.168.56.112:2181 --delete --topic order_topic 

发送数据样例

{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-09-26 15:20:11", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:28:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:29:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:29:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:29:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:30:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:30:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}

滑动窗口

SELECT * FROM TABLE(HOP(TABLE orders, DESCRIPTOR(order_time), INTERVAL '2' SECOND, INTERVAL '10' SECOND));SELECT * FROM TABLE(HOP(DATA => TABLE orders,TIMECOL => DESCRIPTOR(order_time),SLIDE => INTERVAL '5' MINUTES,SIZE => INTERVAL '10' MINUTES));SELECT window_start, window_end, SUM(pay_amount)FROM TABLE(HOP(TABLE orders, DESCRIPTOR(order_time), INTERVAL '2' SECOND, INTERVAL '10' SECOND))GROUP BY window_start, window_end;

踩坑

  1. Could not find any factory for identifier ‘kafka’ that implements ‘org.apache.flink.table.factories.DynamicTableFactory’ in the classpath.

查看flink version

flink-sql-connector-kafka-1.17.1.jar

https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka/1.17.1

下载对应版本jar,放到lib目录下,重启

  1. Could not find any factory for identifier ‘jdbc’ that implements 'org.apache.flink.table.factories.DynamicTableFactory
    flink-connector-jdbc-3.1.0-1.17.jar
    https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar

  2. Caused by: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver

https://mvnrepository.com/artifact/com.mysql/mysql-connector-j/8.0.31


http://www.mrgr.cn/p/57352737

相关文章

2022年windows的Visual Studio常用插件及使用手册

前景提要Viusual Studio 是一款很好用的C/C++集成开发工具,具有强大的扩展功能,好用的插件,但是,很多人都是只写了有什么插件,但是,没写怎么使用这种插件,使得使用的时候很是不方便,所以,笔者最近本着自己的学习,在这里写下自己关于好用的插件的研究,希望对您的学习/工作有帮助…

JDK源码阅读-------自学笔记(二十六)(java.util.Map 自定义讲解)

一、简介Map就是用来存储“键(key)-值(value)”对的. 通过键寻找value,所以键不能重复. 数组的本质也是一种键值对,区别就是索引一般是数字,而Map的Key可以是任意对象(字符串,数字),相当于把数组的索引范围扩的更大,使用更方便. 实际开发中较为常用.二、Map的常用方法实例(1)存…

华普检测温湿度监测系统建设方案

一、项目背景 随着医疗行业的蓬勃发展,药品、试剂和血液的储存安全直接关系到患者的健康。根据《药品存储管理规范》、《医疗器械冷链(运输、贮存)管理指南》、《疫苗储存和运输管理规范》和《血液存储要求》等相关法规,医院药剂…

Oracle修改字段长度及属性

首发微信公众号:SQL数据库运维 原文链接:https://mp.weixin.qq.com/s?__biz=MzI1NTQyNzg3MQ==&mid=2247486117&idx=1&sn=02e2cd05e5db7eaa5758c70e81cf3972&chksm=ea375ed5dd40d7c367727562bdb00788f3bd139cbbda377f599586a47ce13ad9d04c56fd4d2d&token…

初识C语言——第十六天

C语言中的语句结构类型:顺序/选择/循环 分支语句 if else switch 循环语句 while for do whlie goto语句 代码练习:找两个整数的最大公约数和最小公倍数 #define _CRT_SECURE_NO_WARNINGS #include <stdio.h>//int main() //{ // int age 60; // if (ag…

let命令

let 命令 let 与 var 二者区别&#xff1a; 作用域不同&#xff1a;变量提升&#xff08;Hoisting&#xff09;&#xff1a;临时性死区重复声明&#xff1a; 联系&#xff1a;举例说明&#xff1a; 块级作用域 块级作用域的关键字使用 var&#xff08;无块级作用域&#xff09;…

【数学建模】天然肠衣搭配问题衍生问题/线性规划限制条件建立问题

线性规划限制条件建立问题 前景回顾/提出问题回顾1回顾2/问题提出解决前提 解决方法坐标轴(区间)法总结 前景回顾/提出问题 回顾1 首先回顾一下DVD在线租赁问题 在 question2中&#xff0c;需要保证每个人都不会收到自己不喜欢的DVD&#xff0c;即客户在线订单数为0时候&…

C#/.NET/.NET Core优秀项目和框架2024年4月简报

前言 公众号每月定期推广和分享的C#/.NET/.NET Core优秀项目和框架(每周至少会推荐两个优秀的项目和框架当然节假日除外),公众号推文中有项目和框架的介绍、功能特点、使用方式以及部分功能截图等(打不开或者打开GitHub很慢的同学可以优先查看公众号推文,文末一定会附带项…

c#word文档:3.向Word文档中插入表格/4.读取Word文档中表格

--向Word文档中插入表格-- &#xff08;1&#xff09;在OfficeOperator项目的WordOperator类中定义向Word文档插入换页的函数NewPage &#xff08;2&#xff09;在WordOperator类中定义向Word文档插入表格的函数InsertTable using Microsoft.Office.Interop.Word;// 引入Mic…

30分钟彻底了解Flutter整个渲染流程(超详细)

30分钟彻底了解Flutter整个渲染流程[超详细] 从运行第一行代码出发WidgetsFlutterBinding初始化了一堆娃 三个中流砥柱SchedulerBindingRendererBindingWidgetsBinding 申请Vsync流程下发Vsync承接Vsync 从运行第一行代码出发 void main() {runApp(const MyApp()); }void runA…

linux中进程相关概念(一)

什么是程序&#xff0c;什么是进程&#xff0c;有什么区别&#xff1f; 程序是静态的概念&#xff0c;当我们使用gcc xxx.c -o pro进行编译时&#xff0c;产生的pro文件&#xff0c;就是一个程序。 进程是程序的一次运行活动&#xff0c;通俗点就是说程序跑起来了就是进程。 …

C++反汇编,指针和内存分配细节,面试题05

文章目录 20. 指针 vs 引用21. new vs malloc 20. 指针 vs 引用 指针是实体&#xff0c;占用内存空间&#xff0c;逻辑上独立&#xff1b;引用是别名&#xff0c;与变量共享内存空间&#xff0c;逻辑上不独立。指针定义时可以不初始化&#xff1b;引用定义时必须初始化。指针的…

Vue自定义封装音频播放组件(带拖拽进度条)

Vue自定义封装音频播放组件&#xff08;带拖拽进度条&#xff09; 描述 该款自定义组件可作为音频、视频播放的进度条&#xff0c;用于控制音频、视频的播放进度、暂停开始、拖拽进度条拓展性极高。 实现效果 具体效果可以根据自定义内容进行位置调整 项目需求 有播放暂停…

localhost 重定向次数过多

在完成javaweb作业时出现了错误初始页面只有两个功能, 但是无论是点击登录还是注册,都会跳转到login.jsp页面从网上找到的答案是代码陷入死循环,因为总是跳转到login.jsp, 所以我查看了所有servlet类中跳转到login.jsp页面的代码,逻辑上并没有问题;然后我又查看了过滤器以…

Windows平台使用CMake+MinGW64编译OpenCV

Windows平台使用CMake+MinGW64编译OpenCV (注:2年前写的笔记, 可能有些地方过时了) 目录Windows平台使用CMake+MinGW64编译OpenCV1.安装及配置环境1.1 MinGW-w641.2 CMake1.3 OpenCV源码2.CMake配置及生成2.1 新建目录2.2 CMake-GUI2.3 编译配置2.4 生成2.5 Make编译和安装3.配…

【大模型赋能开发者】海云安入选数世咨询LLM驱动数字安全2024——AI安全系列报告

近日&#xff0c;国内知名数字产业领域第三方调研咨询机构数世咨询发布了LLM驱动数字安全2024——AI安全系列报告。报告通过调研、公开信息收集等方式对目前十余家已具备LLM相关的应用能力安全厂商对比分析出了这一领域当前的产业现状并进行了各厂商的能力展示。 海云安凭借近…

金融业开源软件应用 评估规范

金融业开源软件应用 评估规范 1 范围 本文件规定了金融机构在应用开源软件时的评估要求&#xff0c;对开源软件的引入、维护和退出提出了实现 要求、评估方法和判定准则。 本文件适用于金融机构对应用的开源软件进行评估。 2 规范性引用文件 下列文件中的内容通过文中的规范…

介绍适用于 Node.js 的 Elastic OpenTelemetry 发行版

作者&#xff1a;来自 Elastic Trent Mick 我们很高兴地宣布推出 Elastic OpenTelemetry Distribution for Node.js 的 alpha 版本。 该发行版是 OpenTelemetry Node.js SDK 的轻量级包装&#xff0c;可以让你更轻松地开始使用 OpenTelemetry 来观察 Node.js 应用程序。 背景 …

x64dbg中类似于*.exe+地址偏移

在CE和xdb中&#xff0c;形如*.exe数字偏移形式的地址被称为模块地址&#xff0c;CE附加到进程后点击查看内存&#xff0c;显示如下图 这种地址学名叫做模块地址&#xff0c;在x64dbg中显示如下图&#xff1a; CE中可以关闭&#xff0c;从而显示绝对的虚拟地址&#xff0c;如下…

时间复杂度空间复杂度 力扣:转轮数组,消失的数字

1. 算法效率 如何衡量一个算法的好坏&#xff1f;一般是从时间和空间的维度来讨论复杂度&#xff0c;但是现在由于计算机行业发展迅速&#xff0c;所以现在并不怎么在乎空间复杂度了下面例子中&#xff0c;斐波那契看上去很简洁&#xff0c;但是复杂度未必如此 long long Fib…