Flink 实时数仓(二)【ODS 层开发】

news/2024/5/13 5:51:14

前言

        最近投了不少的实习,也收到不错的反馈,虽然是中小公司偏多,但是毕竟现在这个环境双非进大厂实习可不同当年了。可惜的是学院不放人,无奈啊,遍身罗绮者,不是养蚕人。我累死累活肝了两年了,好不容易找到个不错的实习,可是学院...

        不骂了,没那功夫扯淡。回归主题,实时数仓将来真要是有机会从事的话那真不错,也不枉我当时学Flink的时候花了一整个学期,没暖气没电源的教室跑着三台虚拟机,CPU爆满,每天学俩小时就没电了,但还是花了半个学期学完了。今天开始正式的数仓搭建。

1、ODS 层开发

        在实时数仓这里,当我们把数据采集到 Kafka (topic_log 和 topic_db 主题)的时候,其实 ODS 层的数据存储任务就已经完成了(ODS 层的任务:数据的存储和备份)。接下来我们需要做的就是保持数据的有序:

1.1、Kafka 数据有序

        Kafka 只能保证单分区内有序,并不能保证全局有序。

1.1.1、设置 Kafka 分区默认个数

        这里我们需要设置 Kafka 的分区个数为 4,毕竟实时数仓对数据的吞吐量、并发性能的要求是比较高的,所以我们不能为了数据的有序性而把数据到挤到一个分区中:

vim /opt/module/kafka_2.12-3.4.1/config/server.properties
// 修改配置
num.partition=4

1.1.2、设置 Flink 精准一次

        Flink 程序从 Kafka 消费数据时会启动同属于一个消费者组的四个消费者,Kafka 消费者的默认分区分配策略是 Range + CooperativeSticky,消费者数和分区数相同时,每个消费者消费一个分区的数据。只要单分区数据有序,即可保证 Flink 单个并行度数据有序。

        我们这里的 Kafka 版本是 3.0.0,在 Kafka 1.x 及之后的版本中,保证数据单分区有序,条件如下:

不开启 Kafka 幂等性的情况

max.in.flight.requests.per.connection=1

开启 Kafka 幂等性的情况: 

// 必须小于等于5
max.in.flight.requests.per.connection=5

        在kafka1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据,故无论如何,都可以保证最近5个request的数据都是有序的。

        默认情况下幂等性是开启的,max.in.flight.requests.per.connection 默认值为 5,所以单分区数据默认是有序的,不需要做任何配置。

综上,我们可以保证 Flink 程序单个并行度的数据有序。

1.2、Maxwell 同步历史维度数据

        我们的实时计算不需要考虑历史的事实数据(比如下单、加购),但要考虑历史维度数据,毕竟没有维度只有业务过程是没法进行计算的。

1.2.1、为什么要同步历史维度数据而不同步历史业务数据

        当一个用户进行下单这个业务过程的时候,比如 user_id 为 1001 的用户在 province_id 为 17 的省份下单了 sku_id 为 10 的商品,并使用了 cupon_id 为 1001 的优惠券。

        当这个数据传到我们实时数仓的时候,我们必须知道用户是谁,它买了什么东西,有没有使用优惠券、使用了什么类型的优惠券、这个省份 id 是什么地方、下单方式是什么。这就涉及到了很多维度数据,所以我们必须提前把维度数据准备好,等到数据来的时候直接拿来用而不是才去业务库同步。

        在 ODS 层这里我们只需要原封不动的把维度数据导入到 Kafka ,等到搭建 DIM 层的时候直接从 ODS 层拿,而不是让 DIM 层去业务数据库中同步。

        在我们这个项目中,我们需要通过 Maxwell 同步下面这些维度表到 Kafka 的 

activity_info
activity_rule
activity_sku
base_category1
base_category2
base_category3
base_province
base_region
base_trademark
coupon_info
coupon_range
financial_sku_cost
sku_info
spu_info
user_info

编写同步脚本:

#!/bin/bash# 该脚本的作用是初始化所有的业务数据,只需执行一次MAXWELL_HOME=/opt/module/maxwellimport_data() {for tab in $@do$MAXWELL_HOME/bin/maxwell-bootstrap --database gmall --table ${tab} --config $MAXWELL_HOME/config.propertiesdone
}case $1 in
activity_info | activity_rule | activity_sku | base_category1 | base_category2 | base_category3 | base_dic | base_province | base_region | base_trademark | coupon_info | coupon_range | financial_sku_cost | sku_info | spu_info | user_info)import_data $1 ;;
"all")import_data activity_info activity_rule activity_sku base_category1 base_category2 base_category3 base_dic base_province base_region base_trademark coupon_info coupon_range financial_sku_cost sku_info spu_info user_info;;
esac

