【Spark】PySpark刷题本(一)
PySpark交互式编程
如下chapter2-data1.txt,该数据集包含了某大学计算机系的成绩,数据格式如下所示:
1 | Tom,DataBase,80 |
请根据给定的实验数据,在pyspark中通过编程来计算以下内容:
(1)该系总共有多少学生;
(2)该系共开设了多少门课程;
(3)Tom同学的总成绩平均分是多少;
(4)求每名同学的选修的课程门数;
(5)该系DataBase课程共有多少人选修;
(6)各门课程的平均分是多少;
(7)使用累加器计算共有多少人选了DataBase这门课。
Q1-求学生总数: 不管做什么首先需要从某个地方读取到需要处理的数据, Linux本地的路径协议是file://, 而HDFS的路径协议是hdfs://主机名:端口号. 所以第一步是读取文件,第二步是获取每行中的第一列. 第三步是将获取到的数据进行去重操作. 第四步是获取元素总个数
1
2
3
4 lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt")
res = lines.map(lambda x:x.split(",")).map(lambda x: x[0]) //获取每行数据的第1列
distinct_res = res.distinct() //去重操作
distinct_res.count()//取元素总个数Q2-求课程总数:原理同上
1
2
3
4 lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt")
res = lines.map(lambda x:x.split(",")).map(lambda x:x[1]) //获取每行数据的第2列
distinct_res = res.distinct()//去重操作
distinct_res.count()//取元素总个数Q3 - 求tom同学每门课程的平均分: 使用到的函数有map,filter, foreach, count, reduce
1
2
3
4
5
6
7
8
9
10
11 # 使用Map算子将数据的每一行都按照指定元素切开, 每行的数据变为了列表, 取列表中的第0个元素, 值为Tom
scoreRdd = fileRdd.map(lambda line: line.split(",")).filter(lambda list: list[0] == 'Tom')
# 上一步得到['Tom,DataBase,80', 'Tom,Algorithm,50', 'Tom,DataStructure,60']
# 通过map算子取每行数据的第三个数据, 并将其转为int类型
allScoreRdd = scoreRdd.map(lambda x: int(x[2]))
# 统计有几个int类型(也就是课程的个数
num = allScoreRdd.count()
# 通过Reduce算子将每个成绩进行聚合
scores = allScoreRdd.reduce(lambda x,y: x+y)
# 求平均成绩
print(scores/num)Q4 - 求每位同学的选课数量
1
2
3
4
5
6 stuRdd = fileRdd.map(lambda line: line.split(","))
# 得到['Tom', 'DataBase', '80'], ['Tom', 'Algorithm', '50'], ['Tom', 'DataStructure', '60']
infoRdd = stuRdd.map(lambda x: (x[0], 1))
# 得到[('Tom', 1), ('Tom', 1), ('Tom', 1), ('Jim', 1), ('Jim', 1), ('Jim', 1)] 有点词频统计的感觉
resRdd = infoRdd.reduceByKey(lambda x,y: x+y)
print(resRdd.collect())Q5 - 求该系DataBase课程共有多少人选修
1
2
3
4
5 listRdd = fileRdd.map(lambda x: x.split(",")).filter(lambda list: list[1] == 'DataBase')
# listRdd = [['Tom', 'DataBase', '80'], ['Jim', 'DataBase', '90']]
# 取每个学生信息的姓名(去重后再统计个数
disRdd = listRdd.map(lambda list:list[0])
print(disRdd.distinct().count())Q6 - 求各门课程的平均分是多少
1
2
3
4
5
6
7
8
9
10 infoListRdd = fileRdd.map(lambda line: line.split(","))
courseListRdd = infoListRdd.map(lambda list: (list[1],(int(list[2]), 1))) # ('ComputerNetwork', (44, 1))
# 按课程名聚合课程总分和选课人数。格式如('DataBase', (170, 2))
# reduceByKey算子分为两步: 1, 得到['Hadoop', [1,1,1,1]] 2, 得到 ['Hadoop', 5] 3, x是1,y是1,然后x是2,y是1
# 首先按照Key分组,Database计算这里的x是(80, 1), y是(90, 1),所以reduceByKey会得到('DataBase', (170, 2))
aggRdd = courseListRdd.reduceByKey(lambda x,y: (x[0]+y[0],x[1]+y[1]))
# 得到: [('DataBase', (170, 2)), ('Algorithm', (110, 2)), ('DataStructure', (140, 2))]
# 这里传入map算子的是个('DataBase', (170, 2)), 将得到的成绩总和 除以 总人数(保留两位小数)就得到了结果
resRdd = aggRdd.map(lambda x:(x[0], round(x[1][0]/x[1][1],2)))
print(resRdd.collect())Q7 - 求使用累加器计算共有多少人选了DataBase这门课
1
2
3
4
5 lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt")
res = lines.map(lambda x:x.split(",")).filter(lambda x:x[1]=="DataBase")//筛选出选了DataBase课程的数据
accum = sc.accumulator(0) //定义一个从0开始的累加器accum
res.foreach(lambda x:accum.add(1))//遍历res,每扫描一条数据,累加器加1
accum.value //输出累加器的最终值
编写程序实现数据去重
对于两个输入文件A和B,编写Spark独立应用程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新文件C。下面是输入文件和输出文件的一个样例,供参考。
输入文件A的样例如下:
1 | 20170101 x |
输入文件B的样例如下:
1 | 20170101 y |
根据输入的文件A和B合并得到的输出文件C的样例如下:
1 | 20170101 x |
Answer
1 | from pyspark import SparkContext, SparkConf |
编写程序实现求平均值问题
每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生名字,第二个是学生的成绩;编写Spark独立应用程序求出所有学生的平均成绩,并输出到一个新文件中。下面是输入文件和输出文件的一个样例,供参考。
Algorithm成绩:
1 | 小明 92 |
Database成绩:
1 | 小明 95 |
Python成绩:
1 | 小明 82 |
平均成绩如下:
1 | (小红,83.67) |
Answer
1 | from pyspark import SparkContext, SparkConf |