Anaconda

Anaconda是什么

1、Anaconda是一个工具包,里边包含了几百个开发工具,其中也包含Python
2、Anaconda还可以模拟多个虚拟环境,在该虚拟环境中可以安装不同版本的软件,多个虚拟环境彼此独立,以后你可以自由选择使用哪一个虚拟环境
3、你安装了Anaconda之后,自动会给你创建一个基础环境,名字为base

Anaconda常用命令

  • 查看当前服务器安装的所有虚拟环境 conda env list
  • 创建新的虚拟环境conda create -n 虚拟环境的名字 python=版本
  • 切换虚拟环境conda activate 虚拟环境名称
  • 退出虚拟环境-进入上一个虚拟环境conda deactivate
  • 删除某个虚拟环境conda remove -n 虚拟环境名称 —all
  • 查看虚拟环境中安装的软件包conda list
  • 卸载软件包conda uninstall 包名 或pip uninstall 包名

Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#查看当前服务器安装过哪些虚拟环境
conda env list

#创建新的虚拟环境
#--语法
conda create -n 虚拟环境名字 python=版本

#--示例
conda create -n bigdata01 python=3.7
conda create -n bigdata02 python=3.9
conda env list


#切换虚拟环境
#--语法
conda activate 虚拟环境名字
#--示例
conda activate bigdata01 #会发现自己创建的环境比base少了很多组件

#退出当前虚拟环境
conda deactivate #退出进入你上一次的虚拟环境


#删除某一个虚拟环境
#--语法
conda remove -n 虚拟环境名字 --all
#--示例
conda remove -n bigdata02 --all
conda env list


#如何在一个虚拟环境中安装软件包
#1、切换虚拟环境
conda activate bigdata01
#2、查看虚拟环境中安装的软件包
conda list

#3、如果没有设置镜像源,则可以在安装的时候,手动设置镜像源
pip install pandas==1.1.1 -i https://pypi.tuna.tsinghua.edu.cn/simple/
conda list

#4、卸载软件包
conda uninstall 包名
或者
pip uninstall 包名

SparkTemplete模板

创建模版

1
2
3
创建项目模板:File > Settings > Editor > File and Code Template > Python Script -->复制模板--->粘贴你的代码--->保存 

创建python文件,选择你的模板

讲解:

  • 下面代码需要导入Pyspark包,这个包以及运行代码的解释器都是在Linux中的Anaconda中配置的

  • 需要导入Pyspark包中的SparkContext及SparkConf,还有在后面设置JAVA_HOME等一系列环境变量

  • setMaster是集群的Master参数,setAppName位置填上运行任务的名称

    • 集群StandAlone下参数为spark://node1.itcast.cn:7077要用单引号引起来
    • 单机模式下的参数为local[CPU核数]
  • SparkContext中的参数是关键字参数,必须写conf=

Templete模板

1
2
3
4
5
6
7
8
9
10
11
from pyspark import SparkContext, SparkConf
import os

os.environ['JAVA_HOME'] = '/export/server/jdk'
os.environ['SPARK_HOME'] = '/export/server/spark' #Spark安装位置
os.environ["PYSPARK_PYTHON"] = "/export/server/anaconda3/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/export/server/anaconda3/bin/python3"

if __name__ == '__main__':
conf = SparkConf().setMaster('local[2]').setAppName('wordcount01')
sc = SparkContext(conf=conf)

Pycharm开发Spark

  • 首先删除Python中自带的解释器, 通过远端的解释器来解释本地写的Python文件
  • 其次配置远端的解释器(Anaconda中虚拟环境的解释器)

pCfLK2T.png

详情信息配置

pCfLqJ0.png

Spark本地模式

就是单机版

外部动态传参

1
2
3
4
5
6
7
8
9
10
1、写代码是,可以将输入和输出路径作为参数传入,而不是写死
2、如果要实现动态参数传入需要做以下操作:
#1、导入包
import sys

#2、在合适的地方写入参数
sys.argv[0] # python文件的名称,所以不作为第一个参数
sys.argv[1] # 表示程序真正传递到程序中的第1个参数
sys.argv[2] # 表示程序真正传递到程序中的第2个参数
……

pCfOo6O.png

SparkStandAlone模式

介绍

1、StandAlone模式是:Spark引擎 + 自带的资源调度平台(master + work)
2、StandAlone是集群模式,每一台机器都要安装

