• 从socket套接字中读取数据(被称为Socket Source)
1
2
3
4
5
input_df = spark \
.readStream \
.format("数据来源类型") \
.option("host", "地址") \
.load()
  • 实时的从文件中读取数据(被称为File Source)
1
2
3
4
5
6
input_df = spark\
.readStream\
.format('csv')\
.option('sep', ';') \
.schema('naem string, age int, hobby string')\
.load('file:///root/streaming')
  • 实时的将数据写出到文件中(被称为File Sink)
1
2
3
4
5
6
7
rs_df\
.writeStream\
.outputMode('append')\
.format('csv')\
.option('path', 'file:///root/file_sink')\
.option('checkpointLocation','file:///root/file_sink_check_point')\
.start().awaitTermination()
  • 实时的将数据写出到数据库中(被称为Foreach Sink)
1
2
3
4
5
res_df.writeStream \
.outputMode("complete") \
.foreach(process_row) \
.start() \
.awaitTermination()

实时与离线

  • 离线/批处理:基于固定的时间周期对每个时间周期的数据进行处理,基于T+1时间业务处理有界数据
  • 实时/流处理:基于数据的生成,数据一产生就立即对数据进行处理,以数据为计算标准处理无界数据

Spark Structed Streaming编程模型

  • Step1: 读取数据并放到一个无边界的DataFrame中
1
2
3
4
5
input_df = spark \
.readStream \
.format("数据来源类型") \
.option("host", "地址") \
.load()
  • Step2: 调用DSL或者SQL对DataFrame中的数据进行转换计算
1
2
# 可以使用SQL或者DSL进行处理
rs_df = input_df.select().groupBy.count()
  • Step3:每次追加计算的结果,放入一个无边界的结果表,不断更新计算的结果

SparkStreaming官方Demo

  • 启动HDFS

    1
    start-dfs.sh
  • 在node1机器运行nc

    1
    nc -lk 9999
  • 在node2运行

    1
    2
    3
    4
    5
    /export/server/spark/bin/run-example \
    --master local[2] \
    --conf spark.sql.shuffle.partitions=2 \
    org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount \
    node1 9999
  • 观察输出

Structured Streaming词频统计Demo

  • 步骤:
    • 获取SparkSession对象 from pyspark.sql import SparkSession
    • 从Socket网络端口读取数据转为DataFrame
      • 配置Spark.readStream
      • 格式化format为socket套接字format('socket')
      • option设置主机名称, option设置端口号, 设置load
    • 对DataFrame进行数据分析
    • 处理结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import time

from pyspark import SparkContext, SparkConf, Row
import os
import sys
from pyspark.sql import SparkSession

# 1、设置环境变量
from pyspark.sql.functions import *
import pymysql

os.environ['JAVA_HOME'] = '/export/server/jdk'
os.environ['SPARK_HOME'] = '/export/server/spark' # Spark安装位置
os.environ["PYSPARK_PYTHON"] = "/export/server/anaconda3/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/export/server/anaconda3/bin/python3"

if __name__ == '__main__':
print("WordCount")

# 1、在Python代码中入口类对象:SparkSession
"""
.appName:应用的名字
config("spark.sql.shuffle.partitions", 2) spark经过shuffle之后重新分配的分区数,模式是200个
getOrCreate() :如果代码前文已经创建了该对象,则获取,否则创建
"""
spark = SparkSession \
.builder \
.master('local[2]') \
.appName("wordcount_1") \
.config("spark.sql.shuffle.partitions", 2) \
.getOrCreate()

# 从网络接口中读取数据(封装成无边界的DataFrame
input_df = spark\
.readStream\
.format('socket')\
.option('host', 'node1')\
.option('port', 9999).load()

# 处理数据, 使用DSL或者SQL来处理
res_df = input_df\
.select(explode(split('value', '\\s+')).alias('word'))\
.groupBy(col('word'))\
.agg(count(col('word')).alias('cnt'))

