初识Spark

按照知识点的重要性由高到低的顺序来进行归纳

分布式和集群

这个东西一下让你说,你不一定能说的出来,概括一下吧

分布式: 强调的是将一个系统的资源由单机分散到多台机器上,一个字拆

强调多台机器做一样的事情

Spark模块

pCW7FVx.png

1、SparkCore:是Spark框架的核心,其他所有组件都基于SparkCore
2、SparkSQL : 使用Spark + SQL语言来对大数据进行离线分析
3、Spark Structed Streaming : Spark的实时部分,需要结合Kafka
4、MLlib:Spark的机器学习库
5:Graph:Spark的图计算

集群部署模式

用的最多的就是Spark On Yarn模式 : Spark计算引擎 + Yarn调度器

1
2
3
4
5
6
7
8
#1、本地模式(单机)
用于测试

#2、集群模式(多机)
了解:StandAlone模式 : Spark计算引擎 + Spark自带的资源调度器
重要:Spark On Yarn模式 : Spark计算引擎 + Yarn调度器
了解:Spark On Messos模式: Spark计算引擎 + Messos调度器
了解:Spark On K8s模式 : Spark计算引擎 + K8s平台调度器

Spark的特点和发展历史

  • 特点

1、Spark是基于内存计算的计算引擎
2、Spark既可以用用于离线分析,也可以用于实时分析
3、Spark理论上可以支持PB级别数据量

  • 发展历史

1、Spark是一个计算引擎,类似于之前学习的MapReduce和Presto
2、Spark是2009诞生于美国的加州大学伯克利分校
3、2013年Spark被捐献给Apache软件基金会
4、2017年Structured streaming发布,统一化实时与离线分布式计算平台
5、2018年Spark2.4.0发布,
6、2020年6月Spark发布3.0.0正式版

工作年限:3年起步,版本使用2.4.2

工作年限:2年起步,版本使用3.1.2

Spark和MapReduce的区别

pCWHFyj.png

1
2
3
4
5
6
7
8
9
10
#进程和线程区别
1、进程可以独立的占用和分配资源(内存和CPU,网络,IO)
2、一个进程可以创建多个线程,这些线程共享进程的资源(一个家长和多个未成年的孩子)
3、多个线程之间是可以竞争资源
4、线程不能独立存在,必须依赖创建它的线程


#Spark的磁盘操作问题
1、一个Spark的计算任务没有Shuffle,则都是基于内存操作
2、一个Spark的计算任务有Shuffle操作,则中间需要将临时结果保存到磁盘上,进行操作

调度: 进程是资源分配的基本单位, 线程是CPU调度的基本单位

并发性: 进程之间可以并发执行,多个线程之间也可以并发执行

拥有资源(地址空间):进程有自己独立的地址空间, 多个线程共享进程的地址空间.

包含关系: 一个线程包含多个进程

系统开销:在创建或撤消进程时的开销明显大于创建或撤消线程时的开销。

Spark环境的代码验证-单机版

1
2
3
4
5
6
7
8
#1、定义列表
list1 = [1,2,3,4,5,6,7,8,9,10]

#2、将列表中每一个元素转为它的平方,然后存放到list2列表
list2 = map(lambda x:x**2,list1)

#3、输出list2列表,理解为将list2中的每一个元素取出,转成字符串输出
print(*list2)

Spark环境的代码验证-分布式

  • 使用glom函数可以查看数据的分布情况
  • parallelize函数相当于将数据进行分布式存储了,存储到多个机器的内存中
  • foreach函数是遍历,括号里面写的是函数lambda x: print(x)
  • collect()函数相当于将分布的数据收回,但是收回的顺序可能会出现多种情况,因为数据到达的先后顺序不一样
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#1、定义列表
list1 = [1,2,3,4,5,6,7,8,9,10]

#2、将本地列表转为RDD(弹性分布式数据集),
inputRdd = sc.parallelize(list1)

#3、输出inputRdd每一个分区的数据
print(inputRdd.collect())
print(inputRdd.glom().collect())

#4、将RDD中每一个分区的数据转为它的平方
rsRdd = inputRdd.map(lambda x: x**2)

#5、遍历rsRDD
print(inputRdd.glom().collect()) #方式1 - 打印
rsRdd.foreach(lambda x: print(x)) #方式2 - 遍历(结果可能乱序)

WordCount词频统计

数据准备

1
2
3
4
5
6
7
8
9
10
11
12
13
14
vim /export/data/word.txt  

粘贴以下内容

hadoop spark
hive hadoop spark spark
hue hbase hbase hue hue
hadoop spark
hive hadoop spark spark
hue hbase hbase hue hue
hadoop spark
hive hadoop spark spark
hue hbase hbase hue hue
hadoop spark

当我用五行代码写完就知道这只是个入门案例

  • 从Linux本地中读取文件使用file://协议, 后面跟上Linux的路径,使用SparkContext简写sc.textFile函数
  • 对文件中的空行进行过滤使用(可以叫rdd算子)filter函数
  • 将读取到的数据RDD1进行分割(按照空格分割)使用flatMap函数
  • 对两个RDD中的数据中的每个元素加上括号,并写成元组的形式使用Map函数
  • 对RDD中的元素进行聚合使用reduceByKey函数
    • reduceByKey函数实际上是分为两步, 第一步得到(‘Hadoop’, [1, 1, 1, 1, 1])
    • 然后将后面的1全部按照x+y的方式相加
1
2
3
4
5
6
7
8
fileRdd = sc.textFile('file:///export/data/word.txt')
filterRdd = fileRdd.filter(lambda line: len(line)>0)
mapRdd = filterRdd.flatMap(lambda x: x.split(" "))
tupleRdd = mapRdd.map(lambda word: (word, 1))
reduceRdd = tupleRdd.reduceByKey(lambda x,y: x+y)
print(reduceRdd.collect())
# 将结果保存到最终的文件,目标目录不能存在
reduceRdd.saveAsTextFile("file:///export/data/output")

链式编程

1
2
3
4
5
6
7
8
9
10
# 读取数据
inputRdd = sc.textFile("file:///export/data/word.txt")
# 转换数据
rsRdd = inputRdd \
.filter(lambda line : len(line.strip()) > 0) \
.flatMap(lambda line : line.strip().split(r" ")) \
.map(lambda word : (word,1)) \
.reduceByKey(lambda tmp,item : tmp+item)
# 保存结果
rsRdd.saveAsTextFile("file:///export/data/wcoutput2")
1
2
3
4
5
6
7
# 另外一种写法
sc.textFile("file:///export/data/word.txt"). \
filter(lambda line : len(line)>0). \
flatMap(lambda line:line.split(" ")). \
map(lambda word: (word,1)). \
reduceByKey(lambda x,y:x+y). \
saveAsTextFile("file:///export/data/output3")

ReduceByKey算子的计算流程

pCWqFZn.png

WordCount全流程

[01-Word-Count.png](https://postimg.cc/SjVtSx14)

Spark提交Python文件

介绍

1
在实际的开发中,我们一般是在Pycharm中编写Spark代码,最后都是以py文件呈现,如果将写好的py文件提交给Spark执行,此时Spark提供一个spark-submit命令

使用方式

1
2
3
4
/export/server/spark/bin/spark-submit \
--master local[2] \
/export/server/spark/examples/src/main/python/pi.py \
100