这里写一些SQL中的难点

  • 炸裂函数

    •   spark.sql(
            """
            with t2 as(
                select explode(split(value,' ')) as word from t1 where length (value) > 0
            )
            select word, count(*) cnt from t2 group by word order by cnt
            """
        ).show()  # Explode函数里面的值必须为map类型或者array类型, 也就是列表
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      - <font size=4 color=red face='华文楷体'>开窗函数</font>

      - SQL风格

      - ```python
      # SQL风格 - 创建临时视图, 写SQL
      emp_df.createOrReplaceTempView('emp')
      dept_df.createOrReplaceTempView('dept')

      spark.sql("""
      with tmp as(
      select
      emp_id,
      emp_name,
      salary,
      bonus,
      e.dep_id,
      d.dep_name,
      dense_rank() over(partition by e.dep_id order by salary desc) as dr
      from emp e join dept d on e.dep_id = d.dep_id
      )
      select
      *
      from tmp where dr <= 2
      """).show()
    • DSL 风格 - 不需要创建视图

1
2
3
4
5
6
7
8
# DSL 风格 - 不需要创建视图
dept_df\
.join(emp_df, 'dep_id', 'inner')\
.withColumn(
'rk',
dense_rank().over(Window.partitionBy('dep_id').orderBy(col('salary').desc()))
)\
.where('rk <= 2').show()

电影评分案例

源数据

  • 电影评分数据:datas/movie/ratings.dat【用户id、电影id、评分、评分时间】
1
2
3
4
5
6
7
8
9
10
11
1::1193::5::978300760
1::661::3::
1::914::3::978301968
1::3408::4::978300275
1::2355::5::978824291
1::1197::3::978302268
1::1287::5::978302039
1::2804::5::978300719
1::594::4::978302268
1::919::4::978301368
1::595::5::978824268
  • 电影信息数据:datas/movie/movies.dat【电影id、电影名称、分类】
1
2
3
4
5
6
7
8
9
1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
4::Waiting to Exhale (1995)::Comedy|Drama
5::Father of the Bride Part II (1995)::Comedy
6::Heat (1995)::Action|Crime|Thriller
7::Sabrina (1995)::Comedy|Romance
8::Tom and Huck (1995)::Adventure|Children's
9::Sudden Death (1995)::Action

需求

统计评分次数大于2000的所有电影中平均评分最高的Top10,结果显示电影名称、电影平均评分、电影评分次数

思路

  • 1、将电影评分文件转为movie_rdd
  • 2、将movie_rdd转为movie_df
  • 3、将电影信息文件转为rate_rdd
  • 4、将rate_rdd转为rate_df

SQL风格

  • 5、将rate_df注册为临时视图rate
  • 6、将movie_df注册为临时视图movie
  • 7、编写SparkSQL,将rate和movie进行join操作
    • KeyWords: 内连接, CTE表达式

DSL风格

  • 7、编写SparkSQL,将rate和movie进行join操作

CodeDemo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import time

from pyspark import SparkContext, SparkConf
import os
import sys
from pyspark.sql import SparkSession

# 1、设置环境变量
from pyspark.sql.functions import *

os.environ['JAVA_HOME'] = '/export/server/jdk'
os.environ['SPARK_HOME'] = '/export/server/spark' # Spark安装位置
os.environ["PYSPARK_PYTHON"] = "/export/server/anaconda3/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/export/server/anaconda3/bin/python3"

if __name__ == '__main__':
print("WordCount")

# 1、在Python代码中入口类对象:SparkSession
"""
.appName:应用的名字
config("spark.sql.shuffle.partitions", 2) spark经过shuffle之后重新分配的分区数,模式是200个
getOrCreate() :如果代码前文已经创建了该对象,则获取,否则创建
"""
spark = SparkSession \
.builder \
.master('local[2]') \
.appName("wordcount_1") \
.config("spark.sql.shuffle.partitions", 2) \
.getOrCreate()


# 获取SparkContext
sc:SparkContext = spark.sparkContext
# 需求: 统计评分次数大于2000的所有电影中平均评分最高的Top10,结果显示电影名称、电影平均评分、电影评分次数
# 思路: 首先读取文件(两个), 创建RDD, 将RDD转换为DataFrame, 在DataFrame中写sql, 清洗转换, 再将DataFrame转为RDD, 写出

# 这个spark.read.text 读取文件直接生成了一个DataFrame
# rate_df = spark.read.text('hdfs://node1:8020/datas/spark/input/movie/ratings.dat')
# movie_df = spark.read.text('hdfs://node1:8020/datas/spark/input/movie/movies.dat')
movie_rdd = sc\
.textFile('hdfs://node1:8020/datas/spark/input/movie/movies.dat')\
.map(lambda line: (line.split("::")[0], line.split("::")[1], line.split("::")[2]))

rate_rdd = sc\
.textFile('hdfs://node1:8020/datas/spark/input/movie/ratings.dat')\
.map(lambda x: (x.split("::")[0], x.split("::")[1], float(x.split("::")[2]), int(x.split("::")[3])))

# 数据处理(清洗转换), 将rdd在转换为DataFrame [movie_rdd 电影id、电影名称、分类]
movie_df: DataFrame = movie_rdd.toDF(['movie_id','movie_name','category'])
movie_df.show()
# 用户id、电影id、评分、评分时间
rate_df: DataFrame = rate_rdd.toDF(['user_id','movie_id','rate','ts'])
rate_df.printSchema()

# 创建两张DataFrame的表视图
movie_df.createOrReplaceTempView('movie')
rate_df.createOrReplaceTempView('rate')

# 将两张表做join(关联字段为movie_id) SQL风格 以及 DSL风格
# 需求: 统计评分次数大于2000的所有电影中平均评分最高的Top10,结果显示电影名称、电影平均评分、电影评分次数
spark.sql("""
with tmp as (
select
movie_name,
count(*) as cnt,
round(avg(rate), 2) as avg
from movie t1 join rate t2 on t1.movie_id=t2.movie_id
group by movie_name
)
select * from tmp where cnt > 2000 order by avg desc limit 10
""").show()

# # 停止SparkSession
# spark.stop()

# DSL 风格是不需要临时视图的(rate_df, movie_df
rate_df\
.join(movie_df,'movie_id','inner')\
.groupBy(col('movie_name'))\
.agg(
round(count('*'), 2).alias('cnt'),
avg('rate').alias('avg')
)\
.where('cnt > 2000')\
.orderBy('avg', ascending=False)\
.limit(10).show()

将RDD转为DataFrame

1
注意: 以后直接用的就是将数据转为DataFrame, 没有先转为RDD这一步操作了, 这里是多看看, 多了解一下.

读取各种文件并转为DataFrame的方式

三种方式将RDD转为DataFrame, 遵循公式(RDD + Schema = DataFrame)

  • 1, 自动推断: RDD中每条数据必须为Row类型,Spark就可以实现自动推断

    • 将RDD变为Row对象

      • movie_rdd_row = fileRdd.map(
          lambda item: Row(user_id = item[0],movieid=item[1], rate=float(item[2]), ts=int(item[3])))
        
        1
        2
        3
        4
        5
        6


        - 将RDD转换成DF

        - ```python
        movie_df = spark.createDataFrame(movie_rdd_row)
  • 2, 自定义Schema转换DataFrame

    • 先构建元组类型的RDD

      • movie_rdd_tuple = fileRdd.map(lambda item: (item[0], item[1], float(item[2]), int(item[3])))
        
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11


        - 自定义一个Schema对象

        - ```python
        movie_schema = StructType([
        StructField(name="userid", dataType=StringType(), nullable=True),
        StructField(name="movieid", dataType=StringType(), nullable=True),
        StructField(name="rate", dataType=DoubleType(), nullable=True),
        StructField(name="ts", dataType=LongType(), nullable=True)
        ])
  • 3, 指定列名称转换DataFrame - 直接调用rdd中的toDF函数就ok

    • 先构建元组类型的RDD

      • movie_rdd_list = fileRdd.map(lambda item: (item[0], item[1], float(item[2]), int(item[3])))
        
        1
        2
        3
        4
        5
        6


        - 将RDD转换成DataFrame

        - ```python
        movie_df = movie_rdd_list.toDF(["userid", "movieid", "rate", "ts"])

