Retrospect

转换算子和行为算子的区别

  • 1,会不会触发job任务的执行: 转换算子不会触发job的执行,行为算子会
  • 2, 生成新的RDD: 转换算子会从原有的RDD中产生新的RDD

网络日志案例

我一直保持只有做题才能真正检验你自己的水平, 哪怕你平时不听课,但老师布置的作业,以及上课的题目你都会.那就OK.

下面是一个案例, 能独立做出来也是对自己的一个检验罢.

PV,UV概念

pv:网页访问量,每访问一个页面,则就算一个pv

uv:独立访客数,每来一个不同的用户,就算一个uv

举例

1
2
3
4
5
6
7
8
9
10
11
12
1001  2023-07-15 08:15  a.html
1001 2023-07-15 09:15 b.html
1001 2023-07-15 10:15 c.html
1001 2023-07-15 11:15 d.html

1002 2023-07-15 08:15 a.html
1002 2023-07-15 09:15 b.html
1002 2023-07-15 10:15 c.html
1002 2023-07-15 11:15 d.html

#2023-07-15这天的pv值是多少? 8 一行就是一个pv
#2023-07-15这天的uv值是多少? 2 一个用户id/ip就是一个uv

项目需求

数据模型

1
2
3
4
user_id                      time
56708793,38838972,1,,12103,2014-12-12 08
74163297,41655821,3,,8796,2014-12-14 22
62809951,10095456,1,,5712,2014-11-22 15

需求1:统计每天的PV,并按照日期升序排序
需求2:统计每天的UV,并按照UV个数降序排序

具体思路

  • 首先得到通用RDD
    • 通用RDD的格式为(user_id, time)
    • 注意time是2014-12-12(年月日)这种格式, 要通过字符串切片求出
  • 求PV
    • 思路: 通过map通用RDD得到(time, 1), 通过reduceByKey得到(time, pv)
  • 去重求UV
    • 思路: 在通用RDD的基础上使用distinct对数据去重,剩下的和求PV思路一模一样

CodeDemo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 用户id,
# '3607854,95585882,1,,6513,2014-11-28 06', '75888387,278602871,1,,5894,2014-11-25 18'
fileRdd = sc.textFile('hdfs://node1:8020/datas/input/tianchi_user.csv')
print(fileRdd.take(3))
# 得到通用RDD即为(用户id, 时间)
commRdd = fileRdd.map(lambda line: (line.split(",")[0], line.split(",")[5][0:10]))
print(commRdd.take(3))


# 求PV - Excepted: (日期, 数量)
mapRdd = commRdd.map(lambda list: (list[1], 1))
reduceRdd = mapRdd.reduceByKey(lambda x,y: x+y).coalesce(1).sortByKey(ascending=False)
reduceRdd.foreach(lambda x:print(x))

print(10 * '-')

# 求UV - Excepted: (日期, 数量)
disRdd = commRdd.distinct()
uvmapRdd = disRdd.map(lambda list: (list[1], 1))
resRdd = uvmapRdd.reduceByKey(lambda x,y: x+y).coalesce(1).sortByKey(ascending=False)
print(resRdd.collect())

Transformation转换算子

常见的转换算子filter, map, flatMap

1
2
3
4
fileRdd = sc.textFile("file:///export/data/word.txt")
rdd1 = fileRdd.filter(lambda line: len(line) > 0) # 使用filter对数据进行过滤,满足条件的是保存下来的
rdd2 = rdd1.flatMap(lambda line: line.split(" ")) # 使用flatMap算子将数据进行炸开
rdd3 = rdd2.map(lambda x: (x, 1)) # 对rdd中的每个元素处理,处理后的元素放入一个新的rdd中

其他转换算子(union, distinct算子)

1
2
3
4
5
6
list1 = [1, 2, 3, 4, 5, 6]
list2 = [5, 6, 7, 8, 9, 10]
list1Rdd = sc.parallelize(list1)
list2Rdd = sc.parallelize(list2)
resRdd = list1Rdd.union(list2Rdd) # 通过union算子将两个rdd中的元素合并, 但是没有去重
print(resRdd.glom().collect())

