当前位置: 首页 > news >正文

Spark2.x:通过 JDBC 连接数据库(DataFrame)

这里以关系数据库MySQL(MarinDB)为例。下面我们要新建一个测试Spark程序的数据库,数据库名称是“spark”,表的名称是“student”。

请执行下面命令在Linux中启动MySQL数据库,并完成数据库和表的创建,以及样例数据的录入:

MariaDB [(none)]> create database spark;
Query OK, 1 row affected (0.00 sec)MariaDB [(none)]> use spark;
Database changedMariaDB [spark]> create table student(id int(4), name char(20), gender char(4), age int(4));
Query OK, 0 rows affected (0.05 sec)MariaDB [spark]> insert into student values(1,'Xueqian','F',23);
Query OK, 1 row affected (0.01 sec)MariaDB [spark]> insert into student values(2,'Weiliang','M',24);
Query OK, 1 row affected (0.01 sec)MariaDB [spark]>  select * from student;
+------+----------+--------+------+
| id   | name     | gender | age  |
+------+----------+--------+------+
|    1 | Xueqian  | F      |   23 |
|    2 | Weiliang | M      |   24 |
+------+----------+--------+------+
2 rows in set (0.00 sec)

上面已经创建好了我们所需要的MySQL数据库和表,下面我们编写Spark应用程序连接MySQL数据库并且读写数据。

Spark支持通过JDBC方式连接到其他数据库获取数据生成DataFrame。

下面,我们要启动一个spark-shell,而且启动的时候,要附加一些参数。启动Spark Shell时,必须指定mysql连接驱动jar包。

[songxitang@master ~]$ spark2-shell \
> --jars /home/songxitang/spark/jars/mysql-connector-java-5.1.41-bin.jar \
> --driver-class-path /home/songxitang/spark/jars/mysql-connector-java-5.1.41-bin.jar

上面的命令行中,在一行的末尾加入斜杠\,是为了告诉spark-shell,命令还没有结束。

启动进入spark-shell以后,可以执行以下命令连接数据库,读取数据,并显示:

scala> val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://master:3306/spark").option("driver","com.mysql.jdbc.Driver").option("dbtable", "student").option("user", "root").option("password", "123456").load()
jdbcDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]scala> jdbcDF.show()
+---+--------+------+---+
| id|    name|gender|age|
+---+--------+------+---+
|  1| Xueqian|     F| 23|
|  2|Weiliang|     M| 24|
+---+--------+------+---+

下面我们再来看一下如何往MySQL中写入数据。

现在我们开始在spark-shell中编写程序,往spark.student表中插入两条记录。
下面,我们要启动一个spark-shell,而且启动的时候,要附加一些参数。启动Spark Shell时,必须指定mysql连接驱动jar包(如果你前面已经采用下面方式启动了spark-shell,就不需要重复启动了)

启动进入spark-shell以后,可以执行以下命令连接数据库,写入数据,程序如下(你可以把下面程序一条条拷贝到spark-shell中执行):

import java.util.Properties
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row//下面我们设置两条数据表示两个学生信息
val studentRDD = spark.sparkContext.parallelize(Array("3 Rongcheng M 26","4 Guanhua M 27")).map(_.split(" "))//下面要设置模式信息
val schema = StructType(List(StructField("id", IntegerType, true),StructField("name", StringType, true),StructField("gender", StringType, true),StructField("age", IntegerType, true)))//下面创建Row对象,每个Row对象都是rowRDD中的一行
val rowRDD = studentRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt))//建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
val studentDF = spark.createDataFrame(rowRDD, schema)//下面创建一个prop变量用来保存JDBC连接参数
val prop = new Properties()
prop.put("user", "root") //表示用户名是root
prop.put("password", "123456") //表示密码是123456
prop.put("driver","com.mysql.jdbc.Driver") //表示驱动程序是com.mysql.jdbc.Driver//下面就可以连接数据库,采用append模式,表示追加记录到数据库spark的student表中
studentDF.write.mode("append").jdbc("jdbc:mysql://master:3306/spark", "spark.student", prop)

在spark-shell中执行完上述程序后,我们可以看一下效果,看看MarinDB数据库中的spark.student表发生了什么变化。

MariaDB [spark]> select * from student;
+------+-----------+--------+------+
| id   | name      | gender | age  |
+------+-----------+--------+------+
|    1 | Xueqian   | F      |   23 |
|    2 | Weiliang  | M      |   24 |
|    3 | Rongcheng | M      |   26 |
|    4 | Guanhua   | M      |   27 |
+------+-----------+--------+------+
4 rows in set (0.00 sec)

http://www.mrgr.cn/news/14969.html

相关文章:

  • 设计模式 11 享元模式
  • mysql基础知识
  • 音视频入门基础:WAV专题(6)——通过FFprobe显示WAV音频文件每个数据包的信息
  • 自己开发完整项目一、登录注册功能-01
  • UML之时序图
  • 【Springboot服务实现类】用户登录逻辑梳理(未完待续)
  • 使用matplotlib可视化dataframe:让你的数据更生动有趣
  • Baumer工业相机堡盟工业相机如何通过NEOAPISDK实现根据每次触发信号移动感兴趣区域ROI(Python)
  • VMware安装Win10系统后,启动系统提示不支持的处理器,怎么解决
  • 记Spring HTTP Invoker远程调用的使用(二)基于Servlet方式,配置servlet映射url-pattern实现
  • 谷歌又出三款Gemini模型
  • NLP从零开始------文本中阶序列处理之语言模型(完整版)
  • OpenCV绘图函数(9)填充多边形函数fillPoly()的使用
  • 【软件测试】软件测试生命周期与Bug
  • Redis 数据结构与持久化机制详解及缓存异常处理策略
  • 前端开发学习Docker记录02容器操作
  • 焦虑相关症状与错误相关大脑活动的机器学习研究
  • 2024年西安交通大学软件工程专业考研915真题
  • 基于爬山法MPPT和PI的直驱式永磁同步风力发电机控制系统simulink建模与仿真
  • 「bug」nvitop ERROR: Failed to initialize curses