架构

1
2
3
4
5
6
1、StandAlone模式是一个主从架构
主节点:Master =等价于Yarn的ResourceManager
从节点:Worker =等价于Yarn的NodeManager
2、当一个Spark任务在StandAlone模式下执行时,会多出来一些组件
Executor进程 = 等价于Map进程和Reduce进程
Task线程 = 每一个Executor内部是由多个Task线程组成的
  • Master

    1
    2
    3
    - 接受客户端请求:所有程序的提交,都是提交给主节点
    - 管理从节点: 通过心跳机制检测所有的从节点的健康状态
    - 资源管理和任务调度:将所有从节点的资源在逻辑上合并为一个整体,将任务分配给不同的从节点
  • Worker

    1
    2
    - 使用自己所在节点的资源运行计算,计算时会启动Executor进程
    - 所有Task线程计算任务就运行在Executor进程中

集群启动

  • 在node1一键启动

    1
    2
    3
    4
    5
    cd /export/server/spark/sbin

    start-all.sh #启动Master和Worder

    start-history-server.sh #启动历史服务
  • 在node1一键关闭

    1
    2
    3
    4
    5
    cd /export/server/spark/sbin

    stop-all.sh

    stop-history-server.sh

网页验证

1
2
http://node1:8080/    = Master的信息查看页面
http://node1:18080/ = 历史任务查看页面

SparkSubmit提交

StandAlone方式提交

1
2
3
4
5
#1、将你的代码上传到/export/data目录
#2、在linux上执行以下命令
/export/server/spark/bin/spark-submit \
--master spark://node1.itcast.cn:7077 \
/export/data/05-WordCountStandAlone.py

本地模式提交

1
2
3
4
/export/server/spark/bin/spark-submit \
--master local[2] \
/export/server/spark/examples/src/main/python/pi.py \
100
  • 常见的Master参数有:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    #本地模式
    --master local[2] =手动指定cpu核数为2
    --master local[*] =你虚拟机有多少个虚拟核数,就用多少个虚拟核数

    #StandAlone模式
    #--单节点
    --master spark://node1.itcast.cn:7077
    #--高可用
    --master spark://node1.itcast.cn:7077,node2.itcast.cn:7077

    #Yarn模式
    --master yarn

    #Mesos模式
    --master mesos://node1:5050

    #K8s模式
    --master k8s://node1:7888
  • 还有一些重要的参数

    1
    2
    3
    4
    5
    6
    7
    --deploy-mode  任务的在集群运行模式(Client和Cluster)
    --name 指定任务的名字
    --num-executors 指定Executor个数
    --executor-cores 指定Executor的CPU核数
    --total-executor-cores 指定总的Executor核数
    --executor-memory Executor所占内存大小
    --driver-memory Driver所占内存大小

    WordCount案例

Linux文件版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from pyspark import SparkContext,SparkConf
import os

# spark入门案例 --- WordCount
# 1、设置环境变量
os.environ['JAVA_HOME'] = '/export/server/jdk'
os.environ['SPARK_HOME'] = '/export/server/spark' #Spark安装位置
os.environ["PYSPARK_PYTHON"]="/export/server/anaconda3/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"]="/export/server/anaconda3/bin/python3"

if __name__ == '__main__':
print("spark入门案例 --- WordCount")
#2、创建SparkContext类对象
# 2.1 设置我们的任务运行的配置信息,得到SparkConf对象
conf = SparkConf().setMaster('local[2]').setAppName('wordcount01')
# 2.2 根据SparkConf对象得到SparkContext
sc = SparkContext(conf=conf)

#3、实现wordcount案例
#3.1 将外部文件读取到rdd中
fileRdd = sc.textFile('/export/data/word.txt')
print(fileRdd.glom().collect())
# 将文件空行过滤
filterRdd = fileRdd.filter(lambda line: len(line) > 0)
print(filterRdd.glom().collect())
# 通过flatMap函数炸开每行中的元素
explodeRdd = filterRdd.flatMap(lambda line: line.split(" "))
print(explodeRdd.glom().collect())
# 通过Map函数给每个元素后面加上1, 变为元组的形式
reflectRdd = explodeRdd.map(lambda word: (word, 1))
print(reflectRdd.glom().collect())
# 对元素进行聚合操作, ReduceByKey涉及到suffle过程(需要和磁盘进行交互)
aggRdd = reflectRdd.reduceByKey(lambda x,y: x+y)
print(aggRdd.glom().collect())
# 将处理得到的数据写入Linux文件中
aggRdd.saveAsTextFile('file:///export/data/output')

