Convert the binary data read from Kafka (a DataFrame with a binary value column) to String type so it can be processed downstream.
There are two ways to do this:
- DSL style (the traditional way):
select(col("value").cast(StringType()))
- DSL style using a select expression:
selectExpr("cast(value as string)")
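A minimal runnable sketch of the two styles, assuming spark is an existing SparkSession and df is a DataFrame read from a Kafka source (both are stand-ins here, not names from the examples below):

```python
from pyspark.sql.functions import col
from pyspark.sql.types import StringType

# DSL style: cast the binary value column via the typed API
value_df1 = df.select(col("value").cast(StringType()))

# Select-expression style: the same result via a SQL cast expression
value_df2 = df.selectExpr("CAST(value AS STRING)")
```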
Integrating Spark Streaming with Kafka
Integration boils down to connecting Kafka's components (producers, consumers, topics, partitions) with Streaming.
The key here is understanding how Kafka itself works.
Reading data from Kafka
```python
# Subscribe to a single topic
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    .option("subscribe", "topic1") \
    .load()
```

```python
# Subscribe to multiple topics
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    .option("subscribe", "topic1,topic2") \
    .load()
```

```python
# Subscribe to all topics matching a pattern
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    .option("subscribePattern", "topic.*") \
    .load()
```
Writing data to Kafka
- Write value only: the output must contain exactly one column → partitions are assigned randomly
```python
ds = df \
    .selectExpr("CAST(value AS STRING)") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    .option("topic", "topic1") \
    .start()
```
- Write key and value: the output must contain exactly two columns → records are hash-partitioned by key
```python
ds = df \
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    .option("topic", "topic1") \
    .start()
```
- Write topic, key, and value: the output must contain exactly three columns → hash-partitioned by key; the target topic comes from the data's topic column, so no topic option is set
```python
ds = df \
    .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    .start()
```
Schema of the data Spark reads from Kafka
- key: the key the record was written with
- value: the actual payload stored in Kafka
- topic: the topic this record belongs to
- partition: the partition this record belongs to
- offset: the record's offset within its partition
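A quick way to verify this is to print the schema of the source DataFrame. A minimal sketch, assuming df is a Kafka source DataFrame loaded as above; note that beyond the five fields listed, the source also exposes timestamp and timestampType columns:

```python
df.printSchema()
# root
#  |-- key: binary (nullable = true)
#  |-- value: binary (nullable = true)
#  |-- topic: string (nullable = true)
#  |-- partition: integer (nullable = true)
#  |-- offset: long (nullable = true)
#  |-- timestamp: timestamp (nullable = true)
#  |-- timestampType: integer (nullable = true)
```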
Spark as a Consumer
Structured Streaming consumes data from Kafka and processes it.
Environment setup
```bash
# On node1, node2, node3: switch to the pyspark jars directory and upload the Kafka dependency jars from the course materials
cd /export/server/anaconda3/lib/python3.8/site-packages/pyspark/jars

# On node1, node2, node3: start Zookeeper
/export/server/zookeeper/bin/zkServer.sh start

# On node1, node2, node3: start Kafka
cd /export/server/kafka_2.12-2.4.1/
bin/kafka-server-start.sh config/server.properties >>/dev/null 2>&1 &

# Create the topic
kafka-topics.sh --create --topic wordsTopic --partitions 3 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092
```
CodeDemo
```python
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

os.environ['JAVA_HOME'] = '/export/server/jdk'
os.environ["PYSPARK_PYTHON"] = "/export/server/anaconda3/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/export/server/anaconda3/bin/python3"

if __name__ == '__main__':
    print("WordCount")

    """
    .appName: the application name
    .config("spark.sql.shuffle.partitions", 2): number of partitions after a shuffle (the default is 200)
    .getOrCreate(): reuse the SparkSession if one was already created, otherwise create it
    """
    spark = SparkSession \
        .builder \
        .master('local[2]') \
        .appName("wordcount_1") \
        .config("spark.sql.shuffle.partitions", 2) \
        .getOrCreate()

    input_df = spark \
        .readStream \
        .format('kafka') \
        .option('kafka.bootstrap.servers', 'node1:9092,node2:9092,node3:9092') \
        .option('subscribe', 'wordsTopic') \
        .load()

    # Kafka stores value as binary; cast it to string before processing
    value_df = input_df.selectExpr("CAST(value AS STRING)")

    # Split each line into words, then count occurrences per word
    rs_df = value_df \
        .select(explode(split(col('value'), '\\s+')).alias('word')) \
        .groupBy(col('word')) \
        .agg(count(col('word')).alias('cnt'))

    rs_df \
        .writeStream \
        .outputMode('complete') \
        .format('console') \
        .start() \
        .awaitTermination()
```
Running the demo
```bash
# 1. Start a console producer
kafka-console-producer.sh --topic wordsTopic --broker-list node1:9092,node2:9092,node3:9092
# 2. Run the code in PyCharm
# 3. Type words into the producer
# 4. Observe the output in PyCharm
```
Spark as a Producer
Spark Structured Streaming can also act as a Kafka producer, writing processed results back into Kafka.
Requirements
- 1. Use Structured Streaming to read data from the Kafka topic TopicA
- 2. Run a word count over the data
- 3. Write the word-count results to the Kafka topic TopicB
Environment setup
```bash
# Create TopicA to hold the simulated raw data
kafka-topics.sh --create --topic TopicA --partitions 3 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092

# Create TopicB to hold the word-count results
kafka-topics.sh --create --topic TopicB --partitions 3 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092
```
CodeDemo
```python
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

os.environ['JAVA_HOME'] = '/export/server/jdk'
os.environ["PYSPARK_PYTHON"] = "/export/server/anaconda3/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/export/server/anaconda3/bin/python3"

if __name__ == '__main__':
    print("WordCount")

    spark = SparkSession \
        .builder \
        .master('local[2]') \
        .appName("wordcount_streaming") \
        .config("spark.sql.shuffle.partitions", 2) \
        .getOrCreate()

    input_df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "node1:9092,node2:9092,node3:9092") \
        .option("subscribe", "TopicA") \
        .load()

    """
    1. Each Kafka record carries several fields: key, value, topic, partition, offset, timestamp
    2. Usually only value is needed: it holds the actual payload
    3. value is binary in Kafka, so cast it to string first
    """
    rs_df = input_df \
        .selectExpr('CAST(value AS STRING)') \
        .select(explode(split(col("value"), "\\s+")).alias("word")) \
        .groupBy(col("word")) \
        .agg(count(col("word")).alias("cnt"))

    # The Kafka sink requires a column named value; combine word and count into one string
    rs_df \
        .select(concat_ws("-", col("word"), col("cnt")).alias("value")) \
        .writeStream \
        .outputMode("complete") \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "node1:9092,node2:9092,node3:9092") \
        .option("topic", "TopicB") \
        .option("checkpointLocation", "file:///root/file_kafka_check_point") \
        .start() \
        .awaitTermination()

    spark.stop()
```
Checkpointing in Structured Streaming
- 1. When a streaming job fails, by default it has to recompute everything from scratch, which is far too expensive.
- 2. We can configure a checkpoint for Structured Streaming that persists the job's configuration and state; when the program restarts, it reads the checkpoint directory and restores its state to where it left off, avoiding recomputation from scratch.
Two ways to configure a Structured Streaming checkpoint
- Option 1: set it when building the SparkSession
- Option 2: set it when writing out the DataFrame
```python
# Option 1: a global checkpoint location on the SparkSession
spark = SparkSession \
    .builder \
    .master("local[2]") \
    .appName("Struct WordCount Streaming") \
    .config("spark.sql.shuffle.partitions", 2) \
    .config("spark.sql.streaming.checkpointLocation", "/datas/output/chk2") \
    .getOrCreate()

# Option 2: a per-query checkpoint location on the writer
rs_data \
    .writeStream \
    .outputMode("complete") \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "node1:9092,node2:9092,node3:9092") \
    .option("topic", "TopicName") \
    .option("checkpointLocation", "/datas/output/chk2") \
    .start() \
    .awaitTermination()
```
Trigger intervals in Structured Streaming
- Levels of real-time processing: second, sub-second, millisecond, microsecond, nanosecond
- Trigger types
  - Default micro-batch: run each micro-batch as soon as the previous one finishes; requires no trigger setting in code
  - Fixed-interval micro-batch: trigger(processingTime='...') starts a micro-batch at the given interval
  - Once: run exactly one batch and stop; not really streaming, equivalent to offline batch processing
  - Continuous: continuous processing with latency down to about 1 ms; supports at-least-once semantics, does not support aggregations, and is still experimental
CodeDemo
```python
# Fixed interval: run a micro-batch every 5 seconds
df.writeStream \
    .outputMode("complete") \
    .format("console") \
    .trigger(processingTime='5 seconds') \
    .start() \
    .awaitTermination()

# Once: run a single batch, then stop
df.writeStream \
    .outputMode("complete") \
    .format("console") \
    .trigger(once=True) \
    .start() \
    .awaitTermination()

# Continuous: experimental low-latency mode; since aggregations are not supported,
# the query must be append-style (no groupBy), so complete mode cannot be used here
df.writeStream \
    .outputMode("append") \
    .format("console") \
    .trigger(continuous='5 seconds') \
    .start() \
    .awaitTermination()
```
Structured Streaming IoT case study
Data
```
{'deviceID': 'device_1_19', 'deviceType': '油烟机', 'deviceSignal': 16, 'time': '1690356364'}
{'deviceID': 'device_7_7', 'deviceType': '水表', 'deviceSignal': 76, 'time': '1690356366'}
```
Requirements
- 1. A producer continuously writes simulated IoT data to Kafka
- 2. Structured Streaming acts as the consumer, reading the data from Kafka and computing statistics
- 3. The results are printed to the console
- Metrics
  - 1. Device count and average signal strength for each device type
  - 2. Only include records with signal strength greater than 30
Code
Producer
```python
import json
import os
import random
import time

from kafka import KafkaProducer

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ["PYSPARK_PYTHON"] = "/root/anaconda3/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/root/anaconda3/bin/python"

if __name__ == '__main__':
    print("Simulating IoT data")

    # JSON-serialize each message; wait for acks from all replicas
    producer = KafkaProducer(
        bootstrap_servers=['node1:9092', 'node2:9092', 'node3:9092'],
        acks='all',
        value_serializer=lambda m: json.dumps(m).encode("utf-8")
    )
    deviceTypes = ["洗衣机", "油烟机", "空调", "窗帘", "灯", "窗户", "煤气报警器", "水表", "燃气表"]

    while True:
        # Pick a random device type, then generate a device ID and signal strength
        index = random.choice(range(0, len(deviceTypes)))
        deviceID = f'device_{index}_{random.randrange(1, 20)}'
        deviceType = deviceTypes[index]
        deviceSignal = random.choice(range(10, 100))

        print({'deviceID': deviceID, 'deviceType': deviceType,
               'deviceSignal': deviceSignal, 'time': time.strftime('%s')})

        producer.send(
            topic='search-log-topic',
            value={'deviceID': deviceID, 'deviceType': deviceType,
                   'deviceSignal': deviceSignal, 'time': time.strftime('%s')}
        )
        time.sleep(random.choice(range(1, 3)))
```
Consumer
```python
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

"""
-------------------------------------------------
Description : TODO: implement the IoT analytics case with SQL or DSL
SourceFile  : 0502.struct_streaming_iot_case
-------------------------------------------------
"""

if __name__ == '__main__':
    os.environ['JAVA_HOME'] = '/export/server/jdk'
    os.environ['HADOOP_HOME'] = '/export/server/hadoop'
    os.environ['PYSPARK_PYTHON'] = '/export/server/anaconda3/bin/python3'
    os.environ['PYSPARK_DRIVER_PYTHON'] = '/export/server/anaconda3/bin/python3'

    spark = SparkSession \
        .builder \
        .master("local[2]") \
        .appName("StructStreaming APP") \
        .config("spark.sql.shuffle.partitions", 2) \
        .getOrCreate()

    kafka_data = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "node1:9092,node2:9092,node3:9092") \
        .option("subscribe", "search-log-topic") \
        .load()

    value_data = kafka_data.selectExpr("CAST(value AS STRING)")

    # Extract the JSON fields from the value column
    etl_data = (
        value_data
        .select(
            get_json_object("value", "$.deviceID").alias("device_id"),
            get_json_object("value", "$.deviceType").alias("device_type"),
            get_json_object("value", "$.deviceSignal").alias("signal"),
            get_json_object("value", "$.time").alias("time")
        )
    )

    """SQL implementation"""

    """DSL implementation"""
    # Keep only signals above 30, then count devices and average the signal per type
    rs2 = (
        etl_data
        .where(col("signal") > 30)
        .groupBy(col("device_type"))
        .agg(
            count(col("device_id")).alias("cnt"),
            round(avg(col("signal")), 2).alias("avg_signal")
        )
    )

    rs2 \
        .writeStream \
        .outputMode("complete") \
        .format("console") \
        .start() \
        .awaitTermination()
```
Testing the code
- 1. Run the producer code in PyCharm first
- 2. Then run the consumer code in PyCharm
- 3. Check the results in the consumer console in PyCharm