"""
方式1:直接使用process_row函数方式:定义一个方法来调用
"""
"""
process_row(row:Row):系统会自动将rs_df的每个结果封装成Row对象,传入参数
"""


def process_row(row: Row): # Row{'word':'hello', 'cnt':2}
"""处理逻辑"""
conn = pymysql.connect(
host="node1",
port=3306,
user='root',
password='123456'
)
cursor = conn.cursor()

# 从Row对象获取结果
"""
replace into类似insert into,但是会根据主键判断某个主键是否存在,如果存在则替换,不存在则插入
"""
sql = f"replace into pyspark.tb_wordcount values ('{row.word}', {row.cnt})"

print(sql)

try:
# 使用 execute 方法sql语句查询
cursor.execute(sql)
# 提交结果
conn.commit()

except Exception as e:
# 输出捕获的异常
print(e)
finally:
# 关闭数据库连接
cursor.close()
conn.close()


# step3: 保存结果 :实时的程序7*24工作 不停的
"""
foreach(process_row) :这里只需要写函数的名字,不需要写括号
"""
res_df.writeStream \
.outputMode("complete") \
.foreach(process_row) \
.start() \
.awaitTermination()

FileSource案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import time

from pyspark import SparkContext, SparkConf
import os
import sys
from pyspark.sql import SparkSession

# 1、设置环境变量
from pyspark.sql.functions import *

os.environ['JAVA_HOME'] = '/export/server/jdk'
os.environ['SPARK_HOME'] = '/export/server/spark' # Spark安装位置
os.environ["PYSPARK_PYTHON"] = "/export/server/anaconda3/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/export/server/anaconda3/bin/python3"

if __name__ == '__main__':
print("WordCount")

# 1、在Python代码中入口类对象:SparkSession
"""
.appName:应用的名字
config("spark.sql.shuffle.partitions", 2) spark经过shuffle之后重新分配的分区数,模式是200个
getOrCreate() :如果代码前文已经创建了该对象,则获取,否则创建
"""
spark = SparkSession \
.builder \
.master('local[2]') \
.appName("wordcount_1") \
.config("spark.sql.shuffle.partitions", 2) \
.getOrCreate()

# 2, 从FileSource中读取数据转为DataFrame(监控目录中的文件, 如果有文件则会立即处理
input_df = spark\
.readStream\
.format('csv')\
.option('sep', ';') \
.schema('naem string, age int, hobby string')\
.load('file:///root/streaming')

# 3, 对数据进行分析
rs_df = input_df\
.where('age < 25')\
.groupby('hobby')\
.agg(count('*').alias('cnt'))

# 4, 对结果进行处理
rs_df.writeStream\
.outputMode('Complete')\
.format('console')\
.start()\
.awaitTermination()

File Sink案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import time

from pyspark import SparkContext, SparkConf
import os
import sys
from pyspark.sql import SparkSession

# 1、设置环境变量
from pyspark.sql.functions import *

os.environ['JAVA_HOME'] = '/export/server/jdk'
os.environ['SPARK_HOME'] = '/export/server/spark' # Spark安装位置
os.environ["PYSPARK_PYTHON"] = "/export/server/anaconda3/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/export/server/anaconda3/bin/python3"

if __name__ == '__main__':
print("WordCount")

# 1、在Python代码中入口类对象:SparkSession
"""
.appName:应用的名字
config("spark.sql.shuffle.partitions", 2) spark经过shuffle之后重新分配的分区数,模式是200个
getOrCreate() :如果代码前文已经创建了该对象,则获取,否则创建
"""
spark = SparkSession \
.builder \
.master('local[2]') \
.appName("wordcount_1") \
.config("spark.sql.shuffle.partitions", 2) \
.getOrCreate()

# 从Socket端口获取数据(一般是固定格式, 记住不记住都行), 转为DataFrame
input_df = spark\
.readStream\
.format('socket')\
.option('host', 'node1')\
.option('port', 9999)\
.load()
# 定义黑名单列表
filter_element = ['?', ',', '.', '!', ' ', '-']
# 对数据进行处理(实现过滤, 将每一行的元素炸开)
rs_df = input_df\
.select(explode(split('value', '\\s+')).alias('word'))\
.filter(~col('word').isin(filter_element))