将DataFrame转为RDD

直接DataFrame.rdd就转为rdd了

Spark读取外部文件的方式

读取外部文件转为不同的数据类型

  • 读取外部文件转为RDD
    • fileRdd = sc.textFile(‘文件的路径’)
  • 读取外部文件转为DataFrame
    • demo_df = spark.read.text(‘文件的路径’)

读取外部文件的类型

1
2
3
4
5
6
7
text文件  -- text
csv文件 -- csv
json文件 -- json
parque文件 -- parque
orc文件 -- orc
mysql文件
hive文件

读取方式

三种读取方式

  • 方式1: df = spark.read.文件类型(上述)(path)
    • 举例: dataFrame1 = spark.read.text(path)
    • 这里指定表结构需要加在read后面指定spark.read.schema(‘列名1 类型, 列名2 类型’).文件类型(path)
  • 方式2: df = spark.read.format("文件类型").load(path)
    • 举例: dataFrame1 = spark.read.format(“parquet”).load(path)
  • 方式3: df = spark.read.load(path,format="文件类型")
    • 举例: dataFrame1 = spark.read.load(path,format=”json”)

读取text文件

概念:

  • 1、以text文本的方式进行读取,读取之后数据只有一列,列名默认是value,类型是String
  • 2、由于以text方式读取的DataFrame数据只有一列,分析非常不方便,建议慎用
