头歌:SparkSQL简单使用

news/2024/5/17 12:28:03

第1关:SparkSQL初识
 

任务描述


本关任务:编写一个sparksql基础程序。

相关知识


为了完成本关任务,你需要掌握:1. 什么是SparkSQL 2. 什么是SparkSession。  

什么是SparkSQL


Spark SQL是用来操作结构化和半结构化数据的接口。
当每条存储记录共用已知的字段集合,数据符合此条件时,Spark SQL就会使得针对这些数据的读取和查询变得更加简单高效。具体来说,Spark SQL提供了以下三大功能:
(1) Spark SQL可以从各种结构化数据源(例如JSON、Parquet等)中读取数据。

(2) Spark SQL不仅支持在Spark程序内使用SQL语句进行数据查询,也支持从类似商业智能软件Tableau这样的外部工具中通过标准数据库连接器(JDBC/ODBC)连接sparkSQL进行查询。

(3) 当在Spark程序内使用Spark SQL时,Spark SQL支持SQL与常规的Python/Java/Scala代码高度整合,包括连接RDD与SQL表、公开的自定义SQL函数接口等。

什么是SparkSession


Spark中所有功能的入口点都是SparkSession类。要创建基本的SparkSession,只需使用SparkSession.builder()。

import  org.apache.spark.sql.SparkSession;
SparkSession  spark  =  SparkSession .builder().appName("Java Spark SQL基本示例").master("local").config("spark.some.config.option" , "some-value").getOrCreate();//打印spark版本号System.out.println(spark.version());


编程要求


请仔细阅读右侧代码,根据方法内的提示,在Begin - End区域内进行代码补充,具体任务如下:

打印spark的版本号。
测试说明
补充完代码后,点击测评,平台会对你编写的代码进行测试,当你的结果与预期输出一致时,即为通过。

