Flink 广播-broadcast

news/2024/5/16 16:24:13

一.Flink 广播-broadcast介绍

在Apache Flink中,"广播"是一种特殊的数据分发模式,用于将数据从一个并行操作传播到整个作业的所有并行任务中。广播操作对于将少量数据有效地分发给并行任务,以便它们能够共享这些数据而不必进行昂贵的网络通信,是非常有用的。它通常用于将配置信息、静态数据集或参考数据传播给Flink作业中的所有并行任务。

广播的主要优势在于,它避免了将数据通过网络发送到所有并行任务的开销,而是直接将数据复制到每个任务的本地内存中。这样,任务可以直接从本地内存访问数据,无需进行远程通信。这对于那些需要频繁访问同一份数据的场景非常有利,可以显著降低作业的整体延迟。

二.Flink 广播-broadcast使用场景

  1. 配置参数传播: 广播非常适合将配置参数传播到所有并行任务。例如,如果您的Flink作业需要根据特定的配置进行处理,您可以将这些配置参数广播到所有任务中,避免了每个任务单独获取配置的开销。

  2. 规则和字典传递: 在一些数据处理场景中,需要使用一组静态的规则或字典进行数据转换或处理。通过广播这些规则或字典,可以避免每个任务都去加载这些数据,从而减少处理延迟和资源消耗。

  3. 静态数据集共享: 如果您有一个静态数据集(不随时间变化)需要在整个作业的所有任务之间共享,广播可以提供一种高效的方式来共享这些数据,而不需要进行重复传输。

  4. 机器学习模型参数共享: 在机器学习任务中,有时候需要将训练好的模型参数广播给所有任务,以便它们在进行推断或预测时能够共享这些参数,避免重复计算。

  5. 数据分片信息: 在一些任务中,需要根据数据的分片信息来进行特定处理。通过广播数据分片信息,所有任务可以了解到数据的分片情况,从而更有效地进行处理。

  6. 数据缓存: 当需要频繁访问同一份数据时,通过广播将数据复制到每个任务的本地内存中,可以避免重复从磁盘或网络读取数据,从而提高处理性能。

需要注意的是,广播适用于较小的数据集。如果广播的数据集非常大,超过了内存容量,反而可能导致性能问题,因为它会消耗过多的内存资源。

三.Flink 广播-broadcast代码使用

在Flink中,要实现广播操作,需要遵循以下步骤:

  1. 创建要广播的数据集:首先,您需要准备要广播的数据集。通常,这是一个小型的静态数据集,例如配置参数、字典、规则等。这些数据将被广播到整个作业的所有并行任务中。

  2. 广播数据集:一旦您有了要广播的数据集,您需要使用broadcast()方法将其标记为广播数据集。这将会在作业的整个执行过程中将数据复制到所有任务的本地内存中。

  3. 使用广播数据集:在作业的其他部分,您可以通过调用withBroadcastSet()方法将广播数据集传递给算子函数。这样,每个并行任务都可以访问广播数据集,而不必重新复制或通过网络通信。

    package com.fwmagic.flink.batch;import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.concurrent.TimeUnit;/*** Broadcast操作*/
    public class Broadcast {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();env.setRestartStrategy(RestartStrategies.fixedDelayRestart(4,Time.of(10, TimeUnit.SECONDS)));//1、准备要广播的数据ArrayList<Tuple2<String,Integer>> broadData = new ArrayList<>();broadData.add(new Tuple2<>("zs", 23));broadData.add(new Tuple2<>("ls", 34));broadData.add(new Tuple2<>("ww", 45));//1.1 处理需要广播的数据,把数据集转成map,key为name,value为ageDataSet<HashMap<String, Integer>> dateSet = env.fromCollection(broadData).map(new MapFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {@Overridepublic HashMap<String, Integer> map(Tuple2<String, Integer> tuple) throws Exception {HashMap<String,Integer> hashMap = new HashMap<>();hashMap.put(tuple.f0, tuple.f1);return hashMap;}});//源数据DataSource<String> dataSource = env.fromElements("zs", "ls", "ww");//注意:在这里需要使用到RichMapFunction获取广播变量DataSet<String> result = dataSource.map(new RichMapFunction<String, String>() {List<HashMap<String, Integer>> broadcastMap = new ArrayList<>();HashMap<String, Integer> allMap = new HashMap<>();@Overridepublic void open(Configuration parameters) throws Exception {broadcastMap = getRuntimeContext().getBroadcastVariable("mybroadcastdata");for (HashMap<String, Integer> map : broadcastMap) {allMap.putAll(map);}}@Overridepublic String map(String name) throws Exception {Integer age = allMap.get(name);return name + "," + age;}}).withBroadcastSet(dateSet, "mybroadcastdata");result.print();}
    }
    