# 保存结果, 打印或者保存
rs_df\
.writeStream\
.outputMode('append')\
.format('csv')\
.option('path', 'file:///root/file_sink')\
.option('checkpointLocation','file:///root/file_sink_check_point')\
.start().awaitTermination()

Foreach Sink案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import time

from pyspark import SparkContext, SparkConf, Row
import os
import sys
from pyspark.sql import SparkSession

# 1、设置环境变量
from pyspark.sql.functions import *
import pymysql

os.environ['JAVA_HOME'] = '/export/server/jdk'
os.environ['SPARK_HOME'] = '/export/server/spark' # Spark安装位置
os.environ["PYSPARK_PYTHON"] = "/export/server/anaconda3/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/export/server/anaconda3/bin/python3"

if __name__ == '__main__':
print("WordCount")

# 1、在Python代码中入口类对象:SparkSession
"""
.appName:应用的名字
config("spark.sql.shuffle.partitions", 2) spark经过shuffle之后重新分配的分区数,模式是200个
getOrCreate() :如果代码前文已经创建了该对象,则获取,否则创建
"""
spark = SparkSession \
.builder \
.master('local[2]') \
.appName("wordcount_1") \
.config("spark.sql.shuffle.partitions", 2) \
.getOrCreate()

# 从网络接口中读取数据(封装成无边界的DataFrame
input_df = spark\
.readStream\
.format('socket')\
.option('host', 'node1')\
.option('port', 9999).load()

# 处理数据, 使用DSL或者SQL来处理
res_df = input_df\
.select(explode(split('value', '\\s+')).alias('word'))\
.groupBy(col('word'))\
.agg(count(col('word')).alias('cnt'))

"""
方式1:直接使用process_row函数方式:定义一个方法来调用
"""
"""
process_row(row:Row):系统会自动将rs_df的每个结果封装成Row对象,传入参数
"""


def process_row(row: Row): # Row{'word':'hello', 'cnt':2}
"""处理逻辑"""
conn = pymysql.connect(
host="node1",
port=3306,
user='root',
password='123456'
)
cursor = conn.cursor()

# 从Row对象获取结果
"""
replace into类似insert into,但是会根据主键判断某个主键是否存在,如果存在则替换,不存在则插入
"""
sql = f"replace into pyspark.tb_wordcount values ('{row.word}', {row.cnt})"

print(sql)

try:
# 使用 execute 方法sql语句查询
cursor.execute(sql)
# 提交结果
conn.commit()

except Exception as e:
# 输出捕获的异常
print(e)
finally:
# 关闭数据库连接
cursor.close()
conn.close()


# step3: 保存结果 :实时的程序7*24工作 不停的
"""
foreach(process_row) :这里只需要写函数的名字,不需要写括号
"""
res_df.writeStream \
.outputMode("complete") \
.foreach(process_row) \
.start() \
.awaitTermination()

Structured Streaming支持的OutputMode

OutputMode方式

  • append

    1
    2
    3
    4
    5
    1、该模式用于无状态计算
    2、该模式下不能有数据的聚合
    3、该模式主要用于数据清洗

    = outputMode('append')
  • complete !!!

    1
    2
    3
    4
    5
    1、该模式用于有状态计算
    2、该模式下可以有数据的聚合
    3、该模式主要用于数据分析

    = outputMode('complete')
  • update

    1
    2
    3
    4
    5
    6
    1、该模式用于有状态计算
    2、该模式下可以有数据的聚合,但是不能排序
    3、该模式只能每次输出更新的或者新增数据
    4、应用场景待定

    = outputMode('update')

Linux端口占用解决

1
2
3
4
5
6
#1、查找被占用端口9999的进程号
netstat -nltp | grep 9999

=tcp 0 0 0.0.0.0:9999 0.0.0.0:* LISTEN 10239/nc
#2、杀死进程
kill -9 10239