【SparkCore】RDD算子
Retrospect
转换算子和行为算子的区别
- 1,会不会触发job任务的执行: 转换算子不会触发job的执行,行为算子会
- 2, 生成新的RDD: 转换算子会从原有的RDD中产生新的RDD
网络日志案例
我一直保持只有做题才能真正检验你自己的水平, 哪怕你平时不听课,但老师布置的作业,以及上课的题目你都会.那就OK.
下面是一个案例, 能独立做出来也是对自己的一个检验罢.
PV,UV概念
pv:网页访问量,每访问一个页面,则就算一个pv
uv:独立访客数,每来一个不同的用户,就算一个uv
举例
1 | 1001 2023-07-15 08:15 a.html |
项目需求
数据模型
1 | user_id time |
需求1:统计每天的PV,并按照日期升序排序
需求2:统计每天的UV,并按照UV个数降序排序
具体思路
- 首先得到通用RDD
- 通用RDD的格式为(user_id, time)
- 注意time是2014-12-12(年月日)这种格式, 要通过字符串切片求出
- 求PV
- 思路: 通过map通用RDD得到(time, 1), 通过reduceByKey得到(time, pv)
- 去重求UV
- 思路: 在通用RDD的基础上使用distinct对数据去重,剩下的和求PV思路一模一样
CodeDemo
1 | # 用户id, |
Transformation转换算子
常见的转换算子filter, map, flatMap
1 | fileRdd = sc.textFile("file:///export/data/word.txt") |
其他转换算子(union, distinct算子)
1 | list1 = [1, 2, 3, 4, 5, 6] |
分组聚合算子(groupByKey, reduceByKey)
1 | list1 = [ |
排序算子(sortBy, sortByKey)
对RDD中所有元素进行整体排序, 可以指定排序规则, 一般是sortBy使用的比较多
1 | fileRdd = sc.textFile('file:///export/data/sort.txt') |
TopN算子:top、takeOrdered
top算子会自动将数据进行降序排序, 取最大的n个数, takeOrdered会将数据进行升序排序, 取最小的前n个)
- 将RDD集合中的元素降序排序, 并返回前N个元素(最大的)
top(N)
- 将RDD集合中的元素降序排序, 并返回前N个元素(最小的)
takeOrderd()
1 | list1 = [5, 1, 3, 4, 2] |
重分区算子:repartition、coalesce
repartition是增加分区个数的-必须有shuffle操作,要不会有空分区, coalesce是减少分区个数的
1 | list1 = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] |
<补充>聚合算子fold/foldByKey算子
讲真,这两个算子有些鸡肋.
功能:将RDD中的每个元素按照给定的聚合函数进行聚合,返回聚合的结果
语法:
1
2 def fold(self, zeroValue: T, f:(T,T) -> T) -> T
def foldByKey(self, zeroValue, func, numPartitions=None, partitionFunc=portable_hash)zeroValue就是初始值.
特点:有初始值,每个分区在计算时都会计算上初始值,分区间计算时也会计算1次初始值
- 对于fold算子: 初始值一共会被聚合N+1次,N代表分区数
- 如果zeroValue为0, 则等价于reduce算子
- 对于foldByKey算子: 初始值一共会被聚合N次,N代表分区数, 分区之间累加不再使用zeroValue
- 如果zeroValue为0, 则等价于reduceByKey算子
CodeDemo
1 | list = [1, 2, 3, 4, 5, 6] |
<补充>KV类型算子
keys
- 针对二元组KV类型的RDD,返回RDD中所有的Key,放入一个新的RDD中
values
- 针对二元组KV类型的RDD,返回RDD中所有的Value,放入一个新的RDD中
mapvalues
- 针对二元组KV类型的RDD,对RDD中每个元素的Value进行map处理,结果放入一个新的RDD中
collectAsMap
- 将二元组类型的RDD转换成一个Dict字典
DemoCode
1 | list1 = [("a", 1), ("b", 1), ("a", 1)] |
<补充>Join关联算子
算子: join / fullOuterJoin / leftOuterJoin / rightOuterJoin
功能: 实现两个==KV类型==的RDD之间==按照K==实现关联,将两个RDD的关联结果放入一个新的RDD中
语法:
1 | def join(self: RDD[Tuple[K,V]], otherRdd: RDD[Tuple[K,W]]) -> RDD[Tuple[K,(V,W)]] |
CodeDemo
1 | rdd_singer_age = sc.\ |
<补充>分区处理算子
两个算子: 分别是mapPratitions和foreachPartition
这里引入了一个新的问题:
1
2
3
4
5
6
7
8
9
10
11
12 #1、定义列表,并转为rdd1
rdd1 = sc.parallelize([1,2,3,4,5,6],2)
#2、对rdd数据进行处理(让每个元素转为它的2倍)
rdd2 = rdd1.map(lambda x:x*2)
# 定义将元素写入数据库中的函数
def write_to_sql(x):
print('----------------------------')
print('1-创建连接')
print(f'2-写入{x}元素到mysql')
print('3-关闭连接')
#3、使用foreach将转换好的结果写入MySQL
rdd2.foreach(lambda x: write_to_sql(x))
- 上面的写入数据库的方式可想而知, 每次写入一个数据就会创建游标, 创建连接,然后再关闭游标,关闭连接.极大地浪费了资源, 因为跟MySQL服务器交互是靠TCP协议进行通信的, 需要经过3次握手, 四次挥手
- 解决方式: 基于分区来操作,每次读取一个分区代替每次读取一个元素,每个分区构建一个MySQL连接代替每个元素构建一个连接
mapPratitions
- 功能:对RDD每个分区的数据进行操作,将每个分区的数据进行map转换,将转换的结果放入新的RDD中
- foreachParition
- 功能:对RDD每个分区的数据进行操作,将整个分区的数据加载到内存进行foreach处理,没有返回值
使用foreachParition将转换好的结果写入MySQL
CodeDemo
1 | def write_to_mysql2(x): |
Action行为算子
常用触发算子:count/foreach/saveAsTextFile
count, foreach, saveAsTextFile, first, take, collect, reduce
- 统计RDD中有多少个元素count
- 遍历RDD中的元素: 除了foreach还可以使用print(*rdd1)
- 将RDD中的数据写入到文件中saveAsTextFile, 同时在写入的时候还可以指定分区的数量如: resRdd.repartition(1).saveAsTextFile
- saveAsTextFile
- 1、该算子将rdd的结果存入文件
- 2、写入文件的文件个数由分区数来决定
其他触发算子:first/take/collect/reduce
- 返回RDD集合中的第一个元素
first()
- 返回RDD集合中的前三个个元素
take(3)
- 将集合中的元素进行累加
reduce(lambda x,y: x+y)
但是要注意reduce是先在分区内累加,再在分区间累加
1 | list = [1, 2, 3, 4, 5, 6] |
最值, 平均值算子max/min/mean
求出RDD中每个元素的最大值, 最小值 以及平均值
1 | # 需求2: 求用户最大,最小平均搜索次数 (用户id, 访问次数) |
补充
为了多次写入一个文件出现报错的情况,需要写.
安装方式: pip install hdfs
导包: from hdfs.client import Client
1 | client = Client('http://node1:9870/') |