将从Kafka中读取的binary二进制数据(DataFrame结构)转为String类型的数据, 方便后续的处理

这里有两种方式

  • DSL风格-传统方式
    • select(col("value").cast(StringType()))
  • DSL风格使用selec表达式替换
    • selectExpr("cast(value as string)")

SparkStreaming和Kafka整合

无非就是Kafka中的几个组件(生产者, 消费者, topic, partition)和Streaming进行整合

这里的重点就是理解Kafka中的原理

从Kafka中读取数据

  • 订阅一个主题
1
2
3
4
5
6
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.load()
  • 订阅多个主题
1
2
3
4
5
6
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1,topic2") \
.load()
  • 订阅指定名称开头的主题
1
2
3
4
5
6
7
#订阅的主题使用通配符:只要主题是topic.开头,都进行订阅
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribePattern", "topic.*") \
.load()

将数据写出到Kafka中

  • 向Kafka写入:只写value,写入内容必须只有一列 —-》随机分区
1
2
3
4
5
6
7
ds = df \
.selectExpr("CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "topic1") \
.start()
  • 向Kafka写入:写入key和value,写入内容必须只有二列 —-》hash分区
1
2
3
4
5
6
7
ds = df \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "topic1") \
.start()
  • 向Kafka写入:指定topic、key、value,写入内容必须只有3列 —-》hash分区
1
2
3
4
5
6
ds = df \
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.start()

Spark从Kafka中读取数据的格式

1
2
3
4
5
- Key:写入数据的Key
- Value:存储在Kafka中的: 真正内容
- Topic:当前这条数据所属的Topic
- Partition:当前这条数据所属的分区
- offset:当前这条数据所在的offset

Spark作为消费者

Structured Streaming从kafka消费数据进行处理

环境准备

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 在node1、node2、node3上,分别切换到pyspark的依赖包目录下,上传资料中的kafka依赖包
cd /export/server/anaconda3/lib/python3.8/site-packages/pyspark/jars


#在node1,node2,node3分别执行以下命令,启动Zookeeper
/export/server/zookeeper/bin/zkServer.sh start


#在node1,node2,node3分别执行以下命令启动kafka
cd /export/server/kafka_2.12-2.4.1/
bin/kafka-server-start.sh config/server.properties >>/dev/null 2>&1 &

# 创建Topic
kafka-topics.sh --create --topic wordsTopic --partitions 3 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092

CodeDemo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import time

from pyspark import SparkContext, SparkConf
import os
import sys
from pyspark.sql import SparkSession

# 1、设置环境变量
from pyspark.sql.functions import *

os.environ['JAVA_HOME'] = '/export/server/jdk'
# os.environ['SPARK_HOME'] = '/export/server/spark' # Spark安装位置
os.environ["PYSPARK_PYTHON"] = "/export/server/anaconda3/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/export/server/anaconda3/bin/python3"

if __name__ == '__main__':
print("WordCount")

# 1、在Python代码中入口类对象:SparkSession
"""
.appName:应用的名字
config("spark.sql.shuffle.partitions", 2) spark经过shuffle之后重新分配的分区数,模式是200个
getOrCreate() :如果代码前文已经创建了该对象,则获取,否则创建
"""
spark = SparkSession \
.builder \
.master('local[2]') \
.appName("wordcount_1") \
.config("spark.sql.shuffle.partitions", 2) \
.getOrCreate()

# StructuredStreaming 从Kafka中读取数据
input_df = spark\
.readStream\
.format('kafka')\
.option('kafka.bootstrap.servers', 'node1:9092, node2:9092, node3:9092')\
.option('subscribe', 'wordsTopic').load()

# 读取kafka中的数据并进行类型转换, 进行单词统计(从kafka中读取的数据类型是binary二进制类型
# kafka的每个数据包含了很多信息, key, value, topic, partition, offset, timestamp
value_df = input_df.selectExpr("CAST(value as STRING)")

