Before getting started, here are a few web services that turn an IP address into a geographic location:

http://ip-api.com/json/117.136.12.79?lang=zh-CN
https://opendata.baidu.com/api.php?query=120.10.33.48&co=&resource_id=6006&oe=utf8
https://whois.pconline.com.cn/ipJson.jsp?ip=120.10.33.48&json=true
http://ip.ws.126.net/ipquery?ip=117.136.12.79
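
For a quick sanity check, any of these endpoints can be called with the requests library. The sketch below uses the first one; the exact response fields (country, regionName, city, ...) are defined by that service rather than by this project.

import requests

def lookup_ip(ip: str) -> dict:
    """Query ip-api.com (the first service listed above) for a rough geolocation."""
    resp = requests.get(f"http://ip-api.com/json/{ip}", params={"lang": "zh-CN"}, timeout=5)
    resp.raise_for_status()
    return resp.json()

if __name__ == "__main__":
    info = lookup_ip("117.136.12.79")
    # Field names are whatever ip-api.com returns; these three are the commonly used ones.
    print(info.get("country"), info.get("regionName"), info.get("city"))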

Converting an IP Address to a Geographic Location

Besides the websites above, an offline lookup is also possible:

pip install geoip2 -i https://pypi.tuna.tsinghua.edu.cn/simple/
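
A minimal sketch of the offline lookup with geoip2, assuming a MaxMind GeoLite2 City database has already been downloaded; the .mmdb path below is hypothetical and must point at your local copy.

import geoip2.database

MMDB_PATH = "/data/GeoLite2-City.mmdb"  # hypothetical path to a downloaded GeoLite2 database

def ip_to_location(ip: str) -> str:
    with geoip2.database.Reader(MMDB_PATH) as reader:
        rec = reader.city(ip)
        country = rec.country.names.get("zh-CN") or rec.country.name or ""
        city = rec.city.names.get("zh-CN") or rec.city.name or ""
        return f"{country} {city}".strip()

if __name__ == "__main__":
    print(ip_to_location("117.136.12.79"))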

Parsing Nginx Logs with Regular Expressions

  • Format

(screenshot: Nginx access-log format)
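
Judging from the sample lines used later in this section, the logs follow Nginx's standard "combined" format ($remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"), with an extra quoted $http_x_forwarded_for field appended at the end.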

Implementation

import re

# The pattern occurs at the start of the string: <re.Match object; span=(0, 3), match='www'>
print(re.match('www', 'www.itcast.cn'))
# The pattern does not occur at the start of the string: None (match failed)
print(re.match('com', 'www.itcast.com'))

line = "Cats are smarter than dogs"
matchObj = re.match(r'(.*) are (.*?).*', line, re.M | re.I)
print(matchObj)  # <re.Match object; span=(0, 26), match='Cats are smarter than dogs'>

if matchObj:
    print("matchObj.group() :", matchObj.group())
    print("matchObj.group(1):", matchObj.group(1))
    print("matchObj.group(2):", matchObj.group(2))
else:
    print("NO!")

print(re.search('www', 'www.itcast.cn'))
# search also scans from left to right and returns the first place where the pattern matches
print(re.search('com', 'www.itheima.com'))

# Parsing an Nginx log line in Python with named groups
obj = re.compile(
    r'(?P<ip>.*?) - - \[(?P<time>.*?)\] "(?P<request>.*?)" (?P<status>.*?) (?P<bytes>.*?) "(?P<referer>.*?)" "(?P<ua>.*?)" "(?P<proxy_address>.*)"')

nginx_log = [
    '224.220.73.29 - - [07/May/2022:11:40:36 +0800] "GET /pa18shopnst/nstShop/128.html HTTP/1.1" 200 419 "http://baoxian.htv.com/pa18shopnst/" "Mozilla/5.0 (X11; OpenBSD i386) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36" "-"',
    '224.192.169.90 - - [07/May/2022:11:40:36 +0800] "GET /zaixiangoumai/chexian/chexian.shtml HTTP/1.1" 200 964 "http://baoxian.htv.com?item=室内财产全方位保障" "Mozilla/5.0 (Linux; Android 10; PACM00 Build/QP1A.190711.020; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0" "-"',
    '67.17.10.89 - - [07/May/2022:11:40:36 +0800] "GET /pa18shopnst/nstShop/index.html#/productInfo/resources/list?type=FILE&_t=0.7003008955390666 HTTP/1.1" 200 487 "http://baoxian.htv.com?item=汽车保险" "Mozilla/5.0 (Windows; U; MSIE 9.0; Windows NT 9.0; en-US)" "-"',
]

for line in nginx_log:
    result = obj.match(line)
    ip = result.group('ip')
    time = result.group('time')
    request = result.group('request')
    status = result.group('status')
    bytes = result.group('bytes')
    referer = result.group('referer')
    ua = result.group('ua')
    proxy_address = result.group('proxy_address')
    print("Parsed nginx log: ip:{ip}, time:{time}, request:{request}, status:{status}, bytes:{bytes}, referer:{referer}, ua:{ua}, proxy_address:{proxy_address}".format(ip=ip, time=time, request=request, status=status, bytes=bytes, referer=referer, ua=ua, proxy_address=proxy_address))

Reading Data from Kafka with Spark Structured Streaming

First, a Python script simulates Nginx logs and writes them to Kafka; PySpark then reads and consumes the data from Kafka.
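
The log generator itself is not shown in these notes. A minimal sketch of that step could look like the following, assuming the kafka-python client; the broker address and topic name here are hypothetical (in the real job both come from ConfigLoader).

import random
import time
from datetime import datetime

from kafka import KafkaProducer  # assumes the kafka-python package is installed

BOOTSTRAP_SERVERS = "up01:9092"  # hypothetical; use the ConfigLoader values from the jobs below
TOPIC = "nginx_topic"            # hypothetical topic name

def fake_nginx_line() -> str:
    """Build one access-log line in the same combined format the regex below expects."""
    ip = ".".join(str(random.randint(1, 254)) for _ in range(4))
    ts = datetime.now().strftime("%d/%b/%Y:%H:%M:%S +0800")
    return (f'{ip} - - [{ts}] "GET /pa18shopnst/nstShop/128.html HTTP/1.1" 200 419 '
            f'"http://baoxian.htv.com/pa18shopnst/" "Mozilla/5.0 (X11; OpenBSD i386)" "-"')

if __name__ == "__main__":
    producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVERS,
                             value_serializer=lambda s: s.encode("utf-8"))
    for _ in range(100):
        producer.send(TOPIC, fake_nginx_line())
        time.sleep(0.1)
    producer.flush()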

Implementation

# Parse Nginx logs
from pyspark.sql import SparkSession

from UserProfile.online.utils import ConfigLoader
import pyspark.sql.functions as F
import os

SPARK_HOME = "/export/server/spark"
PYSPARK_PYTHON = "/root/anaconda3/envs/pyspark_env/bin/python"

os.environ['SPARK_HOME'] = SPARK_HOME
os.environ['PYSPARK_PYTHON'] = PYSPARK_PYTHON

if __name__ == '__main__':
    # Create the Spark session
    spark = SparkSession \
        .builder \
        .appName('Parse Nginx logs') \
        .master('local[*]') \
        .getOrCreate()

    # Read from Kafka
    host = ConfigLoader.getKafkaConfig('bootstrapServerHost')
    port = ConfigLoader.getKafkaConfig('bootstrapServerPort')
    topic = ConfigLoader.getKafkaConfig('nginx_topic')

    # selectExpr casts the binary `value` column read from Kafka into a string
    kafka_df = spark \
        .readStream \
        .format('kafka') \
        .option('kafka.bootstrap.servers', f"{host}:{port}") \
        .option('subscribe', topic) \
        .option('startingOffsets', 'earliest') \
        .load() \
        .selectExpr('cast(value as string)')

    # Parse the log line with a regular expression (each captured piece gets a named group)
    java_regexp_str: str = '(?<ip>\d+\.\d+\.\d+\.\d+) (- - \[)(?<datetime>[\s\S]+)(?<t1>\][\s"]+)(?<request>[A-Z]+) (?<url>[\S]*) (?<protocol>[\S]+)["] (?<code>\d+) (?<sendbytes>\d+) ["](?<refferer>[\S]*) ["](?<useragent>[\S\s]+)["] ["](?<proxyaddr>[\S\s]+)["]'
    # Double-escaped variant of the same pattern for embedding in a SQL expression string (not used below)
    java_reg_sql_str: str = '(?<ip>\\\\d+\\\\.\\\\d+\\\\.\\\\d+\\\\.\\\\d+) (- - \\\\[)(?<datetime>[\\\\s\\\\S]+)(?<t1>\\\\][\\\\s"]+)(?<request>[A-Z]+) (?<url>[\\\\S]*) (?<protocol>[\\\\S]+)["] (?<code>\\\\d+) (?<sendbytes>\\\\d+) ["](?<refferer>[\\\\S]*) ["](?<useragent>[\\\\S\\\\s]+)["] ["](?<proxyaddr>[\\\\S\\\\s]+)["]'

    nginx_df = kafka_df.select(
        F.regexp_extract("value", java_regexp_str, 1).alias("real_ip"),
        F.regexp_extract("value", java_regexp_str, 3).alias("access_time"),
        F.regexp_extract("value", java_regexp_str, 5).alias("req_method"),
        F.regexp_extract("value", java_regexp_str, 6).alias("request_uri"),
        F.regexp_extract("value", java_regexp_str, 8).alias("status"),
        F.regexp_extract("value", java_regexp_str, 9).alias("bytes"),
        F.regexp_extract("value", java_regexp_str, 10).alias("referer_url"),
        F.regexp_extract("value", java_regexp_str, 11).alias("ua"),
        F.regexp_extract("value", java_regexp_str, 12).alias("proxy_address")
    )

    # Print the parsed stream to the console sink (truncate=false: do not abbreviate long values)
    nginx_df.writeStream \
        .format('console') \
        .outputMode('append') \
        .option('truncate', 'false') \
        .option('numRows', 100) \
        .start() \
        .awaitTermination()

Consuming Data from Kafka with Spark and Writing the Results to MySQL

Because the data read from Kafka is a structured stream, it cannot be written to MySQL directly; it has to be written batch by batch via foreachBatch.
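
To make that pattern concrete before the full job, here is a minimal, self-contained sketch of foreachBatch (not part of the original job): the sink function receives an ordinary batch DataFrame plus a batch id, so any batch writer (console, JDBC, ...) can be used inside it. The built-in rate source stands in for Kafka so the sketch runs on its own.

from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("foreachBatch-sketch").master("local[*]").getOrCreate()

# Toy streaming source so the sketch does not need Kafka.
stream_df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()
counted = stream_df.groupBy().agg(F.count("*").alias("rows_seen"))

def sink_batch(batch_df: DataFrame, batch_id: int):
    # batch_df is a plain DataFrame here, so batch_df.write.format("jdbc")... would also work,
    # exactly as save2mysql does in the full job below.
    print(f"batch {batch_id}")
    batch_df.show(truncate=False)

counted.writeStream.foreachBatch(sink_batch).outputMode("complete").start().awaitTermination()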

A few noteworthy points

  • Any script that creates a SparkSession must set up the Spark environment variables at the very beginning.
  • The regular expression for parsing Nginx logs is essentially universal, since the default access-log format is the same everywhere.
  • A UDF can be nested inside a plain function, with the outer function returning the UDF (see the small sketch after this list).
  • F.first takes the first value of a column (within each group).
  • Structured-stream data cannot be written to MySQL directly; use df.writeStream.foreachBatch(save2mysql), where save2mysql is an ordinary user-defined function (not a UDF).
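
A minimal sketch of the nested-UDF point above: a plain function captures a parameter in a closure and returns a column-level UDF. The make_tagger helper here is hypothetical, not the real parseUA from the job, but the call shape is the same.

import pyspark.sql.functions as F

def make_tagger(tag: str):
    """Plain Python function: captures `tag` in a closure and returns a column-level UDF."""
    @F.udf
    def _tag(value: str):
        return f"[{tag}] {value}"
    return _tag

# usage: df.select(make_tagger("ua")("ua").alias("tagged_ua"))
# parseUA('device')('ua') in the job below follows exactly this call shape.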

Implementation

import datetime
import os

import user_agents
from pyspark.sql import SparkSession, DataFrame

from com.itheima.online.utils import ConfigLoader
import pyspark.sql.functions as F

SPARK_HOME = "/export/server/spark"
PYSPARK_PYTHON = "/root/anaconda3/envs/pyspark_env/bin/python"

os.environ['SPARK_HOME'] = SPARK_HOME
os.environ['PYSPARK_PYTHON'] = PYSPARK_PYTHON

if __name__ == '__main__':
    # 1. Create the Spark session
    spark = SparkSession.builder \
        .appName('Structured Streaming job for nginx logs') \
        .master('local[*]') \
        .config('spark.sql.shuffle.partitions', 20) \
        .getOrCreate()

    # 2. Read from Kafka
    host = ConfigLoader.getKafkaConfig('bootstrapServerHost')
    port = ConfigLoader.getKafkaConfig('bootstrapServerPort')
    topic = ConfigLoader.getKafkaConfig('nginx_topic')

    kafka_df = spark.readStream.format('kafka') \
        .option('kafka.bootstrap.servers', f"{host}:{port}") \
        .option('subscribe', topic) \
        .option('startingOffsets', 'earliest') \
        .load() \
        .selectExpr('cast(value as string)')

    # 3. ETL
    # Regular expression for one access-log line
    java_regexp_str: str = '(?<ip>\d+\.\d+\.\d+\.\d+) (- - \[)(?<datetime>[\s\S]+)(?<t1>\][\s"]+)(?<request>[A-Z]+) (?<url>[\S]*) (?<protocol>[\S]+)["] (?<code>\d+) (?<sendbytes>\d+) ["](?<refferer>[\S]*) ["](?<useragent>[\S\s]+)["] ["](?<proxyaddr>[\S\s]+)["]'

    nginx_df = kafka_df.select(
        F.regexp_extract("value", java_regexp_str, 1).alias("real_ip"),
        F.regexp_extract("value", java_regexp_str, 3).alias("access_time"),
        F.regexp_extract("value", java_regexp_str, 5).alias("req_method"),
        F.regexp_extract("value", java_regexp_str, 6).alias("request_uri"),
        F.regexp_extract("value", java_regexp_str, 8).alias("status"),
        F.regexp_extract("value", java_regexp_str, 9).alias("bytes"),
        F.regexp_extract("value", java_regexp_str, 10).alias("referer_url"),
        F.regexp_extract("value", java_regexp_str, 11).alias("ua"),
        F.regexp_extract("value", java_regexp_str, 12).alias("proxy_address")
    ).where(F.length("real_ip") >= 7)

    # 4. Compute the metrics
    # pv, uv, the geographic area resolved from the IP, the request status code,
    # and the terminal device / device brand / browser resolved from the user agent

    @F.udf
    def ip2Address(ip):
        import requests
        result = requests.get('https://opendata.baidu.com/api.php?query=%s&co=&resource_id=6006&oe=utf8' % ip) \
            .content
        import json
        # Parse the JSON response body
        res_json = json.loads(result)
        location: str = res_json.get('data')[0].get('location')

        # The API answers in Chinese; '本地' marks a local/LAN address
        if '本地' in location:
            return '本地局域网'
        else:
            return location.split(' ')[0]


    @F.udf
    def parseAccessTime(access_time: str):
        return datetime.datetime.strptime(access_time.replace(' +0800', ''), '%d/%b/%Y:%H:%M:%S') \
            .strftime('%Y-%m-%d %H:%M:%S')


    def parseUA(type):
        """
        Not a UDF itself, so it cannot be applied to a column
        :param type: which attribute of the user agent to extract
        :return: a UDF bound to that attribute
        """
        @F.udf
        def udfParseUA(ua_str):
            """
            The actual UDF that processes the ua column
            :param ua_str: raw user-agent string
            :return: the requested attribute
            """
            ua = user_agents.parse(ua_str)
            if type == 'device':
                return ua.device.family
            elif type == 'os':
                # note: returns the device brand, matching the device_brand alias below
                return ua.device.brand
            elif type == 'browser':
                return ua.browser.family

        return udfParseUA


    result_df = nginx_df.groupby(F.col('real_ip').alias('ip')).agg(
        F.count('real_ip').alias('pv'),
        F.lit(1).alias('uv'),
        F.lit(ip2Address('real_ip')).alias('area'),
        F.first(parseAccessTime('access_time')).alias('access_time'),
        F.first(parseUA('device')('ua')).alias('device_terminal'),
        F.first(parseUA('os')('ua')).alias('device_brand'),
        F.first(parseUA('browser')('ua')).alias('browser_name'),
        F.first('status').alias('status_code')
    )


    # 5. Save the computed results to MySQL

    # foreachBatch sink function
    def save2mysql(df: DataFrame, batch_id):
        """
        Write one micro-batch of the structured stream to MySQL
        :param df: the data of this batch (a plain DataFrame)
        :param batch_id: id of this batch
        :return:
        truncate: true - clear the table on each overwrite instead of dropping it
        isolationLevel: NONE - no transaction isolation
        batchsize: 10 - send rows to the result table in batches of 10
        """
        df.write \
            .mode("overwrite") \
            .format("jdbc") \
            .option("driver", "com.mysql.jdbc.Driver") \
            .option("url", "jdbc:mysql://up01:3306?rewriteBatchedStatements=true") \
            .option("dbtable", "htv_insurance_stream_db.nginx_log_result") \
            .option("user", "root") \
            .option("password", "123456") \
            .option("truncate", "true") \
            .option("useSSL", "false") \
            .option("isolationLevel", "NONE") \
            .option("batchsize", "10") \
            .save()


    result_df.writeStream \
        .foreachBatch(save2mysql) \
        .outputMode('complete') \
        .start() \
        .awaitTermination()

User Log Processing

Requirements Analysis

1. Top-N regions visited per user: user_id, province or city (group by user id, count region occurrences); alias: province_area_num
2. Top-N network carriers per user: user_id, service_provider (group by user id, count carrier occurrences); alias: service_provider_num
3. Time spent on visited pages (in hours): user_id, sum(to_time - browse_time), accumulated pairwise (group by user id, total the time spent on each page); alias: stay_duration
4. Top-N pages browsed per user: user_id, browse_page (group by user id, count browsed pages); alias: browse_page_num
5. Top-N total browse actions: user_id, is_browse = 1 (group by user id, sum browse events); alias: browse_num
6. Top-N total order actions: user_id, is_order = 1 (group by user id, sum order events); alias: order_num
7. Top-N total payment actions: user_id, is_buy = 1 (group by user id, sum payment events); alias: pay_num
8. Top-N total order-return actions: user_id, is_back_order = 1 (group by user id, sum return events); alias: back_order_num
9. Top-N total claim actions: user_id, is_claim = 1 (group by user id, sum claim events); alias: claim_num
10. Group by user id and show the goods type; alias: goods_type_name
11. Group by user id and show minimum_price; alias: min_price
12. Group by user id and show page_keywords; alias: page_keyword
13. Group by user id and show visit_time (visit timestamp); alias: visit_time

Steps

  • Load the data from Kafka in real time
  • Parse the log data as JSON (see the sketch after this list for the two helpers)
  • F.get_json_object(column, '$.key')
    • handles only one column per call
    • can reach into nested JSON
  • F.json_tuple(column, 'key1', 'key2', ...)
    • can extract multiple columns at once; the default column names are c0, c1, ...
    • cannot reach into nested JSON
    • can only be used once per select statement
  • Compute the required metrics
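
As referenced above, here is a small sketch contrasting the two JSON helpers on a toy record. The JSON string and column names are hypothetical, but the call shapes match how the job below uses them.

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("json-parse-sketch").master("local[*]").getOrCreate()
df = spark.createDataFrame(
    [('{"user_id": "u1", "area": {"province": "江苏", "city": "南京"}}',)], ["value"])

# get_json_object: one column per call, but a JSON path can reach into nested objects.
df.select(
    F.get_json_object("value", "$.user_id").alias("user_id"),
    F.get_json_object("value", "$.area.city").alias("city")).show()

# json_tuple: several top-level keys at once (default names c0, c1, ...), but no nesting.
# The nested `area` object comes back as a JSON string to be unpacked in a second select,
# which is why the ETL below chains several selects.
df.select(F.json_tuple("value", "user_id", "area").alias("user_id", "area")).show(truncate=False)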

Implementation

import datetime
import os

import user_agents
from pyspark.sql import SparkSession, DataFrame

from com.itheima.online.utils import ConfigLoader
import pyspark.sql.functions as F

SPARK_HOME = "/export/server/spark"
PYSPARK_PYTHON = "/root/anaconda3/envs/pyspark_env/bin/python"

os.environ['SPARK_HOME'] = SPARK_HOME
os.environ['PYSPARK_PYTHON'] = PYSPARK_PYTHON

if __name__ == '__main__':
    # 1. Create the Spark session
    spark = SparkSession.builder \
        .appName('Structured Streaming job for user event logs') \
        .master('local[*]') \
        .config('spark.sql.shuffle.partitions', 20) \
        .getOrCreate()

    # 2. Read from Kafka
    host = ConfigLoader.getKafkaConfig('bootstrapServerHost')
    port = ConfigLoader.getKafkaConfig('bootstrapServerPort')
    topic = ConfigLoader.getKafkaConfig('event_topic')

    kafka_df = spark.readStream.format('kafka') \
        .option('kafka.bootstrap.servers', f"{host}:{port}") \
        .option('subscribe', topic) \
        .option('startingOffsets', 'earliest') \
        .load() \
        .selectExpr('cast(value as string)')

    # 3. ETL
    # Sample input record (one JSON document per Kafka message):
    # {"phone_num": "14989149320","system_id": "6XojKO57mOI","area": {"province":"江苏", "city":"南京", "sp":"电信"},"user_name": "薛丹丹","user_id": "9083-9855635","visit_time": "2023-08-11 10:47:34","goods_type": "厨卫电器","minimum_price": 5168.51,"user_behavior": {"is_browse":1, "is_order":0, "is_buy":0, "is_back_order":0, "is_claim":0},"goods_detail": {"goods_name":"三明治机/早餐机", "browse_page":"https://baoxian.sanyou.com/product/eshengsanyoubaiwanyiqiyeban.shtml?WT.mc_id=T00-WT-PAHOME-baoxian-qiyexian-all", "browse_time":"2023-08-11 10:22:34", "to_page":"https://baoxian.sanyou.com/product/quanqiujiaotongyiwaibaoxian.shtml?WT.mc_id=T00-WT-PAHOME-PCGW", "to_time":"2023-08-11 10:47:34", "page_keywords":"限量版"}}
    etl_df = kafka_df.select(F.json_tuple('value', 'phone_num', 'system_id', 'area', 'user_name', 'user_id',
                                          'visit_time', 'goods_type', "minimum_price", "user_behavior", "goods_detail")
                             .alias('phone_num', 'system_id', 'area', 'user_name', 'user_id',
                                    'visit_time', 'goods_type', "minimum_price", "user_behavior", "goods_detail")
                             ) \
        .select('*', F.json_tuple('area', 'province', 'city', 'sp').alias('province', 'city', 'service_provider')) \
        .select('*', F.json_tuple('user_behavior', "is_browse", "is_order", "is_buy", "is_back_order", "is_claim")
                .alias("is_browse", "is_order", "is_buy", "is_back_order", "is_claim")) \
        .select('*', F.json_tuple('goods_detail', "goods_name", "browse_page", "browse_time", "to_page", "to_time",
                                  "page_keywords").alias("goods_name", "browse_page", "browse_time", "to_page",
                                                         "to_time",
                                                         "page_keywords")) \
        .selectExpr("cast(phone_num as string) phone_num",
                    "cast(system_id as string) system_id",
                    "cast(province as string) province",
                    "cast(city as string) city",
                    "cast(service_provider as string) service_provider",
                    "cast(user_name as string) user_name",
                    "cast(user_id as string) user_id",
                    "cast(visit_time as string) visit_time",
                    "cast(goods_type as string) goods_type",
                    "cast(minimum_price as float) minimum_price",
                    "cast(is_browse as smallint) is_browse",
                    "cast(is_order as smallint) is_order",
                    "cast(is_buy as smallint) is_buy",
                    "cast(is_back_order as smallint) is_back_order",
                    "cast(is_claim as smallint) is_claim",
                    "cast(goods_name as string) goods_name",
                    "cast(browse_page as string) browse_page",
                    "cast(browse_time as string) browse_time",
                    "cast(to_page as string) to_page",
                    "cast(to_time as string) to_time",
                    "cast(page_keywords as string) page_keywords")

    # 4. Compute the metrics
    # 1. Top-N regions visited per user (group by user_id, count regions); alias: province_area_num
    area_agg = F.approxCountDistinct('province').alias('province_area_num')
    # 2. Top-N network carriers per user (group by user_id, count carriers); alias: service_provider_num
    sp_agg = F.approxCountDistinct('service_provider').alias('service_provider_num')
    # 3. Time spent on visited pages: sum(to_time - browse_time) per user; alias: stay_duration
    stay_duration_agg = F.sum(F.unix_timestamp(F.col('to_time')) - F.unix_timestamp(F.col('browse_time'))).alias(
        'stay_duration')
    # 4. Top-N pages browsed per user (group by user_id, count browse_page); alias: browse_page_num
    browse_num_agg = F.count('browse_page').alias('browse_page_num')
    # 5. Total browse actions per user (sum of is_browse); alias: browse_num
    is_browse_agg = F.sum(F.col('is_browse')).alias('browse_num')
    # 6. Total order actions per user (sum of is_order); alias: order_num
    is_order_agg = F.sum(F.col('is_order')).alias('order_num')
    # 7. Total payment actions per user (sum of is_buy); alias: pay_num
    is_pay_agg = F.sum(F.col('is_buy')).alias('pay_num')
    # 8. Total order-return actions per user (sum of is_back_order); alias: back_order_num
    is_back_order_agg = F.sum(F.col('is_back_order')).alias('back_order_num')
    # 9. Total claim actions per user (sum of is_claim); alias: claim_num
    is_claim_agg = F.sum(F.col('is_claim')).alias('claim_num')
    # 10. Goods type per user; requirement alias: goods_type_name (the job uses insurance_type_name)
    goods_type_agg = F.first('goods_type').alias('insurance_type_name')
    # 11. minimum_price per user; alias: min_price
    min_price_agg = F.min('minimum_price').alias('min_price')
    # 12. page_keywords per user; alias: page_keyword
    page_keyword_agg = F.first('page_keywords').alias('page_keyword')
    # 13. visit_time per user; alias: visit_time
    visit_time_agg = F.first('visit_time').alias('visit_time')

    result_df = etl_df.groupby('user_id').agg(
        area_agg, sp_agg,
        stay_duration_agg,
        browse_num_agg,
        is_browse_agg,
        is_order_agg,
        is_pay_agg,
        is_back_order_agg,
        is_claim_agg,
        goods_type_agg,
        min_price_agg,
        page_keyword_agg,
        visit_time_agg
    ).orderBy(F.col("province_area_num").desc(),
              F.col("service_provider_num").desc(),
              F.col("stay_duration").desc(),
              F.col("browse_page_num").desc(),
              F.col("browse_num").desc(),
              F.col("order_num").desc(),
              F.col("pay_num").desc(),
              F.col("back_order_num").desc(),
              F.col("claim_num").desc(),
              F.col("min_price").asc())


    # 5. Write the results to MySQL
    # foreachBatch sink function
    def save2mysql(df: DataFrame, batch_id):
        """
        Write one micro-batch of the structured stream to MySQL
        :param df: the data of this batch (a plain DataFrame)
        :param batch_id: id of this batch
        :return:
        truncate: true - clear the table on each overwrite instead of dropping it
        isolationLevel: NONE - no transaction isolation
        batchsize: 10 - send rows to the result table in batches of 10
        """
        df.write \
            .mode("overwrite") \
            .format("jdbc") \
            .option("driver", "com.mysql.jdbc.Driver") \
            .option("url", "jdbc:mysql://up01:3306?rewriteBatchedStatements=true") \
            .option("dbtable", "htv_insurance_stream_db.user_event_result") \
            .option("user", "root") \
            .option("password", "123456") \
            .option("truncate", "true") \
            .option("useSSL", "false") \
            .option("isolationLevel", "NONE") \
            .option("batchsize", "10") \
            .save()


    result_df.writeStream \
        .foreachBatch(save2mysql) \
        .outputMode('complete') \
        .start() \
        .awaitTermination()