1
2
3
4
5
6
7
# 1、将text文件读取并转为DataFrame
df1 = spark.read.text('hdfs://node1:8020/datas/input/read_type/people.txt')
df1.show()
df2 = spark.read.format('text').load('hdfs://node1:8020/datas/input/read_type/people.txt')
df2.show()
df3 = spark.read.load('hdfs://node1:8020/datas/input/read_type/people.txt',format='text')
df3.show()

读取csv文件-重点

大体顺序可以分为: spark.read.schema.option.csv(path)

  • 分隔符是逗号,没有表头

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    # 方式1----、将csv文件读取并转为DataFrame
    #.schema('stuid string,name string,age int') 给DataFrame添加表结构(列名和列类型)
    df1 = spark.read\
    .schema('stuid string,name string,age int')\
    .csv('hdfs://node1:8020/datas/input/read_type/people.csv')
    df1.show()
    df1.printSchema()
    # 方式2----、将csv文件读取并转为DataFrame
    # .schema('stuid string,name string,age int') 给DataFrame添加表结构(列名和列类型)
    df2 = spark.read\
    .format('csv')\
    .schema('id string,name string,age int')\
    .load('hdfs://node1:8020/datas/input/read_type/people.csv')
    df2.show()
    df2.printSchema()
    # 方式3----将csv文件读取并转为DataFrame
    # .schema('stuid string,name string,age int') 给DataFrame添加表结构(列名和列类型)
    df3 = spark.read\
    .schema('id string,name string,age int')\
    .load('hdfs://node1:8020/datas/input/read_type/people.csv',format='csv')
    df3.show()
    df3.printSchema()
  • 分隔符不是逗号,没有表头

    1
    2
    3
    4
    5
    6
    7
    df1 = spark.read\
    .schema('stuid string,name string,age int')\
    .option('sep','#')\
    .csv('hdfs://node1:8020/datas/input/read_type/people1.csv')

    df1.show()
    df1.printSchema()
  • 分隔符不是逗号,有表头

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    #.schema('stuid string,name string,age int') 给DataFrame添加表结构(列名和列类型)
    #.option('sep','#') 手动指定文件之间的分隔符
    #.option('header', 'true') 如果有表头,spark可以自动推断每列的名字,但是每列的类型都是String类型,
    # 如果想指定每列的类型,则可以定义schema信息 或者添加 .option("inferSchema", "true") 参数
    df1 = spark.read\
    .option('sep','#')\
    .option('header','true')\
    .option("inferSchema", "true")\
    .csv('hdfs://node1:8020/datas/input/read_type/people2.csv')
    df1.show()
    df1.printSchema()