rs_df = value_df\
.select(explode(split(col('value'), '\\s+')).alias('word'))\
.groupBy(col('word'))\
.agg(count(col('word')).alias('cnt'))

# 保存结果
rs_df\
.writeStream\
.outputMode('complete')\
.format('console')\
.start()\
.awaitTermination()

运行操作

1
2
3
4
5
6
7
8
1、启动生产者
kafka-console-producer.sh --topic wordsTopic --broker-list node1:9092,node2:9092,node3:9092

2、运行pycharm代码

3、在生产端模拟单词

4、观察pycharm输出

Spark作为生产者

Spark Structured Streaming 可以作为Kafka的生成者,将处理后的结果写入kafka

需求

1
2
3
1、使用Structured Streaming从Kafka的TopicA主题读取数据
2、Structured Streaming对读取的数据进行单词统计
3、Structured Streaming将统计后的写入Kafka的TopicB主题

环境准备

1
2
3
4
5
6
# 创建一个TopicA,用于模拟数据存储原始数据
kafka-topics.sh --create --topic TopicA --partitions 3 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092


# 创建TopicB,用于存储词频统计的结果
kafka-topics.sh --create --topic TopicB --partitions 3 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092

CodeDemo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import time

from pyspark import SparkContext, SparkConf
import os
import sys
from pyspark.sql import SparkSession

# spark入门案例 --- WordCount
# 1、设置环境变量
from pyspark.sql.functions import *
from pyspark.sql.types import StringType

os.environ['JAVA_HOME'] = '/export/server/jdk'
os.environ["PYSPARK_PYTHON"] = "/export/server/anaconda3/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/export/server/anaconda3/bin/python3"

if __name__ == '__main__':
print("spark入门案例 --- WordCount")

# 1、在Python代码中入口类对象:SparkSession
spark = SparkSession \
.builder \
.master('local[2]') \
.appName("wordcount_streaming") \
.config("spark.sql.shuffle.partitions", 2) \
.getOrCreate()

# 2、StructuredStreaming从Kafka读取数据
input_df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "node1:9092,node2:9092,node3:9092") \
.option("subscribe", "TopicA") \
.load()

#3、对读取的kafka数据进行类型转换,并进行单词统计
"""
1、kafka的每个数据包含了很多信息:key value topic partition offset timestamp
2、往往只需要value这一列 它保存了真实的数据内容
3、这个value在kafka中是binary类型 因此需要将其转成String类型
"""
# input_df.select(col('value').cast(StringType()))
rs_df = input_df\
.selectExpr('cast(value as string)')\
.select(explode(split(col("value"), "\\s+")).alias("word"))\
.groupBy(col("word"))\
.agg(count(col("word")).alias("cnt"))

#4、将统计结果保存到Kafka的TopcA主题,如果不指定key,则写入时只能有一列,所以要对rs_df的两列进行拼接,拼接成一列,作为kafka的value

rs_df \
.select(concat_ws("-", col("word"), col("cnt")).alias("value")) \
.writeStream \
.outputMode("Complete") \
.format("kafka") \
.option("kafka.bootstrap.servers", "node1:9092,node2:9092,node3:9092") \
.option("topic", "TopicB") \
.option("checkpointLocation", "file:///root/file_kafka_check_point") \
.start().awaitTermination()

# 停止SparkSession
spark.stop()

Structured Streaming的CheckPoint

  • 1、一个实时程序在执行完任务时,如果程序出现故障,则默认情况下需要从头从新计算,代价太高
  • 2、我们可以给Structured Streaming设置了一个缓存checkpoint,用来保存计算的所有配置信息,当程序重启时可以从checkpoint目录下读取配置信息,将程序的状态恢复到执行之前,避免从头计算。

两种方式构建SparkStreaming的CheckPoint

  • 方式一:在构建SparkSession时配置
  • 方式二:在输出DataFrame的数据时配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 方式一:在构建SparkSession时配置
