PySpark交互式编程

如下chapter2-data1.txt,该数据集包含了某大学计算机系的成绩,数据格式如下所示:

1
2
3
4
5
6
7
Tom,DataBase,80
Tom,Algorithm,50
Tom,DataStructure,60
Jim,DataBase,90
Jim,Algorithm,60
Jim,DataStructure,80
……

请根据给定的实验数据,在pyspark中通过编程来计算以下内容:

(1)该系总共有多少学生;

(2)该系共开设了多少门课程;

(3)Tom同学的总成绩平均分是多少;

(4)求每名同学的选修的课程门数;

(5)该系DataBase课程共有多少人选修;

(6)各门课程的平均分是多少;

(7)使用累加器计算共有多少人选了DataBase这门课。

Q1-求学生总数: 不管做什么首先需要从某个地方读取到需要处理的数据, Linux本地的路径协议是file://, 而HDFS的路径协议是hdfs://主机名:端口号. 所以第一步是读取文件,第二步是获取每行中的第一列. 第三步是将获取到的数据进行去重操作. 第四步是获取元素总个数

1
2
3
4
lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt")
res = lines.map(lambda x:x.split(",")).map(lambda x: x[0]) //获取每行数据的第1
distinct_res = res.distinct() //去重操作
distinct_res.count()//取元素总个数

Q2-求课程总数:原理同上

1
2
3
4
lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt")
res = lines.map(lambda x:x.split(",")).map(lambda x:x[1]) //获取每行数据的第2
distinct_res = res.distinct()//去重操作
distinct_res.count()//取元素总个数

Q3 - 求tom同学每门课程的平均分: 使用到的函数有map,filter, foreach, count, reduce

1
2
3
4
5
6
7
8
9
10
11
# 使用Map算子将数据的每一行都按照指定元素切开, 每行的数据变为了列表, 取列表中的第0个元素, 值为Tom
scoreRdd = fileRdd.map(lambda line: line.split(",")).filter(lambda list: list[0] == 'Tom')
# 上一步得到['Tom,DataBase,80', 'Tom,Algorithm,50', 'Tom,DataStructure,60']
# 通过map算子取每行数据的第三个数据, 并将其转为int类型
allScoreRdd = scoreRdd.map(lambda x: int(x[2]))
# 统计有几个int类型(也就是课程的个数
num = allScoreRdd.count()
# 通过Reduce算子将每个成绩进行聚合
scores = allScoreRdd.reduce(lambda x,y: x+y)
# 求平均成绩
print(scores/num)

Q4 - 求每位同学的选课数量

1
2
3
4
5
6
stuRdd = fileRdd.map(lambda line: line.split(","))
# 得到['Tom', 'DataBase', '80'], ['Tom', 'Algorithm', '50'], ['Tom', 'DataStructure', '60']
infoRdd = stuRdd.map(lambda x: (x[0], 1))
# 得到[('Tom', 1), ('Tom', 1), ('Tom', 1), ('Jim', 1), ('Jim', 1), ('Jim', 1)] 有点词频统计的感觉
resRdd = infoRdd.reduceByKey(lambda x,y: x+y)
print(resRdd.collect())

Q5 - 求该系DataBase课程共有多少人选修

1
2
3
4
5
listRdd = fileRdd.map(lambda x: x.split(",")).filter(lambda list: list[1] == 'DataBase')
# listRdd = [['Tom', 'DataBase', '80'], ['Jim', 'DataBase', '90']]
# 取每个学生信息的姓名(去重后再统计个数
disRdd = listRdd.map(lambda list:list[0])
print(disRdd.distinct().count())

Q6 - 求各门课程的平均分是多少

1
2
3
4
5
6
7
8
9
10
infoListRdd = fileRdd.map(lambda line: line.split(","))
courseListRdd = infoListRdd.map(lambda list: (list[1],(int(list[2]), 1))) # ('ComputerNetwork', (44, 1))
# 按课程名聚合课程总分和选课人数。格式如('DataBase', (170, 2))
# reduceByKey算子分为两步: 1, 得到['Hadoop', [1,1,1,1]] 2, 得到 ['Hadoop', 5] 3, x是1,y是1,然后x是2,y是1
# 首先按照Key分组,Database计算这里的x是(80, 1), y是(90, 1),所以reduceByKey会得到('DataBase', (170, 2))
aggRdd = courseListRdd.reduceByKey(lambda x,y: (x[0]+y[0],x[1]+y[1]))
# 得到: [('DataBase', (170, 2)), ('Algorithm', (110, 2)), ('DataStructure', (140, 2))]
# 这里传入map算子的是个('DataBase', (170, 2)), 将得到的成绩总和 除以 总人数(保留两位小数)就得到了结果
resRdd = aggRdd.map(lambda x:(x[0], round(x[1][0]/x[1][1],2)))
print(resRdd.collect())