读取json文件

读取json文件,spark可以自动推导每列的名字和类型,我们不需要设置额外参数

1
2
3
4
5
# 将json文件读取并转为DataFrame
df1 = spark.read\
.json('hdfs://node1:8020/datas/input/read_type/people.json')
df1.show()
df1.printSchema()

读取parquet和orc文件

读取orc或者Parquet文件,我们不需要设置额外参数,spark可以自动推导列名和列类型

1
2
3
4
5
6
7
8
9
10
#1、将orc文件读取并转为DataFrame
df1 = spark.read\
.orc('hdfs://node1:8020/datas/input/read_type/users.orc')
df1.show()
df1.printSchema()
#2、将parquet文件读取并转为DataFrame
df2 = spark.read\
.parquet('hdfs://node1:8020/datas/input/read_type/users.parquet')
df2.show()
df2.printSchema()

Spark写入外部文件的方式

语法

  • 写入模式

    1
    2
    3
    4
    append: 追加模式,当数据存在时,继续追加
    overwrite: 覆写模式,当数据存在时,覆写以前数据,存储当前最新数据;
    error/errorifexists: 如果目标存在就报错,默认的模式
    ignore: 忽略,数据存在时不做任何操作
  • 方式1

    1
    2
    3
    4
    df.write.mode(saveMode="append").format("text").save(path)
    df.write.mode(saveMode="overwrite").format("json").save(path)
    df.write.mode(saveMode="errorifexists").format("csv").save(path)
    df.write.mode(saveMode="ignore").format("parquet").save(path)
  • 方式2

    1
    2
    3
    4
    df.write.text(path)
    df.write.json(path)
    df.write.csv(path)
    df.write.parquet(path)
  • 代码

    • text文件写入

      1
      2
      #建议放弃text文件写入,因为text要求所有字段都是string类型,要求只能有一列
      df_rs.write.mode(saveMode="append").format("text").save('hdfs://node1:8020/datas/output/text')
    • csv写入

      1
      2
      #csv写入,不需要设置任何参数,默认分隔符是逗号
      df_rs.write.mode(saveMode="errorifexists").format("csv").save('hdfs://node1:8020/datas/output/csv')
    • json写入

      1
      2
      3
      #json写入,不需要设置任何参数,默认每行都生成json串

      df_rs.write.mode(saveMode="overwrite").format("json").save('hdfs://node1:8020/datas/output/json')
    • orc 和 parque

      1
      2
      3
      4
      5
      #orc和parquet写入,不需要设置任何参数,文件内容看不懂,但是程序可以识别 

      df_rs.write.mode(saveMode="ignore").format("orc").save('hdfs://node1:8020/datas/output/orc')
      df_rs.write.mode(saveMode="ignore").format("parquet").save('hdfs://node1:8020/datas/output/parquet')

    Hive及MySQL数据读取

    MySQL数据的读写

    环境准备

    • 概述

      1
      连接mysql必须知道的参数:url地址,数据库,端口,用户名、密码
    • 环境准备

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      #1、在node1,node2,node3上进入以下目录,将MySQL的驱动jar包上传到该目录
      cd /export/server/anaconda3/lib/python3.8/site-packages/pyspark/jars


      #2、根据不同模式进行以下操作
      #2.1 -----------本地模式---------------
      #在node1上进入以下目录上传驱动jar包
      cd /export/server/spark-local/jars

      #2.2 -----------Yarn模式---------------
      #在node1,node2,node3上进入以下目录上传驱动jar包
      cd /export/server/spark-yarn/jars

      #2.3 -----------StandAlone模式---------------
      #在node1,node2,node3上进入以下目录上传驱动jar包
      cd /export/server/spark-standalone/jars


      #3、将MySQL的驱动包传入HDFS上/spark/jars目录
    • 数据准备

      1
      2
      3
      4
      1、查看node1上mysql数据库中是否有db_company数据库
      2、如果没有则执行以下操作,否则什么也不做
      mysql -uroot -p
      source source /root/db_company.sql #将db_company.sql放入node1的/root目录

    读取MySQL数据

    • 代码

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      #-------------------读取mysql数据-方式1---------------------------------
      prop = {'user':'root', 'password':'123456', 'driver':'com.mysql.jdbc.Driver'}
      table = "db_company.emp"
      url = "jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true"
      jdbc_df1 = spark.read.jdbc(url=url, table=table, properties=prop)
      jdbc_df1.show()
      jdbc_df1.printSchema()


      print('-------------------------------------------------------')
      #-------------------读取mysql数据-方式2---------------------------------
      jdbc_df2 = spark.read \
      .format("jdbc") \
      .option("url", "jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true") \
      .option("dbtable", "db_company.emp") \
      .option("user", "root") \
      .option("password", "123456") \
      .load()
      jdbc_df2.show()
      jdbc_df2.printSchema()

    写入MySQL数据

    • 环境准备

      1
      2
      3
      4
      5
      6
      7
      8
      #1、在mysql中创建表
      CREATE TABLE db_company.emp_v3 (
      `empno` int(11) NOT NULL,
      `ename` varchar(10) DEFAULT NULL,
      `job` varchar(9) DEFAULT NULL,
      `sal` double DEFAULT NULL,
      `deptno` int(11) DEFAULT NULL
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8 ;
    • 代码

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      #------------------写入mysql数据-方式2---------------------------------
      jdbc_df2 \
      .select('empno', 'ename', 'job', 'sal', 'deptno') \
      .write \
      .mode("append") \
      .format("jdbc") \
      .option("url", "jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true") \
      .option("dbtable", "db_company.emp_v3") \
      .option("user", "root") \
      .option("password", "123456") \
      .save()

    Hive的数据读写

    • 数据准备,进入hive终端,执行以下命令

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      CREATE DATABASE IF NOT EXISTS `db_hive`;
      USE `db_hive`;
      DROP TABLE IF EXISTS `dept`;
      CREATE TABLE `dept` (
      `deptno` int,
      `dname` string,
      `loc` string
      )
      row format delimited fields terminated by '\t';



      #--------方式1-insert方式----------------------
      insert into `dept`(`deptno`,`dname`,`loc`) values (10,'ACCOUNTING','NEW YORK'),(20,'RESEARCH','DALLAS'),(30,'SALES','CHICAGO'),(40,'OPERATIONS','BOSTON');

      #--------方式2-load方式----------------------
      load data local inpath '/root/dept.txt' overwrite into table dept;


      DROP TABLE IF EXISTS `emp`;
      CREATE TABLE `emp` (
      `empno` int,
      `ename` string,
      `job` string,
      `mgr` string,
      `hiredate` string,
      `sal` double,
      `comm` double,
      `deptno` int
      )
      row format delimited fields terminated by '\t';

      #导入数据使用以下命令或者使用load方式加载文件

      #--------方式1-insert方式----------------------
      insert into `emp`(`empno`,`ename`,`job`,`mgr`,`hiredate`,`sal`,`comm`,`deptno`) values (7369,'SMITH','CLERK',7902,'1980-12-17',800,NULL,20),(7499,'ALLEN','SALESMAN',7698,'1981-02-20',1600,300,30),(7521,'WARD','SALESMAN',7698,'1981-02-22',1250,500,30),(7566,'JONES','MANAGER',7839,'1981-04-02',2975,NULL,20),(7654,'MARTIN','SALESMAN',7698,'1981-09-28',1250,1400,30),(7698,'BLAKE','MANAGER',7839,'1981-05-01',2850,NULL,30),(7782,'CLARK','MANAGER',7839,'1981-06-09',2450,NULL,10),(7788,'SCOTT','ANALYST',7566,'1987-07-13',3000,NULL,20),(7839,'KING','PRESIDENT',NULL,'1981-11-17',5000,NULL,10),(7844,'TURNER','SALESMAN',7698,'1981-09-08',1500,0,30),(7876,'ADAMS','CLERK',7788,'1987-07-13',1100,NULL,20),(7900,'JAMES','CLERK',7698,'1981-12-03',950,NULL,30),(7902,'FORD','ANALYST',7566,'1981-12-03',3000,NULL,20),(7934,'MILLER','CLERK',7782,'1982-01-23',1300,NULL,10);

      #--------方式2-load方式----------------------
      load data local inpath '/root/emp.txt' overwrite into table emp;
    • 环境准备

      • 启动服务
      1
      2
      3
      4
      5
      6
      7
      # 启动HDFS服务:NameNode和DataNodes
      start-dfs.sh

      # 启动HiveMetaStore 服务
      # nohup hive --service metastore 2>&1 &

      start-metastore.sh
      • 配置文件
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      #----------------local模式:在node1配置--------------------------
      # 进入配置文件目录,创建配置文件
      cd /export/server/spark-local/conf
      vim hive-site.xml

      # 添加以下内容
      <?xml version="1.0"?>
      <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
      <configuration>
      <property>
      <name>hive.metastore.uris</name>
      <value>thrift://node1.itcast.cn:9083</value>
      </property>
      </configuration>



      #----------------Spark On Yarn模式:在node1,node2,node3配置------------------------
      # 进入配置文件目录创建配置文件
      cd /export/server/spark-yarn/conf
      vim hive-site.xml

      # 添加以下内容
      <?xml version="1.0"?>
      <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
      <configuration>
      <property>
      <name>hive.metastore.uris</name>
      <value>thrift://node1.itcast.cn:9083</value>
      </property>
      </configuration>
      • 启动命令行
      1
      /export/server/spark-local/bin/pyspark --master local[2] --conf spark.sql.shuffle.partitions=2
      • 验证环境
      1
      2
      3
      # 列举
      spark.sql("show databases").show()
      spark.sql("show tables in db_hive") .show()
    • 代码

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      import time

      from pyspark import SparkContext, SparkConf, StorageLevel
      import os
      import sys
      from pyspark.sql import SparkSession, Window

      # spark入门案例 --- WordCount
      # 1、设置环境变量
      from pyspark.sql.functions import *

      os.environ['JAVA_HOME'] = '/export/server/jdk'
      os.environ['SPARK_HOME'] = '/export/server/spark' # Spark安装位置
      os.environ["PYSPARK_PYTHON"] = "/export/server/anaconda3/bin/python3"
      os.environ["PYSPARK_DRIVER_PYTHON"] = "/export/server/anaconda3/bin/python3"

      if __name__ == '__main__':
      #创建用于操作Hive的SparkSession
      """
      spark.sql.warehouse.dir:用来指定Hive表数据在HDFS的位置
      hive.metastore.uris :用来指定hive的metastore服务(runjar)在哪台主机
      enableHiveSupport :开启spark支持hive
      """
      spark = SparkSession \
      .builder \
      .appName("SparkSQLAppName") \
      .master("local[2]") \
      .config("spark.sql.shuffle.partitions", 2) \
      .config("spark.sql.warehouse.dir", 'hdfs://node1.itcast.cn:8020/user/hive/warehouse')\
      .config("hive.metastore.uris", "thrift://node1.itcast.cn:9083")\
      .enableHiveSupport()\
      .getOrCreate()

      # 设置日志级别为WARN
      spark.sparkContext.setLogLevel("WARN")

      #--------------------从hive读取-SQL风格--------------------------------
      #使用spark操作hive的表
      spark.sql(
      """
      select * from db_hive.emp;
      """
      ).show()

      spark.sql(
      """
      select deptno,count(*) as cnt from db_hive.emp group by deptno;
      """
      ).show()

      # --------------------从hive读取-DSL风格--------------------------------
      dataFrame1 = spark.read.table("db_hive.emp")

      dataFrame2 = dataFrame1 \
      .select("deptno", "sal") \
      .groupBy("deptno") \
      .agg(
      count("*").alias("cnt")
      )
      dataFrame2.show()



      # --------------------向hive写入--------------------------------
      #test_rs表如果不存在,则创建,如果存在,则覆盖数据
      dataFrame2.write.mode("overwrite").format("hive").saveAsTable("db_hive.test_rs")