http://www.mrgr.cn/p/72504611

相关文章

论文笔记--GloVe: Global Vectors for Word Representation

论文笔记--GloVe: Global Vectors for Word Representation 1. 文章简介2. 文章概括3 文章重点技术3.1 两种常用的单词向量训练方法3.2 GloVe3.3 模型的复杂度 4. 文章亮点5. 原文传送门6. References 1. 文章简介 标题&#xff1a;GloVe: Global Vectors for Word Representa…

<MySQL> Centos 7环境安装MySQL

Centos 7环境安装MySQL 1.卸载不要的环境 停止MySQL服务 systemctl stop mariadb.service systemctl stop mysqld禁止MySQL服务开机自启 systemctl disable mysqld卸载MySQL软件包 yum remove mysql-server mysql-client删除MySQL数据目录 rm -rf /var/lib/mysql清理MySQ…

安装了pyintaller后出现:‘pyinstaller‘ 不是内部或外部命令,也不是可运行的程序或批处理文件。

2023年7月31日&#xff0c;周一上午 我昨天晚上也遇到了这个问题&#xff0c;后来解决了 目录 出错原因解决方法怎么找到Scripts文件夹 出错原因 出现这个错误是因为你没给python的Scripts文件夹添加环境变量&#xff0c; Scripts存放着pip安装包时产生的可执行文件。 解决…

CentOS下 Docker、Docker Compose 的安装教程

Docker 是一个开源的应用容器引擎&#xff0c;让开发者可以打包他们的应用以及依赖包到一个可移植的容器中&#xff0c;然后发布到任何流行的 Linux 机器上&#xff0c;也可以实现虚拟化。容器是完全使用沙箱机制&#xff0c;相互之间不会有任何接口。 Docker Compose是用于定义…

如何启用路由器dhcp?快解析如何内网穿透?

一、什么是DHCP&#xff1f; 动态主机设置协议&#xff08;DHCP&#xff09;是一种使网络管理员能够集中管理和自动分配 IP 网络地址的通信协议。在网络中&#xff0c;每个联网设备都需要分配独有的 IP 地址。并当有新计算机移到网络中的其它位置时&#xff0c;能自动收到新的…

百度文心一言接入教程-Java版

原文链接 前言 前段时间由于种种原因我的AI BOT网站停运了数天&#xff0c;后来申请了百度的文心一言和阿里的通义千问开放接口&#xff0c;文心一言的接口很快就通过了&#xff0c;但是文心一言至今杳无音讯。文心一言通过审之后&#xff0c;很快将AI BOT的AI能力接入了文心…

从头学前端-CSS3提升-续

CSS3 2D转换 关键字&#xff1a;transform 移动&#xff1a;沿着x,y轴移动&#xff0c;不会影响盒子的位置&#xff0c;对行内元素没有效果 div {width: 100px;height: 100px;background-color: rebeccapurple;transform: translate(100px,100px);transform: translateX(100p…

TSINGSEE青犀视频安防监控视频平台EasyCVR新增密码复杂度提示

智能视频监控平台TSINGSEE青犀视频EasyCVR可拓展性强、视频能力灵活、部署轻快&#xff0c;可支持的主流标准协议有国标GB28181、RTSP/Onvif、RTMP等&#xff0c;以及支持厂家私有协议与SDK接入&#xff0c;包括海康Ehome、海大宇等设备的SDK等&#xff0c;能对外分发RTSP、RTM…

孩子近视有必要用全光谱灯吗?全光谱led灯推荐