package com.educoder.bigData.sparksql;import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;public class Test1 {public static void main(String[] args) throws AnalysisException {/********* Begin *********/SparkSession  spark  =  SparkSession   .builder()  .appName("Java Spark SQL基本示例")  .master("local")  .config("spark.some.config.option" , "some-value")  .getOrCreate();  //打印spark版本号  System.out.println(spark.version());  /********* End *********/}}

第2关:Dataset创建及使用
 

任务描述


本关任务:创建Dataset并使用

相关知识


为了完成本关任务,你需要掌握:

什么是Dataset;
Dataset如何创建 ;
Dataset如何操作数据。
什么是Dataset
在Spark2.0版本以后,DataFrame API将会和Dataset  API合并,统一数据处理API。故实训中的Dateset和DataFrame可看成一个概念。
Dataset和RDD一样,也是Spark的一种弹性分布式数据集,它是一个由列组成的数据集,概念上等同于关系型数据库中的一张表,但在底层具有更丰富的优化。Dataset可以从多种来源构建,例如:结构化数据文件,Hive中的表,外部数据库或现有RDD。有人肯定会问,已经有了弹性分布式数据集RDD,为什么还要引入Dataset呢?因为在Spark中,我们可以像在关系型数据库中使用SQL操作数据库表一样,使用Spark SQL操作Dataset。这让熟悉关系型数据库SQL人员也能轻松掌握。

上图直观地体现了 Dataset 和 RDD 的区别。左侧的 RDD[Person] 虽然以 Person 为类型参数,但 Spark 框架本身不了解 Person 类的内部结构。而右侧的 Dataset 却提供了详细的结构信息,使得 Spark SQL 可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。Dataset 除了提供了比 RDD 更丰富的算子以外,更重要的特点能提升执行效率、减少数据读取以及执行计划的优化等。

Dataset 上可用的操作分为转换和操作。转换是产生新 Dataset 的转换,动作是触发计算和返回结果的转换。示例转换包括map,filter,select和aggregate(groupBy)。示例操作将数据计数,显示或写入文件系统。

Dataset 是“懒惰的”,即只有在调用动作时才会触发计算。在内部,Dataset 表示描述生成数据所需计算的逻辑计划。调用操作时,Spark 的查询优化器会优化逻辑计划,并以并行和分布式方式生成有效执行的物理计划。要探索逻辑计划以及优化的物理计划,请使用explain函数。

要有效地支持特定于域的对象,需要使用编码器。编码器将域特定类型T映射到 Spark 的内部类型系统。例如,给定一个具有两个字段的  Person,name(string)和age(int),编码器用于告诉 Spark 在运行时生成代码以将 Person 对象序列化为二进制结构。该二进制结构通常具有低得多的存储器占用面积以及针对数据处理(例如,柱状格式)的效率进行优化。要了解数据的内部二进制表示,请使用模式函数。

Dataset如何创建
通常有两种方法来创建Dataset。最常见的方法是使用SparkSession上提供的读取功能将Spark指向存储系统上的某些文件。

//创建泛型的Dataset
Dataset<Row> df = spark.read().json("people.json");
//创建Person类型的DatasetDataset<Person> people = spark.read().json("people.json").as(Encoders.bean(Person.class));
//以表格形式显示前20行Dataset
df.show();
people.show();


也可以通过现有数据集上的转换来创建Dataset。 例如,以下内容通过对现有数据集应用过滤器来创建新Dataset:

   Dataset<String> names = people.map((Person p) -> p.name, Encoders.STRING));


Dataset如何操作数据
Dataset操作数据有两种方式:API方式处理数据和以编程方式处理数据。

API方式处理数据


Dataset操作也可以通过以下定义的各种特定于域的语言(DSL)函数进行无类型操作:Dataset(类),列和函数。 这些操作与R或Python中的数据框抽象中可用的操作非常相似。
要从数据集中选择列,请在在Java中使用col 。

  

 Column ageCol = people.col("age");


请注意,Column类型也可以通过其各种功能进行操作。

import static org.apache.spark.sql.functions.col;
以树格式
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// 仅选择“名称”列
df.select("name").show();
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+
// 选择所有人,但将年龄增加1 
df.select(col("name"), col("age").plus(1)).show();
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+
// 选择年龄超过21 
df.filter(col("age").gt(21)).show();
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+
// 计数按年龄的人
df.groupBy("age").count().show();
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+


以编程方式处理数据
SparkSession支持让应用程序以编程方式运行SQL查询并返回结果。

//读取json,并将Dataset,并注册为SQL临时视图
sparkSession.read().json("people.json").createOrReplaceTempView("people");
//以表格形式显示前20行Dataset
sparkSession.sql("select * from people").show();
// + ---- + ------- + 
// | 年龄| 名称| 
// + ---- + ------- + 
// | null | Michael | 
// | 30 | 安迪| 
// | 19 | 贾斯汀| 
// + ---- + ------- +


编程要求


根据提示,在右侧编辑器补充代码,读取people.json文件,过滤age为23的数据,并以表格形式显示前20行Dataset。

people.json文件内容如下:

{"age":21,"name":"张三", "salary":"3000"}
{"age":22,"name":"李四", "salary":"4500"}
{"age":23,"name":"王五", "salary":"7500"}

package com.educoder.bigData.sparksql;import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.SparkSession;public class Test2 {public static void main(String[] args) throws AnalysisException {SparkSession  spark  =  SparkSession .builder().appName("test1").master("local").config("spark.some.config.option" , "some-value").getOrCreate();/********* Begin *********/spark.read().json("people.json").createOrReplaceTempView("people");  spark.sql("select * from people where age != '23'").show();/********* End *********/}}

第3关:Dataset自定义函数

 

任务描述


本关任务:编写Dataset自定义函数。

相关知识


为了完成本关任务,你需要掌握:

UserDefinedAggregateFunction介绍;
如何使用。
UserDefinedAggregateFunction
UserDefinedAggregateFunction是实现用户定义的聚合函数基础类,用户实现自定义无类型聚合函数必须扩展UserDefinedAggregateFunction 抽象类,相关方法如下:

方法及方法返回    描述


StructType bufferSchema()    StructType表示聚合缓冲区中值的数据类型。
DataType dataType()    UserDefinedAggregateFunction的返回值的数据类型
boolean deterministic()    如果此函数是确定性的,则返回true
Object evaluate(Row buffer)    根据给定的聚合缓冲区计算此UserDefinedAggregateFunction的最终结果
void initialize(MutableAggregationBuffer buffer)    初始化给定的聚合缓冲区
StructType inputSchema()    StructType表示此聚合函数的输入参数的数据类型。
void merge(MutableAggregationBuffer buffer1, Row buffer2)    合并两个聚合缓冲区并将更新的缓冲区值存储回buffer1
void update(MutableAggregationBuffer buffer, Row input)    使用来自输入的新输入数据更新给定的聚合缓冲区
如何使用
我们以计算员工薪水平均值的例子来说:
首先在用户自定义函数的构造函数中,定义聚合函数的输入参数的数据类型和聚合缓冲区中值的数据类型。

//定义员工薪水的输入参数类型为LongType
List<StructField> inputFields = new ArrayList<StructField>();
inputFields.add(DataTypes.createStructField("inputColumn", DataTypes.LongType, true));
inputSchema = DataTypes.createStructType(inputFields);
//定义员工薪水总数、员工个数的参数类型
List<StructField> bufferFields = new ArrayList<StructField>();
bufferFields.add(DataTypes.createStructField("sum", DataTypes.LongType, true));
bufferFields.add(DataTypes.createStructField("count", DataTypes.LongType, true));
bufferSchema = DataTypes.createStructType(bufferFields);


对聚合缓冲区中值设置初始值。

@Overridepublic void initialize(MutableAggregationBuffer buffer) {// TODO Auto-generated method stubbuffer.update(0, 0L);buffer.update(1, 0L);}


把自定义函数的输入薪水数据转化为定义的聚合缓冲区的值(薪水总数、员工个数),并更新。

@Override
public void update(MutableAggregationBuffer buffer, Row input) {if (!input.isNullAt(0)) {long updatedSum = buffer.getLong(0) + input.getLong(0);long updatedCount = buffer.getLong(1) + 1;buffer.update(0, updatedSum);buffer.update(1, updatedCount);}
}


把多个聚合缓冲区的值进行合并。

@Override
public void merge(MutableAggregationBuffer buffer1, Row buffer2) {// TODO Auto-generated method stublong mergedSum = buffer1.getLong(0) + buffer2.getLong(0);long mergedCount = buffer1.getLong(1) + buffer2.getLong(1);buffer1.update(0, mergedSum);buffer1.update(1, mergedCount);
}


最后通过聚合缓冲区的值计算输出结果。

@Override
public Object evaluate(Row buffer) {// TODO Auto-generated method stubreturn ((double) buffer.getLong(0)) / buffer.getLong(1);
}


就此自定义函数就开发完了,通过SparkSession的udf()方法会返回注册用户定义函数的方法集合UDFRegistration
通过UDFRegistration调用register方法进行自定义函数注册,使用如下:

// 注册自定义函数myAverage
spark.udf().register("myAverage", new MyAverage());
//读取json文件
spark.read().json("people.json").createOrReplaceTempView("people");
//使用自定义函数计算薪水平均值
spark.sql("SELECT myAverage(salary) as average_salary FROM people").show();
// +--------------+
// |average_salary|
// +--------------+
// |        5000|
// +--------------+


编程要求


请仔细阅读右侧代码,根据方法内的提示,在Begin - End区域内进行代码补充,编写自定义函数类MyAverage,用来计算用户薪水平均值,平台已提供了最后的实现:

spark.udf().register("myAverage", new MyAverage());
spark.read().json("people.json").createOrReplaceTempView("people");
spark.sql("SELECT myAverage(salary) as average_salary FROM people").show();

测试说明


补充完代码后,点击测评,平台会对你编写的代码进行测试,当你的结果与预期输出一致时,即为通过。

package com.educoder.bigData.sparksql;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
public class MyAverage extends UserDefinedAggregateFunction {
private static final long serialVersionUID = 1L;
private StructType inputSchema;
private StructType bufferSchema;
public MyAverage() {
List<StructField> inputFields = new ArrayList<StructField>();
inputFields.add(DataTypes.createStructField("inputColumn", DataTypes.LongType, true));
inputSchema = DataTypes.createStructType(inputFields);
List<StructField> bufferFields = new ArrayList<StructField>();
bufferFields.add(DataTypes.createStructField("sum", DataTypes.LongType, true));
bufferFields.add(DataTypes.createStructField("count", DataTypes.LongType, true));
bufferSchema = DataTypes.createStructType(bufferFields);
}
@Override
public StructType bufferSchema() {
// TODO Auto-generated method stub
return bufferSchema;
}
@Override
public DataType dataType() {
// TODO Auto-generated method stub
return DataTypes.DoubleType;
}
@Override
public boolean deterministic() {
// TODO Auto-generated method stub
return true;
}
@Override
public Object evaluate(Row buffer) {
// TODO Auto-generated method stub
return ((double) buffer.getLong(0)) / buffer.getLong(1);
}
@Override
public void initialize(MutableAggregationBuffer buffer) {
// TODO Auto-generated method stub
buffer.update(0, 0L);
buffer.update(1, 0L);
}
@Override
public StructType inputSchema() {
// TODO Auto-generated method stub
return inputSchema;
}
@Override
public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
// TODO Auto-generated method stub
long mergedSum = buffer1.getLong(0) + buffer2.getLong(0);
long mergedCount = buffer1.getLong(1) + buffer2.getLong(1);
buffer1.update(0, mergedSum);
buffer1.update(1, mergedCount);
}
@Override
public void update(MutableAggregationBuffer buffer, Row input) {
if (!input.isNullAt(0)) {
long updatedSum = buffer.getLong(0) + input.getLong(0);
long updatedCount = buffer.getLong(1) + 1;
buffer.update(0, updatedSum);
buffer.update(1, updatedCount);
}
}
}


http://www.mrgr.cn/p/32671882

相关文章

ES与关系数据库的同步练习(hotel_admin)

目录 1 es与数据库同步的方法2 实践2.1 任务介绍2.2 MQ方面操作2.2.1 声明交换机队列并且绑定2.2.2 hotel_admin端web层设置mq发送消息2.3 hotel_demo端监听接受消息并执行es操作 1 es与数据库同步的方法 方式一&#xff1a;同步调用 优点&#xff1a;实现简单&#xff0c;粗…

【项目构建】04:动态库与静态库制作

OVERVIEW 1.编译动态链接库&#xff08;1&#xff09;编译动态库&#xff08;2&#xff09;链接动态库&#xff08;3&#xff09;运行时使用动态库 2.编译静态链接库&#xff08;1&#xff09;编译静态库&#xff08;2&#xff09;链接静态库&#xff08;3&#xff09;运行时使…

C#反射应用

1.根据类名名称生成类实例 CreateInstance后面的参数部分一定要和所构造的类参数数量对应&#xff0c;即使设置参数默认值&#xff0c;也不可省略。 2.只知道类名&#xff0c;需要将该类作为参数调用泛型接口。 3.只知道类名&#xff0c;需要将该类的数组作为参数调用泛型接口…

php合并时间区间

需要写一段合并时间区间的代码,写个demo记录下 <?php $arr = [["2024-04-16 11:25:46", "2024-04-16 12:19:21"],["2024-04-16 03:14:06", "2024-04-16 10:13:21"],["2024-04-16 13:14:59", "2024-04-16 15:44:46…

罗宾斯《管理学》第15版笔记/课后习题/考研真题答案

第Ⅰ篇 管理导论 第1章 工作场所中的管理者和你 1.1 知识结构导图 1.2 考点难点归纳 1.3 课后习题详解 1.4 考研真题详解 附加模块一 管理史 知识结构导图 考点难点归纳 课后习题详解 考研真题详解 第2章 决 策 2.1 知识结构导图 2.2 考点难点归纳 2.3 课后习题详解…

王道数据结构个人向笔记-第二章(线性表)

目录2.1 线性表的定义和基本操作2.2 顺序表2.2.1 顺序表的定义2.2.2 顺序表的插入、删除(实现是基于静态分配)2.2.3 顺序表的查找2.3 链表2.3.1 单链表的定义2.3.2 单链表的插入删除2.3.3 单链表的查找2.3.4 单链表的建立2.3.4 双链表2.3.5 循环链表2.1 线性表的定义和基本操…

配置 Trunk,实现相同VLAN的跨交换机通信

1.实验环境 公司的员工人数已达到 100 人&#xff0c;其网络设备如图所示。现在的网络环境导致广播较多网速慢&#xff0c;并且也不安全。公司希望按照部门划分网络&#xff0c;并且能够保证一定的网络安全性。 其网络规划如下。 PC1和 PC3为财务部&#xff0c;属于VLAN 2&…

centos7 openresty lua 自适应webp和缩放图片

目录 背景效果图准备安装cwebp等命令&#xff0c;转换文件格式安装ImageMagick&#xff0c;压缩文件下载Lua API 操控ImageMagick的依赖包 代码参考 背景 缩小图片体积&#xff0c;提升加载速度&#xff0c;节省流量。 效果图 参数格式 &#xff1a; ?image_processformat,…

短视频素材哪个App最好?短视频素材哪里有免费的?

在数字媒体的黄金时代&#xff0c;富有创意的视频内容已成为吸引观众的关键。高质量的视频素材不仅能增强视觉效果&#xff0c;还能提升整体叙述的力度。以下列出了一系列全球顶尖的视频素材提供网站&#xff0c;它们将为你的广告制作、社交媒体或任何视频项目提供极具影响力的…

Java安全基础之Java反射机制和ClassLoader类加载机制

反射机制允许程序在运行时检查和操作类、对象、方法以及属性的信息。类加载机制负责将类的字节码加载到内存中,并且在运行时动态地链接和初始化类。目录Java 反射机制反射 java.lang.RuntimeClassLoader 类加载机制URLClassLoaderloadClass() 与 Class.forName() 的区别? Jav…

高中生一定就会了么???(i)

\(题源:2023星光杯数学思维能力测评(小学组)第一试\)\(表示离谱\)

打破失联困境:门店如何利用AI智能名片B2B2C商城小程序重构与消费者的紧密连接?

在如今这个消费者行为日益碎片化的时代&#xff0c;门店经营者们时常感叹&#xff1a;消费者进店如同一场不期而遇的缘分&#xff0c;然而一旦离开门店&#xff0c;就仿佛消失在茫茫人海中&#xff0c;难以再觅其踪迹。这种“进店靠缘分&#xff0c;离店就失联”的困境&#xf…

鸿蒙安装apk软件失败(不支持该设备)

1.关闭纯净模式增强模块 2.给文件管理器一个权限,一个安装外部来源应用的权限魔芋爽要犯了.jpg1.关闭纯净血压增高模块 2.安装外部来源(默认文件管理器是没有权限的) 3.没登华为账号,等七天过了再来试试后续(matepad11.5就是垃圾,快退!.jpg)

Outlook大附件插件 有效解决附件大小限制问题

很多企业都是使用Outlook来进行邮件的收发&#xff0c;可是由于附件大小有限&#xff0c;导致很多大文件发不出去&#xff0c;就会产生Outlook大附件插件这种业务需求。 邮件系统在发送大文件时面临的限制问题主要如下&#xff1a; 1、附件大小限制&#xff1a;大多数邮件服务…

1. SpringBoot 入门

1. SpringBoot 简介 SpringBoot是由Pivotal团队提供的全新框架,可以帮助我们开发基于Spring的、独立的、生产级的应用程序。​ 其中SpringBoot的官网是:Spring Boot Reference DocumentationSpringBoot的主要目标是:为所有Spring开发提供更快的入门体验开箱即用,提供了自动…

uniapp视频播放器(h5+app)

关于uniapp视频播放器遇到的一些问题&#xff0c;mark下。 中途遇到了很多问题&#xff0c;如果有相同的伙伴遇到了类似的&#xff0c;欢迎交流 官方的video播放器在app上不友好&#xff0c;有以下功能不支持。 loadedmetadata、controlstoggle不支持导致只能手写控制层。 不…

mysql连接不上,服务中找不到mysql

分析 因为太久没使用mysql,服务自动删除了解决 注册/安装服务 win+x,a,以管理员打开powershell(或者使用cmd,随你) # 注意此处需要引号,因为有空格 # 1. cd到mysql的可执行文件,如果记不得或者像我一样懒,直接everything搜索mysqld.exe即可 cd C:\Program Files\MySQL\My…

星趴解包教程

目录#1 提取资源文件#2 解密#3 读取资源#4 导出资源#4.1 导出单个 / 少量资源#4.2 按种类导出资源#5 资源去重(可选) 本篇文章偏小白向,有一定基础的可以选择性阅读 本文仅供学习交流使用,请勿用于商业用途。更新至 2024.5.2, 星趴版本号 v1.2.3_20240430_123a#1 提取资源文…

快速了解Django:核心概念解析与实践指南

title: 快速了解Django&#xff1a;核心概念解析与实践指南 date: 2024/5/1 20:31:41 updated: 2024/5/1 20:31:41 categories: 后端开发 tags: Django核心路由系统视图系统ORM管理中间件Web框架登录装饰器 第一章&#xff1a;Django简介 背景和发展历程&#xff1a; Djan…