HDFS版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from pyspark import SparkContext, SparkConf
import os

os.environ['JAVA_HOME'] = '/export/server/jdk'
os.environ['SPARK_HOME'] = '/export/server/spark' #Spark安装位置
os.environ["PYSPARK_PYTHON"] = "/export/server/anaconda3/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/export/server/anaconda3/bin/python3"

if __name__ == '__main__':
conf = SparkConf().setMaster('local[2]').setAppName('wordcount01')
sc = SparkContext(conf=conf)

# hdfs路径要加上 hdfs://node1:8020后面写hdfs的文件路径
fileRdd = sc.textFile("hdfs://node1:8020/datas/input/word.txt")
print(fileRdd.glom().collect())

# 也可以直接import sys 然后在参数的位置写argv[1], argv[2]分别是第一个第二个参数, argv[0]代表文件名称

# 链式编程
resRdd = fileRdd.filter(lambda line: len(line)>0).\
flatMap(lambda line: line.split(" ")).\
map(lambda word: (word, 1)).\
reduceByKey(lambda x,y: x+y).\
saveAsTextFile("hdfs://node1:8020/output/res")

"""
本地模式: 提交命令的方式为
/export/server/spark/bin/spark-submit \
--master local[2] \
py文件目录 \
参数
"""

StandAlone模式下的HDFS版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from pyspark import SparkContext, SparkConf
import os

os.environ['JAVA_HOME'] = '/export/server/jdk'
os.environ['SPARK_HOME'] = '/export/server/spark' #Spark安装位置
os.environ["PYSPARK_PYTHON"] = "/export/server/anaconda3/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/export/server/anaconda3/bin/python3"

if __name__ == '__main__':
conf = SparkConf().setMaster('spark://node1.itcast.cn:7077').setAppName('wordcount01')
sc = SparkContext(conf=conf)

"""
集群StandAlone的Python文件提交方式为:
/export/server/spark/bin/spark-submit \
--master spark://node1.itcast.cn:7077 \
py文件目录[比如: /export/server/spark/examples/src/main/python/pi.py] \
参数
"""
resRdd = sc.textFile("hdfs://node1:8020/datas/input/word.txt").\
filter(lambda line: len(line) > 0). \
flatMap(lambda line: line.split(" ")). \
map(lambda word: (word, 1)). \
reduceByKey(lambda x, y: x + y). \
saveAsTextFile("hdfs://node1:8020/datas/output/res")

SparkStandAlone的高可用

概述

1
2
1、高可用就是有两个master主节点,一个在node1上,一个在node2上
2、主节点和备用节点切换使用Zookeeper来实现

操作

关闭原集群

1
2
/export/server/spark/sbin/stop-all.sh
/export/server/spark/sbin/stop-history-server.sh

配置高可用

  • 1.修改配置文件

    1
    2
    cd /export/server/spark/conf/
    vim spark-env.sh
    1
    2
    3
    4
    #注释以下内容
    #SPARK_MASTER_HOST=node1.itcast.cn
    #添加
    SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=node1:2181,node2:2181,node3:2181 -Dspark.deploy.zookeeper.dir=/spark-ha"
  • 2.分发

    1
    2
    3
    cd /export/server/spark/conf
    scp -r spark-env.sh node2:$PWD
    scp -r spark-env.sh node3:$PWD
  • 3.启动ZK:三台机器都要启动

    1
    /export/server/zookeeper/bin/zkServer.sh start
  • 4.启动Master

    • 第一台

      1
      /export/server/spark/sbin/start-master.sh
    • 第二台

      1
      /export/server/spark/sbin/start-master.sh
  • 5.启动Worker

    1
    /export/server/spark/sbin/start-workers.sh 
  • 6.测试主备切换

    1
    2
    3
    4
    5
    6
    7
    8
    9
    #1、查看网页
    http://192.168.88.100:8080/ #主
    http://192.168.88.101:8080/ #备

    #2、测试主备切换
    在node1上,kill原来的master
    kill -9 master的进程号

    查看 http://192.168.88.101:8080/ 页面的Status是否变成 ALIVE (可能需要等几分钟)