思考

        我们之前在学习离线数仓的时候, 使用 Maxwell 和 DataX 来同步业务数据,其中的 Maxwell 在离线数仓中其实并没有什么作用,至于削峰解耦在离线数仓中是根本不用考虑的。而如果不使用 Kafka ,我们可以直接通过 Flume 直接采集到 HDFS。

        在离线数仓中使用 Maxwell 的作用完全是为了现在学习实时数仓时,方便 Flink 来直接从 Kafka 去读取数据。但是 Flume 的数据中包含的 Event Header ,它对于实时数仓来说是完全没有用的,所以我们当时为了妥协实时数仓,就把 Flume 数据中的 Header 给去掉了,但是也就引入了零点漂移的问题,毕竟 Event Header 中保存着 timestamp 信息,而它在经过 Kafka 之后,会被 Kafka 给它添加一个 Header ,Header 中的 timestamp 时间默认为 Kafka 处理的时间,所以我们当时又设置了 Flume 拦截器来把 Header 中的 timestamp 值设置为 body 中的时间戳(因为拦截器只能设置在 Source 和 Channel 之间,所以还需要一个 Flume 再从 Kafka 读出来)。

Flume 拦截器代码

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;public class TimeStampInterceptor implements Interceptor {private ArrayList<Event> events = new ArrayList<>();@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {Map<String, String> headers = event.getHeaders();String log = new String(event.getBody(), StandardCharsets.UTF_8);JSONObject jsonObject = JSONObject.parseObject(log);String ts = jsonObject.getString("ts");headers.put("timestamp", ts);return event;}@Overridepublic List<Event> intercept(List<Event> list) {events.clear();for (Event event : list) {events.add(intercept(event));}return events;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new TimeStampInterceptor();}@Overridepublic void configure(Context context) {}}
}

总结

        实时数仓中 ODS 层的工作很简单,我们只需要用 Maxwell 把业务数据库中的维度表进行实时同步即可。至于服务器里的日志数据我们根本不需要实时处理!一个日志数据没必要实时处理!所以实时数仓中我们也就不需要用 Flume 这个工具。

        所以对于实时数仓的 ODS 层,我们主要用的就是 Maxwell 来同步维度数据,而对于事实数据(比如下单、加购),等到 DWD 层再进行处理。


http://www.mrgr.cn/p/58222266

相关文章

内网渗透-防火墙上线方案

windows防火墙默认规则为入站阻止,出站允许。 场景一:SQL服务器配置了防火墙 在单向防火墙中,攻击机可以使用正向,也可以使用反向上线web服务器,SQL服务器使用反向上线 场景二:WEB服务器配置了防火墙 在此场景中,攻击机需要使用反向来上线WEB服务器,SQL服务器使用正向上…

WEB攻防-.NET特性常见漏洞

目录 前置知识&#xff1a; DLL文件 .NET和DLL文件 C#和DLL文件 关系总结 .NET 配置调试-信息泄露 .NET 源码反编译-DLL 反编译与未授权访问 编译DLL文件 反编译DLL文件 注意事项 案例&#xff1a; 验证代码文件有没有可以绕过&#xff08;Cookie&Session&…

39、BlackRose(VulnHub)

BlackRose 一、nmap二、web渗透 随便看看注册进去 账号:xxxxxx 密码:xxxxxx目录爆破 有很多特殊的目录,不过访问后都重定向了burpsuite改包进admin 查看xxxxxx用户数据包 抓一些xxxxxx用户的一些记录包,看看有什么可用的 signature=&command=id&indexcsrf=3cb58993…

软件测试笔记_习题_面经

软件测试------按测试阶段划分有几个阶段? 单元测试、集成测试、系统测试、验收测试 软件测试------按是否查看源代码划分有几种测试方法? 黑盒、白盒、灰盒 软件测试------按是否运行划分有几种测试方法? 静态测试、动态测试 软件测试------按是否自动化划分有几种测试方…

Elasticsearch集群部署(Linux)

1. 准备环境 这里准备三台Linux虚拟机&#xff0c;用于配置Elasticsearch集群和部署可视化工具Kibana。 角色IP域名集群名称节点名称版本操作系统ES192.168.243.100linux100cluster-eses-node-1007.12.0CentOS 7192.168.243.101linux101cluster-eses-node-101192.168.243.102…

让研发规范管得住 - 我们为什么在流水线之上又做了研发流程?

让研发规范管得住是中大型研发团队的核心诉求,传统的流水线的模式去落地研发规范会存在不少局限。本文将聊聊我们在研发规范落地上的思考、从阿里内部获得的启发以及我们的解决方案。作者:子丑 为什么会有研发规范 很多程序员入职一家新的公司,领完电脑再安装完必备的开发工…

【Linux】基础指令

文章目录 基础指令1. pwd 指令2. cd 指令3. ls 指令4. touch 指令5. mkdir 指令6. rmdir 和 rm 指令7. man 指令8. cp 指令9. mv 指令10. cat 指令11. more 和 less 指令12. head 和 tail 指令13. date 指令14. cal 指令15. find 指令16. grep 指令18. zip 和 unzip 指令19. ta…

基于 Spring Boot 博客系统开发(四)