分组聚合算子(groupByKey, reduceByKey)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
list1 = [
('hadoop', 1),
('hadoop', 1), # ---> ('hadoop', 1, 1, 1)
('hadoop', 1),
('hdfs', 1),
('hdfs', 1), # ---> ('hdfs', 1, 1)
('spark', 1) # ---> ('spark', 1)
]
# 2、将列表转为RDD
rdd1 = sc.parallelize(list1, 3)
print(rdd1.glom().collect())
# 3、使用groupByKey进行分组聚合
rdd2 = rdd1.groupByKey(2)
print(rdd2.glom().collect())
# 4, 通过*遍历可迭代容器中的元素
demo = rdd2.map(lambda x: (x[0], *x[1]))
print(demo.collect())

# reduceByKey 就是在第一步聚合的基础上再算出结果,具体见词频统计.(reduceByKey会提前做预聚合, 而groupByKey不会,所以效率会慢
# 先变为('hadoop', 1, 1, 1), 然后通过lambda x,y: x+y 计算出具体结果

排序算子(sortBy, sortByKey)

对RDD中所有元素进行整体排序, 可以指定排序规则, 一般是sortBy使用的比较多

1
2
3
4
5
6
7
8
fileRdd = sc.textFile('file:///export/data/sort.txt')
resRdd = fileRdd.sortBy(lambda line: line.split(",")[1], ascending=False)
print(resRdd.collect())
list1 = [(28, '张三'), (21, '李四'), (25, '王五'), (31, '赵六'), (18, '周七')]
# 将列表转为RDD
rdd1 = sc.parallelize(list1)
rdd2 = rdd1.sortByKey(ascending=False)
print(rdd2.collect())

TopN算子:top、takeOrdered

top算子会自动将数据进行降序排序, 取最大的n个数, takeOrdered会将数据进行升序排序, 取最小的前n个)

  • 将RDD集合中的元素降序排序, 并返回前N个元素(最大的)top(N)
  • 将RDD集合中的元素降序排序, 并返回前N个元素(最小的)takeOrderd()
1
2
3
4
5
list1 = [5, 1, 3, 4, 2]
rdd = sc.parallelize(list1)
rdd1 = rdd.top(1)
rdd2 = rdd.takeOrdered(1)
print(rdd1, rdd2)

重分区算子:repartition、coalesce

repartition是增加分区个数的-必须有shuffle操作,要不会有空分区, coalesce是减少分区个数的

1
2
3
4
5
6
list1 = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
listRdd = sc.parallelize(list1, 2)
res = listRdd.repartition(5).distinct() # 增加分区,为了不产生空分区,必须重新shuffle洗牌
print(res.glom().collect())
res1 = res.coalesce(2) # 减少分区的数量到2个, 可以不重新shuffle
print(res1.glom().collect())

<补充>聚合算子fold/foldByKey算子

讲真,这两个算子有些鸡肋.

功能:将RDD中的每个元素按照给定的聚合函数进行聚合,返回聚合的结果

语法:

1
2
def fold(self, zeroValue: T, f:(T,T) -> T) -> T
def foldByKey(self, zeroValue, func, numPartitions=None, partitionFunc=portable_hash)

zeroValue就是初始值.

特点:有初始值,每个分区在计算时都会计算上初始值,分区间计算时也会计算1次初始值

  • 对于fold算子: 初始值一共会被聚合N+1次,N代表分区数
    • 如果zeroValue为0, 则等价于reduce算子
  • 对于foldByKey算子: 初始值一共会被聚合N次,N代表分区数, 分区之间累加不再使用zeroValue
    • 如果zeroValue为0, 则等价于reduceByKey算子

CodeDemo