spark = SparkSession\
.builder\
.master("local[2]")\
.appName("Struct WordCount Streaming")\
.config("spark.sql.shuffle.partitions", 2)\
.config("spark.sql.streaming.checkpointLocation", "/datas/output/chk2")\
.getOrCreate()


# 方式二:在输出DataFrame的数据时配置
rs_data\
.writeStream\
.outputMode("complete")\
.format("kafka") \
.option("kafka.bootstrap.servers", "node1:9092,node2:9092,node3:9092") \
.option("topic", "TopicName") \
.option("checkpointLocation", "/datas/output/chk2")\
.start()\
.awaitTermination()

Structured Streaming的Trigger触发间隔

  • 实时处理的级别:秒级、亚秒级、毫秒级、微妙级、纳秒级
  • Trigger的分类
    • 尽快进行微批处理:as soon as micro-batch,代码不用做任何设置
    • 只执行一个批次,不是流式计算,效果等同于离线处理Once
    • 持续数据处理,延迟可以缩小到1ms,支持at-least-once,不支持聚合,目前还处于试验阶段

CodeDemo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#2.2 按照一定的时间间隔来运行每个批次
writeStream\
.outputMode("complete")\
.format("console")\
.trigger(processingTime='5 seconds')\ #每隔5秒钟计算一次
.start()\
.awaitTermination()

#2.3 Once:只执行一个批次,不是流式计算,效果等同于离线处理
dwriteStream\
.outputMode("complete")\
.format("console")\
.trigger(once=True)\ #在此设置
.start()\
.awaitTermination()

#2.4 持续数据处理,延迟可以缩小到1ms,支持at-least-once,不支持聚合,目前还处于试验阶段
# 注意:这里的5s不代表每5s执行一次,代表每隔一段时间做一次记录
dwriteStream\
.outputMode("complete").\
.format("console")\
.trigger(continuous='5 seconds')\ #在此设置
.start()\
.awaitTermination()