当然&#xff0c;有必要!全光谱LED灯的光源分布更加均匀&#xff0c;使空间更加美观舒适&#xff0c;而普通灯的光源分布可能会在一定范围内分布不均匀。全光谱它的使用寿命长达20-30万小时&#xff0c;而普通灯的使用寿命仅为1000-2000小时&#xff0c;因此在长期使用上&#…

kettle开发-Day40-AI分流之case/switch

前言&#xff1a; 前面我们讲到了很多关于数据流的AI方面的介绍&#xff0c;包括自定义组件和算力提升这块的&#xff0c;今天我们来学习一个关于kettle数据分流处理非常重要的组件Switch / Case 。当我们的数据来源于类似日志、csv文件等半结构化数据时&#xff0c;我们需要在…

React之组件的生命周期

React之组件的生命周期 一、概述二、整体说明三、挂载阶段四、更新阶段五、卸载阶段 一、概述 生命周期:一个事务从创建到最后消亡经历的整个过程组件的生命周期&#xff1a;组件从被创建到挂载到页面中运行&#xff0c;再到组件不用时卸载的过程意义&#xff1a;理解组件的生…

Windows环境下git客户端中的git-bash和MinGW64

我们在 Windows10 操作系统下&#xff0c;安装了 git 客户端之后&#xff0c;可以通过 git-bash.exe 打开一个 shell&#xff1a; 执行一些 linux 系统里的命令&#xff1a; 注意到上图紫色的 MINGW64. Mingw-w64 是原始 mingw.org 项目的改进版&#xff0c;旨在支持 Window…

数据安全

数据的备份与恢复 1. 数据备份技术 任何数据在长期使用过程中&#xff0c;都存在一定的安全隐患。由于认为操作失误或系统故障&#xff0c;例如认为错误、程序出错、计算机失效、灾难和偷窃&#xff0c;经常造成数据丢失&#xff0c;给个人和企业造成灾难性的影响。在这种情况…

公网访问的Linux CentOS本地Web站点搭建指南

文章目录 前言1. 本地搭建web站点2. 测试局域网访问3. 公开本地web网站3.1 安装cpolar内网穿透3.2 创建http隧道&#xff0c;指向本地80端口3.3 配置后台服务 4. 配置固定二级子域名5. 测试使用固定二级子域名访问本地web站点 前言 在web项目中,部署的web站点需要被外部访问,则…

TypeScript -- 函数

文章目录 TypeScript -- 函数JS -- 函数的两种表现形式函数声明函数的表达式es6 箭头函数 TS -- 定义一个函数TS -- 函数声明使用接口(定义)ts 定义参数可选参数写法 -- ?的使用TS函数 -- 设置剩余参数函数重载 TypeScript – 函数 JS – 函数的两种表现形式 我们熟知js有两…

python字典:怎么取出key对应的值

目录 python中的字典是什么 怎么判断key是否在字典中 怎么取出key对应的值 总结 python中的字典是什么 在Python中&#xff0c;字典&#xff08;Dictionary&#xff09;是一种无序且可变的数据类型&#xff0c;用于存储键-值&#xff08;Key-Value&#xff09;对。字典通过…

基于传统检测算法hog+svm实现图像多分类

直接上效果图&#xff1a; 代码仓库和视频演示b站视频005期&#xff1a; 到此一游7758258的个人空间-到此一游7758258个人主页-哔哩哔哩视频 代码展示&#xff1a; 数据集在datasets文件夹下 运行01train.py即可训练 训练结束后会保存模型在本地 运行02pyqt.py会有一个可视化…

Element Drawer 抽屉改变默认宽度和高度

Drawer 抽屉 默认宽度为30%&#xff0c;想要改变宽度只需要使用 :size"你需要设置的值" <el-drawer title"我是标题" :visible.sync"drawer" :size"你需要设置的值"><span>我来啦!</span> </el-drawer> dat…

nginx实战

1.nginx简介 1.1 什么是nginx Nginx 是高性能的 HTTP 和反向代理的web服务器&#xff0c;处理高并发能力是十分强大的&#xff0c;能经受高负 载的考验,有报告表明能支持高达 50,000 个并发连接数。 其特点是占有内存少&#xff0c;并发能力强&#xff0c;事实上nginx的并发…