1
2
3
4
5
6
7
8
9
10
11
12
list = [1, 2, 3, 4, 5, 6]                                                                            
rdd1 = sc.parallelize(list)
print(rdd1.glom().collect())
rdd2 = rdd1.fold(1, lambda x,y: x+y)
# 如果初始值为0, 则fold等价于reduce.即是将所有的的元素通过lambda函数元素加起来
# 首先在分区内累加, 将zeroValue和分区内部的值加起来, 每个分区都要加上初始值zeroValue, 然后再分区间累加(同样加上初始值
print(rdd2)
list1 = [("a", 1), ("b", 1), ("a", 1)]
rdd3 = sc.parallelize(list1)
print(rdd3.glom().collect())
foldByKeyRdd = rdd3.foldByKey(0, lambda x,y: x+y)
print(foldByKeyRdd.collect())

<补充>KV类型算子

  • keys

    • 针对二元组KV类型的RDD,返回RDD中所有的Key,放入一个新的RDD中
  • values

    • 针对二元组KV类型的RDD,返回RDD中所有的Value,放入一个新的RDD中
  • mapvalues

    • 针对二元组KV类型的RDD,对RDD中每个元素的Value进行map处理,结果放入一个新的RDD中
  • collectAsMap

    • 将二元组类型的RDD转换成一个Dict字典

DemoCode

1
2
3
4
5
6
7
8
9
10
11
12
13
14
list1 = [("a", 1), ("b", 1), ("a", 1)]
# 将列表转为RDD
rdd3 = sc.parallelize(list1)
# 查看RDD的每个分区的每个元素的键
print(rdd3.keys().collect())
# 查看RDD的每个分区的每个元素的值
print(rdd3.values().collect())
# 对RDD的分区中元素的值进行操作,每个值都乘以2
rdd4 = rdd3.mapValues(lambda x: x*2)
# 查看mapValues的结果
print(rdd4.glom().collect())
# 将RDD中的键值转变为Python中的字典类型
rdd5_dict = rdd3.collectAsMap()
print(f"rdd5_dict的类型是{type(rdd5_dict)}, 值为{rdd5_dict}")

<补充>Join关联算子

  • 算子: join / fullOuterJoin / leftOuterJoin / rightOuterJoin

  • 功能: 实现两个==KV类型==的RDD之间==按照K==实现关联,将两个RDD的关联结果放入一个新的RDD中

  • 语法:

1
def join(self: RDD[Tuple[K,V]], otherRdd: RDD[Tuple[K,W]]) -> RDD[Tuple[K,(V,W)]]

CodeDemo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
rdd_singer_age = sc.\
parallelize([("周杰伦", 43), ("陈奕迅", 47), ("蔡依林", 41), ("林子祥", 74), ("陈升", 63)], numSlices=2)
rdd_singer_music = sc.\
parallelize([("周杰伦", "青花瓷"), ("陈奕迅", "孤勇者"), ("蔡依林", "日不落"), ("林子祥", "男儿当自强"), ("动力火车", "当")],numSlices=2)
# 将两个RDD中的数据做join(按照键来进行关联, 只有键值相同的才会join上, 类型大概为('周杰伦', (43, '青花瓷'))
rdd_join = rdd_singer_age.join(rdd_singer_music)
rdd_join.foreach(lambda x:print(x))
print('*' * 10)
# 左外连接, 以左表为主
rdd_left_join = rdd_singer_age.leftOuterJoin(rdd_singer_music)
rdd_left_join.foreach(lambda x: print(x))
# fullOutJoin是满外连接, 都是类似于sql
rdd_full_outer_join = rdd_singer_age.fullOuterJoin(rdd_singer_music)
rdd_full_outer_join.foreach(lambda x: print(x))

<补充>分区处理算子

两个算子: 分别是mapPratitions和foreachPartition

这里引入了一个新的问题:

1
2
3
4
5
6
7
8
9
10
11
12
#1、定义列表,并转为rdd1
rdd1 = sc.parallelize([1,2,3,4,5,6],2)
#2、对rdd数据进行处理(让每个元素转为它的2倍)
rdd2 = rdd1.map(lambda x:x*2)
# 定义将元素写入数据库中的函数
def write_to_sql(x):
print('----------------------------')
print('1-创建连接')
print(f'2-写入{x}元素到mysql')
print('3-关闭连接')
#3、使用foreach将转换好的结果写入MySQL
rdd2.foreach(lambda x: write_to_sql(x))
  • 上面的写入数据库的方式可想而知, 每次写入一个数据就会创建游标, 创建连接,然后再关闭游标,关闭连接.极大地浪费了资源, 因为跟MySQL服务器交互是靠TCP协议进行通信的, 需要经过3次握手, 四次挥手
  • 解决方式: 基于分区来操作,每次读取一个分区代替每次读取一个元素,每个分区构建一个MySQL连接代替每个元素构建一个连接
  • mapPratitions
    • 功能:对RDD每个分区的数据进行操作,将每个分区的数据进行map转换,将转换的结果放入新的RDD中
  • foreachParition
    • 功能:对RDD每个分区的数据进行操作,将整个分区的数据加载到内存进行foreach处理,没有返回值

使用foreachParition将转换好的结果写入MySQL

CodeDemo

1
2
3
4
5
6
def write_to_mysql2(x):
print('1-创建连接')
for i in x:
print(f'2-写入{i}元素到mysql')
print('3-关闭连接')
rdd2.foreachPartition(lambda x:write_to_mysql2(x)) #x代表rdd中的每个分区

Action行为算子

常用触发算子:count/foreach/saveAsTextFile

count, foreach, saveAsTextFile, first, take, collect, reduce

  • 统计RDD中有多少个元素count
  • 遍历RDD中的元素: 除了foreach还可以使用print(*rdd1)
  • 将RDD中的数据写入到文件中saveAsTextFile, 同时在写入的时候还可以指定分区的数量如: resRdd.repartition(1).saveAsTextFile
  • saveAsTextFile
    • 1、该算子将rdd的结果存入文件
    • 2、写入文件的文件个数由分区数来决定

其他触发算子:first/take/collect/reduce

  • 返回RDD集合中的第一个元素first()
  • 返回RDD集合中的前三个个元素take(3)
  • 将集合中的元素进行累加reduce(lambda x,y: x+y)但是要注意reduce是先在分区内累加,再在分区间累加
1
2
3
4
5
6
7
8
list = [1, 2, 3, 4, 5, 6]
rdd1 = sc.parallelize(list, 2)
print(rdd1.count())
rdd1.foreach(lambda x: print(x))
print("第一个元素是:",rdd1.first()) # 返回RDD集合中的第一个元素
print("前三个元素是:",rdd1.take(3)) # 注意, take返回的值要放在Driver的内存中,take的数据量不能过大(超过1g就算
print("全部元素是:",rdd1.glom().collect()) # collect的数据量不能过大,否则Driver内存溢出
print("所有元素的累加和为:",rdd1.reduce(lambda x,y: x+y))

最值, 平均值算子max/min/mean

求出RDD中每个元素的最大值, 最小值 以及平均值

1
2
3
4
5
6
7
8
# 需求2: 求用户最大,最小平均搜索次数  (用户id, 访问次数)
need2Rdd = commonRdd.\
map(lambda list: (list[1], 1)).\
reduceByKey(lambda x,y: x+y).\
values()
print("最大访问次数", need2Rdd.max())
print("最小访问次数", need2Rdd.min())
print("平均访问次数", need2Rdd.mean())

补充

为了多次写入一个文件出现报错的情况,需要写.

安装方式: pip install hdfs

导包: from hdfs.client import Client

1
2
3
4
client = Client('http://node1:9870/')                
client.delete('data/output', True)
# 为了防止文件写入报错
rdd.saveAsTextFile('hdfs://node1:8020/datas/output')