Structured Streaming物联网案例

  • 数据

    1
    2
    {'deviceID': 'device_1_19', 'deviceType': '油烟机', 'deviceSignal': 16, 'time': '1690356364'}
    {'deviceID': 'device_7_7', 'deviceType': '水表', 'deviceSignal': 76, 'time': '1690356366'}
  • 需求

    • 1、生产端一直模拟物联网数据写入Kafka
    • 2、Structured Streaming作为消费端从Kafka读取数据进行统计
    • 3、结果打印到终端
    • 要求
      • 1、各种设备类型的设备数量和平均信号强度
      • 2、要求信号强度大于30
  • 代码

    • 生产端

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      import json
      import random
      import sys
      import time
      import os
      from kafka import KafkaProducer
      from kafka.errors import KafkaError

      # 锁定远端操作环境, 避免存在多个版本环境的问题
      os.environ['SPARK_HOME'] = '/export/server/spark'
      os.environ["PYSPARK_PYTHON"] = "/root/anaconda3/bin/python"
      os.environ["PYSPARK_DRIVER_PYTHON"] = "/root/anaconda3/bin/python"

      # 快捷键: main 回车
      if __name__ == '__main__':
      print("模拟物联网数据")

      # 1- 构建一个kafka的生产者:
      producer = KafkaProducer(
      bootstrap_servers=['node1:9092', 'node2:9092', 'node3:9092'],
      acks='all', # 等价-1
      value_serializer=lambda m: json.dumps(m).encode("utf-8") #json.dumps:将一个字典类型转为字符串
      )
      # 2- 物联网设备类型
      deviceTypes = ["洗衣机", "油烟机", "空调", "窗帘", "灯", "窗户", "煤气报警器", "水表", "燃气表"]

      while True:
      index = random.choice(range(0, len(deviceTypes)))
      deviceID = f'device_{index}_{random.randrange(1, 20)}' #设备id

      deviceType = deviceTypes[index] #设备类型
      deviceSignal = random.choice(range(10, 100)) #信号强度

      # 组装数据集
      print({'deviceID': deviceID, 'deviceType': deviceType, 'deviceSignal': deviceSignal,
      'time': time.strftime('%s')})

      # 发送数据
      #在send函数的内部会自动的调用上边定义的value_serializer函数,将value的字段数据转为字符串写入kafka
      producer.send(topic='search-log-topic',
      value={'deviceID': deviceID, 'deviceType': deviceType, 'deviceSignal': deviceSignal,
      'time': time.strftime('%s')} )
      # 间隔时间 5s内随机
      time.sleep(random.choice(range(1, 3)))
    • 消费端

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      78
      79
      80
      81
      82
      83
      84
      85
      86
      87
      88
      89
      90
      91
      #!/usr/bin/env python
      # -*- coding: utf-8 -*-

      import os

      from pyspark.sql import SparkSession
      from pyspark.sql.functions import *

      """
      -------------------------------------------------
      Description : TODO:通过SQL或者DSL实现物联网分析案例的开发
      SourceFile : 0502.struct_streaming_iot_case
      Author :
      Date :
      -------------------------------------------------
      """

      if __name__ == '__main__':
      # todo:0-设置系统环境变量:全部换成Linux地址
      os.environ['JAVA_HOME'] = '/export/server/jdk'
      os.environ['HADOOP_HOME'] = '/export/server/hadoop'
      os.environ['PYSPARK_PYTHON'] = '/export/server/anaconda3/bin/python3'
      os.environ['PYSPARK_DRIVER_PYTHON'] = '/export/server/anaconda3/bin/python3'

      # todo:1-构建SparkSession
      spark = SparkSession \
      .builder \
      .master("local[2]") \
      .appName("StructStreaming APP") \
      .config("spark.sql.shuffle.partitions", 2) \
      .getOrCreate()

      # todo:2-数据处理:读取、转换、保存
      # step1: 读取数据:将数据变成一个DataFrame
      kafka_data = spark\
      .readStream\
      .format("kafka") \
      .option("kafka.bootstrap.servers", "node1:9092,node2:9092,node3:9092") \
      .option("subscribe", "search-log-topic") \
      .load()

      # step2: 处理数据:使用SQL或者DSL对DF进行转换
      # 获取Value
      value_data = kafka_data.selectExpr("CAST(value AS STRING)")

      # 先实现数据清洗:从每条数据中获取4列
      etl_data = ( value_data
      # 调用函数从字段中获取每一列:json_tuple、get_json_object
      .select(
      get_json_object("value", "$.deviceID").alias("device_id"),
      get_json_object("value", "$.deviceType").alias("device_type"),
      get_json_object("value", "$.deviceSignal").alias("signal"),
      get_json_object("value", "$.time").alias("time")
      )
      )

      """SQL实现"""
      # # 注册视图
      # etl_data.createOrReplaceTempView("tmp_view_iot")
      # # SQL语句
      # rs1 = spark.sql("""
      # select
      # device_type,
      # count(device_id) as cnt,
      # round(avg(signal), 2) as avg_signal
      # from tmp_view_iot
      # where signal > 30
      # group by device_type
      # """)

      """DSL实现"""
      rs2 = ( etl_data
      # 先过滤
      .where(col("signal") > 30)
      # 再分组
      .groupby(col("device_type"))
      # 聚合
      .agg(
      count(col("device_id")).alias("cnt"),
      round(avg(col("signal")), 2).alias("avg_signal")
      )
      )

      # step3: 保存结果:打印或者保存

      rs2\
      .writeStream\
      .outputMode("complete")\
      .format("console")\
      .start().awaitTermination()

  • 代码测试

    1
    2
    3
    1、在Pycharm先运行生产端代码
    2、在Pycharm先运行消费端代码
    3、在Pycharm的消费端查看结果