【SparkSQL】Spark读取外部文件及写出数据(附开窗函数)
这里写一些SQL中的难点
炸裂函数
spark.sql( """ with t2 as( select explode(split(value,' ')) as word from t1 where length (value) > 0 ) select word, count(*) cnt from t2 group by word order by cnt """ ).show() # Explode函数里面的值必须为map类型或者array类型, 也就是列表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 - <font size=4 color=red face='华文楷体'>开窗函数</font>
- SQL风格
- ```python
# SQL风格 - 创建临时视图, 写SQL
emp_df.createOrReplaceTempView('emp')
dept_df.createOrReplaceTempView('dept')
spark.sql("""
with tmp as(
select
emp_id,
emp_name,
salary,
bonus,
e.dep_id,
d.dep_name,
dense_rank() over(partition by e.dep_id order by salary desc) as dr
from emp e join dept d on e.dep_id = d.dep_id
)
select
*
from tmp where dr <= 2
""").show()DSL 风格 - 不需要创建视图
1
2
3
4
5
6
7
8 # DSL 风格 - 不需要创建视图
dept_df\
.join(emp_df, 'dep_id', 'inner')\
.withColumn(
'rk',
dense_rank().over(Window.partitionBy('dep_id').orderBy(col('salary').desc()))
)\
.where('rk <= 2').show()
电影评分案例
源数据
- 电影评分数据:datas/movie/ratings.dat【用户id、电影id、评分、评分时间】
1
2
3
4
5
6
7
8
9
10
11 1::1193::5::978300760
1::661::3::
1::914::3::978301968
1::3408::4::978300275
1::2355::5::978824291
1::1197::3::978302268
1::1287::5::978302039
1::2804::5::978300719
1::594::4::978302268
1::919::4::978301368
1::595::5::978824268
- 电影信息数据:datas/movie/movies.dat【电影id、电影名称、分类】
1
2
3
4
5
6
7
8
9 1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
4::Waiting to Exhale (1995)::Comedy|Drama
5::Father of the Bride Part II (1995)::Comedy
6::Heat (1995)::Action|Crime|Thriller
7::Sabrina (1995)::Comedy|Romance
8::Tom and Huck (1995)::Adventure|Children's
9::Sudden Death (1995)::Action
需求
统计评分次数大于2000的所有电影中平均评分最高的Top10,结果显示电影名称、电影平均评分、电影评分次数
思路
- 1、将电影评分文件转为movie_rdd
- 2、将movie_rdd转为movie_df
- 3、将电影信息文件转为rate_rdd
- 4、将rate_rdd转为rate_df
SQL风格
- 5、将rate_df注册为临时视图rate
- 6、将movie_df注册为临时视图movie
- 7、编写SparkSQL,将rate和movie进行join操作
- KeyWords: 内连接, CTE表达式
DSL风格
- 7、编写SparkSQL,将rate和movie进行join操作
CodeDemo
1 | import time |
将RDD转为DataFrame
1 | 注意: 以后直接用的就是将数据转为DataFrame, 没有先转为RDD这一步操作了, 这里是多看看, 多了解一下. |
读取各种文件并转为DataFrame的方式
三种方式将RDD转为DataFrame, 遵循公式(RDD + Schema = DataFrame)
1, 自动推断: RDD中每条数据必须为Row类型,Spark就可以实现自动推断
将RDD变为Row对象
movie_rdd_row = fileRdd.map( lambda item: Row(user_id = item[0],movieid=item[1], rate=float(item[2]), ts=int(item[3])))
1
2
3
4
5
6
- 将RDD转换成DF
- ```python
movie_df = spark.createDataFrame(movie_rdd_row)2, 自定义Schema转换DataFrame
先构建元组类型的RDD
movie_rdd_tuple = fileRdd.map(lambda item: (item[0], item[1], float(item[2]), int(item[3])))
1
2
3
4
5
6
7
8
9
10
11
- 自定义一个Schema对象
- ```python
movie_schema = StructType([
StructField(name="userid", dataType=StringType(), nullable=True),
StructField(name="movieid", dataType=StringType(), nullable=True),
StructField(name="rate", dataType=DoubleType(), nullable=True),
StructField(name="ts", dataType=LongType(), nullable=True)
])3, 指定列名称转换DataFrame - 直接调用rdd中的toDF函数就ok
先构建元组类型的RDD
movie_rdd_list = fileRdd.map(lambda item: (item[0], item[1], float(item[2]), int(item[3])))
1
2
3
4
5
6
- 将RDD转换成DataFrame
- ```python
movie_df = movie_rdd_list.toDF(["userid", "movieid", "rate", "ts"])
将DataFrame转为RDD
直接DataFrame.rdd就转为rdd了
Spark读取外部文件的方式
读取外部文件转为不同的数据类型
- 读取外部文件转为RDD
- fileRdd = sc.textFile(‘文件的路径’)
- 读取外部文件转为DataFrame
- demo_df = spark.read.text(‘文件的路径’)
读取外部文件的类型
1 | text文件 -- text |
读取方式
三种读取方式
- 方式1:
df = spark.read.文件类型(上述)(path)
- 举例: dataFrame1 = spark.read.text(path)
- 这里指定表结构需要加在read后面指定spark.read.schema(‘列名1 类型, 列名2 类型’).文件类型(path)
- 方式2:
df = spark.read.format("文件类型").load(path)
- 举例: dataFrame1 = spark.read.format(“parquet”).load(path)
- 方式3:
df = spark.read.load(path,format="文件类型")
- 举例: dataFrame1 = spark.read.load(path,format=”json”)
读取text文件
概念:
- 1、以text文本的方式进行读取,读取之后数据只有一列,列名默认是value,类型是String
- 2、由于以text方式读取的DataFrame数据只有一列,分析非常不方便,建议慎用
1 | # 1、将text文件读取并转为DataFrame |
读取csv文件-重点
大体顺序可以分为:
spark.read.schema.option.csv(path)
分隔符是逗号,没有表头
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22# 方式1----、将csv文件读取并转为DataFrame
#.schema('stuid string,name string,age int') 给DataFrame添加表结构(列名和列类型)
df1 = spark.read\
.schema('stuid string,name string,age int')\
.csv('hdfs://node1:8020/datas/input/read_type/people.csv')
df1.show()
df1.printSchema()
# 方式2----、将csv文件读取并转为DataFrame
# .schema('stuid string,name string,age int') 给DataFrame添加表结构(列名和列类型)
df2 = spark.read\
.format('csv')\
.schema('id string,name string,age int')\
.load('hdfs://node1:8020/datas/input/read_type/people.csv')
df2.show()
df2.printSchema()
# 方式3----将csv文件读取并转为DataFrame
# .schema('stuid string,name string,age int') 给DataFrame添加表结构(列名和列类型)
df3 = spark.read\
.schema('id string,name string,age int')\
.load('hdfs://node1:8020/datas/input/read_type/people.csv',format='csv')
df3.show()
df3.printSchema()分隔符不是逗号,没有表头
1
2
3
4
5
6
7df1 = spark.read\
.schema('stuid string,name string,age int')\
.option('sep','#')\
.csv('hdfs://node1:8020/datas/input/read_type/people1.csv')
df1.show()
df1.printSchema()分隔符不是逗号,有表头
1
2
3
4
5
6
7
8
9
10
11#.schema('stuid string,name string,age int') 给DataFrame添加表结构(列名和列类型)
#.option('sep','#') 手动指定文件之间的分隔符
#.option('header', 'true') 如果有表头,spark可以自动推断每列的名字,但是每列的类型都是String类型,
# 如果想指定每列的类型,则可以定义schema信息 或者添加 .option("inferSchema", "true") 参数
df1 = spark.read\
.option('sep','#')\
.option('header','true')\
.option("inferSchema", "true")\
.csv('hdfs://node1:8020/datas/input/read_type/people2.csv')
df1.show()
df1.printSchema()
读取json文件
读取json文件,spark可以自动推导每列的名字和类型,我们不需要设置额外参数
1 | # 将json文件读取并转为DataFrame |
读取parquet和orc文件
读取orc或者Parquet文件,我们不需要设置额外参数,spark可以自动推导列名和列类型
1 | #1、将orc文件读取并转为DataFrame |
Spark写入外部文件的方式
语法
写入模式
1
2
3
4append: 追加模式,当数据存在时,继续追加
overwrite: 覆写模式,当数据存在时,覆写以前数据,存储当前最新数据;
error/errorifexists: 如果目标存在就报错,默认的模式
ignore: 忽略,数据存在时不做任何操作方式1
1
2
3
4df.write.mode(saveMode="append").format("text").save(path)
df.write.mode(saveMode="overwrite").format("json").save(path)
df.write.mode(saveMode="errorifexists").format("csv").save(path)
df.write.mode(saveMode="ignore").format("parquet").save(path)方式2
1
2
3
4df.write.text(path)
df.write.json(path)
df.write.csv(path)
df.write.parquet(path)代码
text文件写入
1
2#建议放弃text文件写入,因为text要求所有字段都是string类型,要求只能有一列
df_rs.write.mode(saveMode="append").format("text").save('hdfs://node1:8020/datas/output/text')csv写入
1
2#csv写入,不需要设置任何参数,默认分隔符是逗号
df_rs.write.mode(saveMode="errorifexists").format("csv").save('hdfs://node1:8020/datas/output/csv')json写入
1
2
3#json写入,不需要设置任何参数,默认每行都生成json串
df_rs.write.mode(saveMode="overwrite").format("json").save('hdfs://node1:8020/datas/output/json')orc 和 parque
1
2
3
4
5#orc和parquet写入,不需要设置任何参数,文件内容看不懂,但是程序可以识别
df_rs.write.mode(saveMode="ignore").format("orc").save('hdfs://node1:8020/datas/output/orc')
df_rs.write.mode(saveMode="ignore").format("parquet").save('hdfs://node1:8020/datas/output/parquet')
Hive及MySQL数据读取
MySQL数据的读写
环境准备
概述
1
连接mysql必须知道的参数:url地址,数据库,端口,用户名、密码
环境准备
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
191、在node1,node2,node3上进入以下目录,将MySQL的驱动jar包上传到该目录
cd /export/server/anaconda3/lib/python3.8/site-packages/pyspark/jars
2、根据不同模式进行以下操作
2.1 -----------本地模式---------------
在node1上进入以下目录上传驱动jar包
cd /export/server/spark-local/jars
2.2 -----------Yarn模式---------------
在node1,node2,node3上进入以下目录上传驱动jar包
cd /export/server/spark-yarn/jars
2.3 -----------StandAlone模式---------------
在node1,node2,node3上进入以下目录上传驱动jar包
cd /export/server/spark-standalone/jars
3、将MySQL的驱动包传入HDFS上/spark/jars目录数据准备
1
2
3
41、查看node1上mysql数据库中是否有db_company数据库
2、如果没有则执行以下操作,否则什么也不做
mysql -uroot -p
source source /root/db_company.sql #将db_company.sql放入node1的/root目录
读取MySQL数据
代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21#-------------------读取mysql数据-方式1---------------------------------
prop = {'user':'root', 'password':'123456', 'driver':'com.mysql.jdbc.Driver'}
table = "db_company.emp"
url = "jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true"
jdbc_df1 = spark.read.jdbc(url=url, table=table, properties=prop)
jdbc_df1.show()
jdbc_df1.printSchema()
print('-------------------------------------------------------')
#-------------------读取mysql数据-方式2---------------------------------
jdbc_df2 = spark.read \
.format("jdbc") \
.option("url", "jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true") \
.option("dbtable", "db_company.emp") \
.option("user", "root") \
.option("password", "123456") \
.load()
jdbc_df2.show()
jdbc_df2.printSchema()
写入MySQL数据
环境准备
1
2
3
4
5
6
7
8#1、在mysql中创建表
CREATE TABLE db_company.emp_v3 (
`empno` int(11) NOT NULL,
`ename` varchar(10) DEFAULT NULL,
`job` varchar(9) DEFAULT NULL,
`sal` double DEFAULT NULL,
`deptno` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ;代码
1
2
3
4
5
6
7
8
9
10
11
12#------------------写入mysql数据-方式2---------------------------------
jdbc_df2 \
.select('empno', 'ename', 'job', 'sal', 'deptno') \
.write \
.mode("append") \
.format("jdbc") \
.option("url", "jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true") \
.option("dbtable", "db_company.emp_v3") \
.option("user", "root") \
.option("password", "123456") \
.save()
Hive的数据读写
数据准备,进入hive终端,执行以下命令
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39CREATE DATABASE IF NOT EXISTS `db_hive`;
USE `db_hive`;
DROP TABLE IF EXISTS `dept`;
CREATE TABLE `dept` (
`deptno` int,
`dname` string,
`loc` string
)
row format delimited fields terminated by '\t';
--------方式1-insert方式----------------------
insert into `dept`(`deptno`,`dname`,`loc`) values (10,'ACCOUNTING','NEW YORK'),(20,'RESEARCH','DALLAS'),(30,'SALES','CHICAGO'),(40,'OPERATIONS','BOSTON');
--------方式2-load方式----------------------
load data local inpath '/root/dept.txt' overwrite into table dept;
DROP TABLE IF EXISTS `emp`;
CREATE TABLE `emp` (
`empno` int,
`ename` string,
`job` string,
`mgr` string,
`hiredate` string,
`sal` double,
`comm` double,
`deptno` int
)
row format delimited fields terminated by '\t';
导入数据使用以下命令或者使用load方式加载文件
--------方式1-insert方式----------------------
insert into `emp`(`empno`,`ename`,`job`,`mgr`,`hiredate`,`sal`,`comm`,`deptno`) values (7369,'SMITH','CLERK',7902,'1980-12-17',800,NULL,20),(7499,'ALLEN','SALESMAN',7698,'1981-02-20',1600,300,30),(7521,'WARD','SALESMAN',7698,'1981-02-22',1250,500,30),(7566,'JONES','MANAGER',7839,'1981-04-02',2975,NULL,20),(7654,'MARTIN','SALESMAN',7698,'1981-09-28',1250,1400,30),(7698,'BLAKE','MANAGER',7839,'1981-05-01',2850,NULL,30),(7782,'CLARK','MANAGER',7839,'1981-06-09',2450,NULL,10),(7788,'SCOTT','ANALYST',7566,'1987-07-13',3000,NULL,20),(7839,'KING','PRESIDENT',NULL,'1981-11-17',5000,NULL,10),(7844,'TURNER','SALESMAN',7698,'1981-09-08',1500,0,30),(7876,'ADAMS','CLERK',7788,'1987-07-13',1100,NULL,20),(7900,'JAMES','CLERK',7698,'1981-12-03',950,NULL,30),(7902,'FORD','ANALYST',7566,'1981-12-03',3000,NULL,20),(7934,'MILLER','CLERK',7782,'1982-01-23',1300,NULL,10);
--------方式2-load方式----------------------
load data local inpath '/root/emp.txt' overwrite into table emp;环境准备
- 启动服务
1
2
3
4
5
6
7启动HDFS服务:NameNode和DataNodes
start-dfs.sh
启动HiveMetaStore 服务
nohup hive --service metastore 2>&1 &
start-metastore.sh- 配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31----------------local模式:在node1配置--------------------------
进入配置文件目录,创建配置文件
cd /export/server/spark-local/conf
vim hive-site.xml
添加以下内容
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>thrift://node1.itcast.cn:9083</value>
</property>
</configuration>
----------------Spark On Yarn模式:在node1,node2,node3配置------------------------
进入配置文件目录创建配置文件
cd /export/server/spark-yarn/conf
vim hive-site.xml
添加以下内容
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>thrift://node1.itcast.cn:9083</value>
</property>
</configuration>- 启动命令行
1
/export/server/spark-local/bin/pyspark --master local[2] --conf spark.sql.shuffle.partitions=2
- 验证环境
1
2
3列举
spark.sql("show databases").show()
spark.sql("show tables in db_hive") .show()代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67import time
from pyspark import SparkContext, SparkConf, StorageLevel
import os
import sys
from pyspark.sql import SparkSession, Window
# spark入门案例 --- WordCount
# 1、设置环境变量
from pyspark.sql.functions import *
os.environ['JAVA_HOME'] = '/export/server/jdk'
os.environ['SPARK_HOME'] = '/export/server/spark' # Spark安装位置
os.environ["PYSPARK_PYTHON"] = "/export/server/anaconda3/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/export/server/anaconda3/bin/python3"
if __name__ == '__main__':
#创建用于操作Hive的SparkSession
"""
spark.sql.warehouse.dir:用来指定Hive表数据在HDFS的位置
hive.metastore.uris :用来指定hive的metastore服务(runjar)在哪台主机
enableHiveSupport :开启spark支持hive
"""
spark = SparkSession \
.builder \
.appName("SparkSQLAppName") \
.master("local[2]") \
.config("spark.sql.shuffle.partitions", 2) \
.config("spark.sql.warehouse.dir", 'hdfs://node1.itcast.cn:8020/user/hive/warehouse')\
.config("hive.metastore.uris", "thrift://node1.itcast.cn:9083")\
.enableHiveSupport()\
.getOrCreate()
# 设置日志级别为WARN
spark.sparkContext.setLogLevel("WARN")
#--------------------从hive读取-SQL风格--------------------------------
#使用spark操作hive的表
spark.sql(
"""
select * from db_hive.emp;
"""
).show()
spark.sql(
"""
select deptno,count(*) as cnt from db_hive.emp group by deptno;
"""
).show()
# --------------------从hive读取-DSL风格--------------------------------
dataFrame1 = spark.read.table("db_hive.emp")
dataFrame2 = dataFrame1 \
.select("deptno", "sal") \
.groupBy("deptno") \
.agg(
count("*").alias("cnt")
)
dataFrame2.show()
# --------------------向hive写入--------------------------------
#test_rs表如果不存在,则创建,如果存在,则覆盖数据
dataFrame2.write.mode("overwrite").format("hive").saveAsTable("db_hive.test_rs")
All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.