Tag Computation Flow

Summary

  • 1. Read the level-4 tag and its level-5 tags' metadata (tags5_df) by the level-4 tag ID.

  • 2. Parse the level-4 tag's rule into a rule_meta object (see the sketch after this list):

    • Split the string on '##' to get a list.
    • Iterate over the list, splitting each element on '=' to build a dict.
    • Convert the dict into a rule_meta object.
  • 3. Using the parsed level-4 rule, read the data needed for tag computation (es_df).

  • 4. Match the level-5 tags' rule and id against es_df to compute a tag result new_result_df(userId, tagsId).

  • 5. Read the old tag results old_result_df from the result index tfec_userprofile_result.

  • 6. From the old results, remove any tag id that belongs to this computation's set of level-5 tag ids, then append the new tagsId to produce the final result_df.

  • 7. Write result_df to the ES result index with upsert.
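For reference, a level-4 rule is a flat '##'-delimited key=value string. A minimal sketch of the parse; the keys match the Rule4Meta fields shown later, but the concrete values here are made up for illustration:

# Hypothetical level-4 rule string; keys match the Rule4Meta fields, values are illustrative
rule_str = "inType=Elasticsearch##esNodes=192.168.88.166:9200##esIndex=tfec_tbl_users##esType=_doc##selectFields=id,birthday"
rule_dict = dict(kv.split("=") for kv in rule_str.split("##"))
# {'inType': 'Elasticsearch', 'esNodes': '192.168.88.166:9200', 'esIndex': 'tfec_tbl_users', ...}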

Age-range tag

  • Tags are assigned by birth-date range.
  • Use F.regexp_replace to normalize birthday to yyyyMMdd.
  • Use F.split to break each level-5 rule into start and end.
  • Non-equi join birthday against start and end (a minimal sketch follows).
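A minimal sketch of that non-equi join, using the column names from the full job below:

# birthday is already normalized to yyyyMMdd, so string comparison matches date order
joined = es_pre_df.join(
    tag5_pre_df,
    on=es_pre_df.birthday.between(tag5_pre_df.start, tag5_pre_df.end),
    how='left')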

Gender tag

  • Use DataFrame.withColumn to add a new column.
  • Use F.lit to give the new column a fixed value (the set of level-5 tag ids).
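In the gender job below this looks like:

# tag5Ids is the collected list of this family's level-5 tag ids, e.g. ['5', '6']
new_result_df = result_df.withColumn('tag5Ids', F.lit(','.join(tag5Ids)))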

Age tag computation

"""
【总结代码】流程:
#0.准备Spark开发环境
#1.读取MySQL中的数据
#2.读取和年龄段标签相关的4级标签rule并解析
#3.根据解析出来的ruel读取ES数据
#4.读取和年龄段标签相关的5级标签(根据4级标签的id作为pid查询)
#5.根据ES数据和5级标签数据进行匹配,得出userId,tagsId
#5-1统一格式,将1999-09-09统一为:199909095
#5-2将fiveDS拆分为("tagsId","start","end")
#5-3将ES和fiveDS2直接join
#6.将最终结果写到ES
"""

from pyspark.sql import SparkSession
import os

from pyspark.sql.types import StringType

from UserProfile.offline.pojo.RuleMeta import Rule4Meta
import pyspark.sql.functions as F

# Set up the Spark environment
os.environ['SPARK_HOME'] = '/export/server/spark'
# PYSPARK_PYTHON (not PYSPARK_HOME) is the variable Spark reads for the Python executable
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/envs/pyspark_env/bin/python'

# Parse the level-4 rule (turn the rule string into a dict)
def strParse(rule_str: str):
    # Split on '##', then split each element on '=', and collect into a dict
    all_element_list = rule_str.split("##")
    # Empty dict to return
    rule_four_dict = {}
    # Walk the list, splitting each element into a key/value pair
    for element in all_element_list:
        rule_four_dict[element.split("=")[0]] = element.split("=")[1]
    return rule_four_dict

if __name__ == '__main__':
    spark = SparkSession\
        .builder\
        .master("local[*]")\
        .appName('Read MySQL metadata, tag users by age range, write (userId, tagsId) to ES')\
        .config("spark.sql.shuffle.partitions", 10)\
        .getOrCreate()
    # MySQL connection parameters
    url = "jdbc:mysql://192.168.88.166:3306/tfec_tags?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false&user=root&password=123456"
    tableName = "tbl_basic_tag"
    tag4Id = 14
    sql = f"select id,rule from {tableName} where id = {tag4Id} or pid = {tag4Id}"

    # Read the age tag's level-4 rule (and level-5 rows) from MySQL
    meta_df = spark\
        .read\
        .format('jdbc')\
        .option('url', url)\
        .option('query', sql)\
        .load()
    # Alternative read: spark.read.jdbc(url=..., table=...).select(...).where(...)
    # es_df = spark\
    #     .read\
    #     .jdbc(url=url, table=tableName)\
    #     .select('id', 'rule')\
    #     .where('id = 14 or pid = 14')
    # meta_df.show()

    # 2. Parse the level-4 rule (pull the rule string out of the DataFrame via its RDD)
    es_rdd = meta_df.rdd.map(lambda row: row.rule)  # take the rule field off every Row
    tag4_rule_str = es_rdd.collect()[0]  # a single string
    print(tag4_rule_str)

    # Access the index name and the fields to select via object.attribute, which avoids typos
    rule4_dict = strParse(tag4_rule_str)
    # Build the metadata object and call it via object.method
    rule_four_meta = Rule4Meta.dict_to_obj(rule4_dict)
    print(rule_four_meta.esNodes, rule_four_meta.selectFields, rule_four_meta.esIndex)

    # 3. Read ES data according to the parsed rule
    # (don't hand-type the option names; e.g. 'field' in es.read.field.include has no 's')
    es_df = spark\
        .read\
        .format('es')\
        .option('es.resource', rule_four_meta.esIndex)\
        .option('es.nodes', rule_four_meta.esNodes)\
        .option('es.read.field.include', rule_four_meta.selectFields)\
        .load()
    # es_df.show()  # id is the user id
    # +-------------------+---+
    # |           birthday| id|
    # +-------------------+---+
    # |1992-05-31 00:00:00|  1|
    # |1983-10-11 00:00:00|  2|
    # |1970-11-22 00:00:00|  3|

    # 4. Read the age tag's level-5 tags (rows whose pid is the level-4 tag id)
    tag5_df = meta_df.where(f'id!={tag4Id}')
    # tag5_df.show()
    # +---+-----------------+
    # | id|             rule|
    # +---+-----------------+
    # | 15|19500101-19591231|
    # | 16|19600101-19691231|
    # | 17|19700101-19791231|

    # 5. Match the ES data against the level-5 tags to get (userId, tagsId)
    # 5-1. Normalize the format: 1999-09-09 becomes 19990909
    es_pre_df = es_df\
        .select(
            F.col('id').alias('userId'),
            F.regexp_replace("birthday", '-', '')[0:8].alias('birthday')
        )
    # es_pre_df.show()
    # +------+--------+
    # |userId|birthday|
    # +------+--------+
    # |     1|19920531|
    # 5-2. Split tag5_df into ("tagsId", "start", "end")
    tag5_pre_df = tag5_df.select(
        F.col('id').alias('tagsId'),
        F.split('rule', '-')[0].alias('start'),
        F.split('rule', '-')[1].alias('end')
    )
    # 5-3. Join the ES data with tag5_pre_df directly (non-equi join on the date range)
    result_df = es_pre_df\
        .join(tag5_pre_df,
              on=es_pre_df.birthday.between(tag5_pre_df.start, tag5_pre_df.end),
              how='left')\
        .select(
            F.col('userId').cast(StringType()).alias('userId'),
            F.col('tagsId').cast(StringType()).alias('tagsId')
        )
    # result_df.show()
    # +------+------+
    # |userId|tagsId|
    # +------+------+
    # |     3|    17|
    # |     7|    17|

    # 6. Write the final result to ES
    result_index = 'tfec_userprofile_result'
    result_df.write.format('es') \
        .option('es.nodes', rule_four_meta.esNodes) \
        .option('es.resource', result_index) \
        .option('es.write.operation', 'upsert') \
        .option('es.mapping.id', 'userId') \
        .option('es.mapping.name', "userId:userId,tagsId:tagsId") \
        .mode('append') \
        .save()

Creating the metadata object the object-oriented way

Accessing values via object.attribute is convenient and avoids typos.

RuleMeta.py

from dataclasses import dataclass

# The @dataclass decorator generates the all-args constructor for us
@dataclass
class Rule4Meta():
    # Wrapper class for a level-4 tag's rule
    inType: str
    esIndex: str
    esNodes: str
    esType: str
    selectFields: str

    @staticmethod
    def dict_to_obj(rule_four_dict: dict):
        # Take the level-4 rule (as a dict) and return a Rule4Meta object
        return Rule4Meta(
            rule_four_dict.get('inType', ''),
            rule_four_dict.get('esIndex', ''),
            rule_four_dict.get('esNodes', ''),
            rule_four_dict.get('esType', ''),
            rule_four_dict.get('selectFields', '')
        )
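
Used from the jobs above like so; the dict values here are illustrative, not taken from the real rule table:

# strParse() would produce a dict like this from the level-4 rule string
meta = Rule4Meta.dict_to_obj({'esIndex': 'tfec_tbl_users', 'esNodes': '192.168.88.166:9200'})
print(meta.esIndex, meta.selectFields)  # missing keys default to ''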

Gender tag computation

"""
1.准备Spark开发环境
2.读取MySQL数据
3.读取和性别标签相关的4级标签rule并解析
4.根据4级标签加载ES数据
5.读取和性别标签相关的5级标签(根据4级标签的id作为pid查询)
6.根据ES数据和5级标签数据进行匹配,得出userId,tagsId
7.查询ES中的oldDF
8.合并newDF和oldDF
9.将最终结果写到ES
"""
from pyspark.sql import SparkSession
import os

from pyspark.sql.types import StringType

from UserProfile.offline.pojo.RuleMeta import Rule4Meta
import pyspark.sql.functions as F

# Set up the Spark environment
os.environ['SPARK_HOME'] = '/export/server/spark'
# PYSPARK_PYTHON (not PYSPARK_HOME) is the variable Spark reads for the Python executable
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/envs/pyspark_env/bin/python'

# Custom UDF: merge a user's old tag ids with the newly computed one
@F.udf
def updateTagsId(old_tags: str, new_tag: str, tag5_ids: str):
    # arguments: old_df.tagsId, new_result_df.tagsId, new_result_df.tag5Ids
    if new_tag is None:
        return old_tags
    if old_tags is None:
        return new_tag
    old_tags_list = old_tags.split(",")
    tag5_id_list = tag5_ids.split(',')
    new_list = []
    # keep old ids that do not belong to this tag family, then append the new id
    for old in old_tags_list:
        if old not in tag5_id_list:
            new_list.append(old)
    new_list.append(new_tag)
    return ','.join(new_list)
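# Worked example (hypothetical values): old_tags='5,17', new_tag='6', tag5_ids='5,6'
# -> '5' is dropped (it belongs to this tag family), '17' is kept, '6' is appended: '17,6'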


def strParse(rule_str: str):
    # Split on '##', then split each element on '=', and collect into a dict
    all_element_list = rule_str.split("##")
    # Empty dict to return
    rule_four_dict = {}
    # Walk the list, splitting each element into a key/value pair
    for element in all_element_list:
        rule_four_dict[element.split("=")[0]] = element.split("=")[1]
    return rule_four_dict

if __name__ == '__main__':
    spark = SparkSession\
        .builder\
        .master("local[*]")\
        .appName('Read MySQL metadata and ES data, write the merged result back to ES')\
        .config("spark.sql.shuffle.partitions", 10)\
        .getOrCreate()
    # MySQL connection parameters
    url = "jdbc:mysql://192.168.88.166:3306/tfec_tags?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false&user=root&password=123456"
    tableName = "tbl_basic_tag"
    tag4Id = 4
    sql = f"select id,rule from {tableName} where id = {tag4Id} or pid = {tag4Id}"
    tfec_userprofile_result = "tfec_userprofile_result"

    # 3. Read and parse the gender tag's level-4 rule
    tag_four_df = spark.read\
        .format('jdbc')\
        .option('url', url)\
        .option('query', sql)\
        .load()
    # tag_four_df.show()

    tag_four_str = tag_four_df.rdd.map(lambda row: row.rule).collect()[0]
    tag_four_dict = strParse(tag_four_str)
    rule_four_meta = Rule4Meta.dict_to_obj(tag_four_dict)
    print(rule_four_meta.esIndex, rule_four_meta.esNodes, rule_four_meta.selectFields)

    # 4. Load ES data according to the level-4 rule
    es_df = spark.read.format('es')\
        .option('es.nodes', rule_four_meta.esNodes)\
        .option('es.resource', rule_four_meta.esIndex)\
        .option('es.read.field.include', rule_four_meta.selectFields)\
        .load()
    # es_df.show()
    # +------+---+
    # |gender| id|
    # +------+---+
    # |     2|  1|
    # |     1|  2|

    # 5. Read the gender tag's level-5 tags (rows whose pid is the level-4 tag id)
    tag_five_df = tag_four_df.where(f'id!={tag4Id}')
    # +---+------+
    # | id|  rule|
    # +---+------+
    # |  5|     1|
    # |  6|     2|
    # +---+------+

    # 6. Join the ES data with the level-5 tags to get (userId, tagsId)
    result_df = es_df\
        .join(tag_five_df,
              on=tag_five_df.rule == es_df.gender,
              how='left')\
        .select(es_df.id.cast(StringType()).alias('userId'),
                tag_five_df.id.cast(StringType()).alias('tagsId'))
    # result_df.show()
    # +------+------+
    # |userId|tagsId|
    # +------+------+
    # |     1|     6|
    # |     7|     6|
    # |     8|     6|

    # 7. Read the old results (oldDF) from ES
    old_df = spark.read.format('es')\
        .option('es.nodes', rule_four_meta.esNodes)\
        .option('es.resource', tfec_userprofile_result)\
        .load()
    # old_df.show()
    # +------+------+
    # |tagsId|userId|
    # +------+------+
    # |    17|     3|
    # |    17|     7|
    # |    17|     9|

    # 8. Merge newDF and oldDF
    # 8-1. Collect the level-5 tag ids, cast to str first
    tag5Ids = tag_five_df\
        .select(F.col('id').cast(StringType()).alias('id'))\
        .rdd.map(lambda row: row.id).collect()
    print(tag5Ids)

    # 8-2. Add a column to new_result_df holding the level-5 id set
    # withColumn adds the column; F.lit() gives it a fixed value
    new_result_df = result_df.withColumn('tag5Ids', F.lit(','.join(tag5Ids)))
    # new_result_df.show()
    # +------+------+-------+
    # |userId|tagsId|tag5Ids|
    # +------+------+-------+
    # |     1|     6|    5,6|
    # |     7|     6|    5,6|
    # |     8|     6|    5,6|

    # 8-3. Merge the two DataFrames via the custom UDF
    merge_df = new_result_df\
        .join(old_df, on=new_result_df.userId == old_df.userId, how='right')\
        .select(old_df.userId.alias('userId'),
                updateTagsId(old_df.tagsId, new_result_df.tagsId, new_result_df.tag5Ids).alias('tagsId'))
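    # Note: the right join keeps only users already present in the result index; if users
    # appearing only in the new results should be kept too, a 'full' join with
    # F.coalesce(old_df.userId, new_result_df.userId) as userId would cover that case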

    merge_df.show()
    # 9. Write the final result to ES
    merge_df\
        .write\
        .format('es')\
        .option('es.nodes', rule_four_meta.esNodes)\
        .option('es.resource', tfec_userprofile_result)\
        .option('es.write.operation', 'upsert')\
        .option('es.mapping.id', 'userId')\
        .option('es.mapping.name', "userId:userId,tagsId:tagsId") \
        .mode('append')\
        .save()

Full pipeline

(Figure: full tag-computation pipeline, image-20230805092648278)

Rewriting the age and gender tag computations with a base class
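
The base class itself is not included in these notes. Below is a minimal sketch of what TagComputeBase might look like, assuming its execute() template stitches together the shared steps from the gender job above (MySQL metadata read, rule parse, ES read, merge with old results, ES upsert); the method names execute/tagCompute and the constructor signature match the subclasses below, everything else is reconstructed:

from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as F
from pyspark.sql.types import StringType

from UserProfile.offline.pojo.RuleMeta import Rule4Meta


@F.udf
def updateTagsId(old_tags: str, new_tag: str, tag5_ids: str):
    # Same merge rule as the gender job: drop this family's old ids, append the new one
    if new_tag is None:
        return old_tags
    if old_tags is None:
        return new_tag
    tag5_id_list = tag5_ids.split(',')
    kept = [old for old in old_tags.split(',') if old not in tag5_id_list]
    kept.append(new_tag)
    return ','.join(kept)


class TagComputeBase:
    # Template class: execute() runs the shared pipeline and defers the
    # tag-specific matching to tagCompute(), which each subclass overrides
    def __init__(self, appName: str, tag4Id: int):
        self.appName = appName
        self.tag4Id = tag4Id

    def tagCompute(self, es_df: DataFrame, tag5_df: DataFrame) -> DataFrame:
        raise NotImplementedError  # supplied by each subclass

    def execute(self):
        spark = SparkSession.builder \
            .master("local[*]") \
            .appName(self.appName) \
            .config("spark.sql.shuffle.partitions", 10) \
            .getOrCreate()
        # 1. Read the level-4 tag and its level-5 children from MySQL
        url = "jdbc:mysql://192.168.88.166:3306/tfec_tags?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false&user=root&password=123456"
        sql = f"select id,rule from tbl_basic_tag where id = {self.tag4Id} or pid = {self.tag4Id}"
        meta_df = spark.read.format('jdbc').option('url', url).option('query', sql).load()
        # 2. Parse the level-4 rule into a Rule4Meta
        rule_str = meta_df.rdd.map(lambda row: row.rule).collect()[0]
        meta = Rule4Meta.dict_to_obj(dict(kv.split('=') for kv in rule_str.split('##')))
        # 3. Load the source data from ES
        es_df = spark.read.format('es') \
            .option('es.nodes', meta.esNodes) \
            .option('es.resource', meta.esIndex) \
            .option('es.read.field.include', meta.selectFields) \
            .load()
        # 4. Level-5 tags, then the subclass-specific matching
        tag5_df = meta_df.where(f'id!={self.tag4Id}')
        tag5Ids = tag5_df.select(F.col('id').cast(StringType()).alias('id')) \
            .rdd.map(lambda row: row.id).collect()
        new_result_df = self.tagCompute(es_df, tag5_df) \
            .withColumn('tag5Ids', F.lit(','.join(tag5Ids)))
        # 5. Merge with the old results (as in the gender job) and upsert back to ES
        result_index = 'tfec_userprofile_result'
        old_df = spark.read.format('es') \
            .option('es.nodes', meta.esNodes) \
            .option('es.resource', result_index) \
            .load()
        merge_df = new_result_df \
            .join(old_df, on=new_result_df.userId == old_df.userId, how='right') \
            .select(old_df.userId.alias('userId'),
                    updateTagsId(old_df.tagsId, new_result_df.tagsId, new_result_df.tag5Ids).alias('tagsId'))
        merge_df.write.format('es') \
            .option('es.nodes', meta.esNodes) \
            .option('es.resource', result_index) \
            .option('es.write.operation', 'upsert') \
            .option('es.mapping.id', 'userId') \
            .option('es.mapping.name', 'userId:userId,tagsId:tagsId') \
            .mode('append') \
            .save()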

Age tag

from pyspark.sql import DataFrame
import pyspark.sql.functions as F
from pyspark.sql.types import StringType

from UserProfile.offline.base.TagComputeBase import TagComputeBase

# Age tag computation class, inheriting from the tag computation base class
class AgeTagCompute(TagComputeBase):
    def tagCompute(self, es_df: DataFrame, tag5_df: DataFrame):
        # es_df.show()
        # +-------------------+---+
        # |           birthday| id|
        # +-------------------+---+
        # |1992-05-31 00:00:00|  1|
        # |1983-10-11 00:00:00|  2|
        # |1970-11-22 00:00:00|  3|

        # tag5_df.show()
        # +---+-----------------+
        # | id|             rule|
        # +---+-----------------+
        # | 15|19500101-19591231|
        # | 16|19600101-19691231|

        es_pre_df = es_df\
            .select(es_df.id.alias('userId'),
                    F.regexp_replace('birthday', '-', '')[0:8].alias('birthday'))
        tag5_pre_df = tag5_df\
            .select(tag5_df.id.alias('tagsId'),
                    F.split('rule', '-')[0].alias('start'),
                    F.split('rule', '-')[1].alias('end'))
        new_result_df = es_pre_df\
            .join(tag5_pre_df, on=es_pre_df.birthday.between(tag5_pre_df.start, tag5_pre_df.end), how='left')\
            .select(F.col('userId').cast(StringType()).alias('userId'),
                    F.col('tagsId').cast(StringType()).alias('tagsId'))

        return new_result_df

if __name__ == '__main__':
    appName = 'Age tag computation job'
    tag4Id = 14
    # Create the tag computation object and run it
    ageTagCompute = AgeTagCompute(appName, tag4Id)
    ageTagCompute.execute()

Gender tag

from pyspark.sql import DataFrame
import pyspark.sql.functions as F
from pyspark.sql.types import StringType

from UserProfile.offline.base.TagComputeBase import TagComputeBase

# Gender tag computation class, inheriting from the tag computation base class
class SexTagCompute(TagComputeBase):
    def tagCompute(self, es_df: DataFrame, tag5_df: DataFrame):
        # es_df.show()
        # +------+---+
        # |gender| id|
        # +------+---+
        # |     2|  1|
        # |     1|  2|
        # |     1|  3|

        # tag5_df.show()
        # +---+----+
        # | id|rule|
        # +---+----+
        # |  5|   1|
        # |  6|   2|
        new_result_df = es_df\
            .join(tag5_df, on=es_df.gender == tag5_df.rule, how='left')\
            .select(es_df.id.cast(StringType()).alias('userId'),
                    tag5_df.id.cast(StringType()).alias('tagsId'))
        return new_result_df

if __name__ == '__main__':
    appName = 'Gender tag computation job'
    tag4Id = 4
    # Create the tag computation object and run it
    sexTagCompute = SexTagCompute(appName, tag4Id)
    sexTagCompute.execute()

Occupation tag

from pyspark.sql import DataFrame
import pyspark.sql.functions as F
from pyspark.sql.types import StringType

from UserProfile.offline.base.TagComputeBase import TagComputeBase

# Occupation tag computation class, inheriting from the tag computation base class
class JobTagCompute(TagComputeBase):
    def tagCompute(self, es_df: DataFrame, tag5_df: DataFrame):
        # es_df.show()
        # +---+---+
        # | id|job|
        # +---+---+
        # |  1|  3|
        # |  2|  3|
        # |  3|  1|

        # tag5_df.show()
        # +---+----+
        # | id|rule|
        # +---+----+
        # |  8|   1|
        # |  9|   2|
        new_result_df = es_df\
            .join(tag5_df, on=es_df.job == tag5_df.rule, how='left')\
            .select(es_df.id.cast(StringType()).alias('userId'),
                    tag5_df.id.cast(StringType()).alias('tagsId'))
        return new_result_df

if __name__ == '__main__':
    appName = 'Occupation tag computation job'
    tag4Id = 7
    # Create the tag computation object and run it
    jobTagCompute = JobTagCompute(appName, tag4Id)
    jobTagCompute.execute()