基于 Spring Boot 博客系统开发&#xff08;四&#xff09; 本系统是简易的个人博客系统开发&#xff0c;为了更加熟练地掌握 SprIng Boot 框架及相关技术的使用。&#x1f33f;&#x1f33f;&#x1f33f; 基于 Spring Boot 博客系统开发&#xff08;三&#xff09;&#x1f…

怎么提高职场辩论的口才能力的方法

提高职场辩论的口才能力是一个综合而复杂的过程&#xff0c;涉及知识积累、技巧学习、实践锻炼等多个方面。以下是关于如何提高职场辩论口才能力的详细分析和建议。 一、引言 在职场中&#xff0c;良好的口才能力对于个人职业发展具有重要意义。优秀的口才不仅能够提升个人的…

php 检测网络连接情况,限制超时时间

使用 dig 命令,并限制超时时间2秒:public function check_connection(){$response = shell_exec(dig +retries=1 +timeout=2 www.sina.com.cn);if (strpos($response, Got answer) !== false) {return 1;}return 0;} 试了 fsockopen 和调用 curl 命令,超时时间设置都不生效。…

解决IDEA中Tomcat控制台乱码问题(包括sout输出乱码)

文章目录 前言一、控制台直接输出乱码二、sout输出内容在控制台显示乱码 前言 今天在使用Tomcat的时候发现控制台输入出现了乱码问题&#xff0c;其实之前就出现过一次&#xff0c;解决了&#xff0c;但是新创建一个项目后又会出现sout的内容在控制台输出的乱码问题&#xff0…

模拟赛日常

赛前:胡了一个离谱的做法,毛估要写6k+时:写了一个完全不知道正确性的东西,还期望能过大样例时:发现样例于与程序输出差1,于是直接给答案加减1时:应用了一个懒得证明的看起来很对的结论后发现假完了:理论复杂度1e9甚至1e10跑进1s时:写了一个很假的甚至过不去大样例的东…

云原生Kubernetes: K8S 1.29版本 部署GitLab

目录 一、实验 1.环境 2.搭建NFS 3.K8S 1.29版本 部署Redis 4.K8S 1.29版本 部署Postgresql 5.K8S 1.29版本 部署GitLab 6.K8S 部署istio微服务 7.K8S 部署ingress应用路由 二、问题 1.K8S部署gitlab报错 2.gitlab创建失败 3.生成网关资源报错 4.安装istio 报错 …

LLM大语言模型(十三):ChatGLM3-6B兼容Langchain的Function Call的一步一步的详细转换过程记录

# LangChain&#xff1a;原始prompt System: Respond to the human as helpfully and accurately as possible. You have access to the following tools: Calculator: Useful for when you need to calculate math problems, args: {\calculation\: {\description\: \calcul…

[Paper Reading] DETR3D: 3D Object Detection from Multi-view Images via 3D-to-2D Queries

名称 DETR3D: 3D Object Detection from Multi-view Images via 3D-to-2D Queries 时间:21.10 机构:mit/CMU/Stanford TL;DR 一种利用Transformer做E2E的3D目标检测方法,在nuScenes自动驾驶数据集上取得很好效果。 Method主要创新点在于2D-to-3D Feature Transforms模块,细…

探索项目管理系统:解析五大功能,洞悉项目成功的关键

项目管理新手往往喜欢埋头苦干,殊不知优秀的项目经理已经熟练运用项目管理系统,让项目规划条理清晰。项目管理系统具备的功能,好用的项目管理系统都有这5大功能。分别是项目WBS分解、项目图表和报表、工时管理、团队协作、任务流程自动化。一、项目WBS分解 1.什么是项目WBS分…

PotatoPie 4.0 实验教程(26) —— FPGA实现摄像头图像拉普拉斯锐化

为什么要对图像进行拉普拉斯锐化 对图像进行拉普拉斯锐化的目的是增强图像的边缘和细节&#xff0c;使图像看起来更加清晰和锐利。这种技术常用于图像处理中&#xff0c;具体原因如下&#xff1a; 增强图像的边缘信息&#xff1a;拉普拉斯锐化可以突出图像中的边缘特征&#x…

适用于Windows和Mac的十大误删除数据恢复软件

数据恢复是从辅助存储或可移动文件中找回丢失、删除或损坏的数据的过程。数据丢失的原因有很多。因此&#xff0c;有必要恢复已删除的数据。有各种可用的软件工具&#xff0c;使用户能够恢复任何类型的已删除数据。但是&#xff0c;任何数据恢复都有四个主要阶段。他们正在修复…

CRM软件功能大揭秘:商业利器的多面功效与应用

一、 什么是CRM软件? CRM软件是指一款可以让企业利用相应的信息技术和互联网技术,协调企业与客户在销售、营销和服务方面的互动,提高其管理模式,为客户提供创新的个性化客户互动和服务的软件。使用CRM软件最终目标是吸引新客户,保留老客户,将现有客户转化为忠实客户,增加…