Q7 - 求使用累加器计算共有多少人选了DataBase这门课

1
2
3
4
5
lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt")
res = lines.map(lambda x:x.split(",")).filter(lambda x:x[1]=="DataBase")//筛选出选了DataBase课程的数据
accum = sc.accumulator(0) //定义一个从0开始的累加器accum
res.foreach(lambda x:accum.add(1))//遍历res,每扫描一条数据,累加器加1
accum.value //输出累加器的最终值

编写程序实现数据去重

对于两个输入文件A和B,编写Spark独立应用程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新文件C。下面是输入文件和输出文件的一个样例,供参考。

输入文件A的样例如下:

1
2
3
4
5
6
7
8
9
10
11
20170101    x

20170102 y

20170103 x

20170104 y

20170105 z

20170106 z

输入文件B的样例如下:

1
2
3
4
5
6
7
8
9
20170101    y

20170102 y

20170103 x

20170104 z

20170105 y

根据输入的文件A和B合并得到的输出文件C的样例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
20170101    x

20170101 y

20170102 y

20170103 x

20170104 y

20170104 z

20170105 y

20170105 z

20170106 z

Answer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from pyspark import SparkContext, SparkConf
import os

os.environ['JAVA_HOME'] = '/export/server/jdk'
os.environ['SPARK_HOME'] = '/export/server/spark' #Spark安装位置
os.environ["PYSPARK_PYTHON"] = "/export/server/anaconda3/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/export/server/anaconda3/bin/python3"

if __name__ == '__main__':
conf = SparkConf().setMaster('local[2]').setAppName('wordcount01')
sc = SparkContext(conf=conf)
"""
对于两个输入文件A和B,编写Spark独立应用程序,
对两个文件进行合并,并剔除其中重复的内容,得到一个新文件C。
"""
lines1 = sc.textFile('file:///export/data/a.txt')
lines2 = sc.textFile('file:///export/data/b.txt')
# 合并两个文件中的内容
lines = lines1.union(lines2).filter(lambda line:len(line) > 0)
# 对文件中的内容去重
distinct_lines = lines.distinct()
# 排序操作
res = distinct_lines.sortBy(lambda x:x)
# 结果验真
print(res.collect())
# 将结果输出到文件中(repartition(1)的作用是让结果合并到一个文件中,不加的话会结果写入到两个文件
res.repartition(1).saveAsTextFile("file:///export/data/result.txt")

编写程序实现求平均值问题

每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生名字,第二个是学生的成绩;编写Spark独立应用程序求出所有学生的平均成绩,并输出到一个新文件中。下面是输入文件和输出文件的一个样例,供参考。

Algorithm成绩:

1
2
3
4
5
6
7
小明 92

小红 87

小新 82

小丽 90

Database成绩:

1
2
3
4
5
6
7
小明 95

小红 81

小新 89

小丽 85

Python成绩:

1
2
3
4
5
6
7
小明 82

小红 83

小新 94

小丽 91

平均成绩如下:

1
2
3
4
5
6
7
(小红,83.67)

(小新,88.33)

(小明,89.67)

(小丽,88.67)

Answer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from pyspark import SparkContext, SparkConf
import os

os.environ['JAVA_HOME'] = '/export/server/jdk'
os.environ['SPARK_HOME'] = '/export/server/spark' #Spark安装位置
os.environ["PYSPARK_PYTHON"] = "/export/server/anaconda3/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/export/server/anaconda3/bin/python3"

if __name__ == '__main__':
conf = SparkConf().setMaster('local[2]').setAppName('wordcount01')
sc = SparkContext(conf=conf)
"""
每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生名字,第二个是学生的成绩;
编写Spark独立应用程序求出所有学生的平均成绩,并输出到一个新文件中。
"""
Algorithm = sc.textFile("file:///export/data/result.txt/Algorithm.txt")
Database = sc.textFile("file:///export/data/result.txt/Database.txt")
Python = sc.textFile("file:///export/data/result.txt/Python.txt")
print(Algorithm.collect())
print(Database.collect())
print(Python.collect())
# 合并三个文件的内容
contentRdd = Algorithm.union(Database).union(Python)
print(contentRdd.collect())
# 首先将数据按照空格切开, 转为data类型, 形如('小红',(92, 1))
data = contentRdd.map(lambda x: x.split(" ")).map(lambda x:(x[0], (int(x[1]), 1)))
# 其次将数据聚合,得到('小红',(190, 3))
res = data.reduceByKey(lambda x,y: (x[0]+y[0],x[1]+y[1]))
# 最后计算最终结果
result = res.map(lambda line: (line[0],round(line[1][0]/line[1][1], 2)))
print(result.collect())
# 将结果写入一个文件中(repartition(1)的作用是让结果合并到一个文件中,不加的话会写入到3个文件
result.repartition(1).saveAsTextFile("file:////export/data/demo.txt")