封装基类

代码的重构

代码重构是指对现有代码进行修改和优化,以改善代码的质量、可读性、可维护性和可扩展性,而不改变代码的功能。它可以帮助开发人员更好地理解和维护代码,减少代码中的冗余、重复和复杂性,提高代码的可重用性和可扩展性,从而使代码更加健壮和可靠。重构的目的是使代码更加简洁、易于理解和修改,以提高软件开发的效率和质量。

基类的抽取

所有逻辑一样的步骤可以被每个标签计算任务使用,可以抽取为共同的步骤,注意参数是否相同

  • 1、创建spark的运行环境,逻辑一样,参数不一样:appName
  • 2、读取mysql中的标签信息(四级和五级标签的id和rule),逻辑一样,参数不一样:tag4Id
  • 3、解析四级标签的rule,逻辑一样,参数一样
  • 4、根据四级标签的rule读取es中的数据,逻辑一样,参数一样
  • 5、读取计算需要用到的五级标签的rule和id,逻辑一样,参数一样
  • 6、标签的计算,逻辑不一样,参数一样
  • 7、结果的更新,逻辑一样,参数一样
  • 8、结果的保存,逻辑一样,参数一样
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
"""
#TODO 0.准备Spark开发环境
#TODO 1.读取MySQL中的数据
#TODO 2.读取模型/标签相关的4级标签rule并解析--==标签id不一样==
#TODO 3.根据解析出来的rule读取ES数据
#TODO 4.读取模型/标签相关的5级标签(根据4级标签的id作为pid查询)---==标签id不一样==
#====5.根据ES数据和5级标签数据进行匹配,得出userId,tagsId---==实现代码不一样==
#TODO 6.查询elasticsearch中的oldDF
#TODO 7.合并newDF和oldDF
#TODO 8.将最终结果写到ES
"""
import os

from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as F
from pyspark.sql.types import StringType

from UserProfile.offline.pojo.RuleMeta import Rule4Meta

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_HOME'] = '/root/anaconda3/envs/pyspark_env/bin/python'

def strParse(rule4_str: str):
rule4_list = rule4_str.split("##")
rule4_dict = {}
for rule4_meta in rule4_list:
rule4_dict[rule4_meta.split('=')[0]] = rule4_meta.split('=')[1]
return rule4_dict

@F.udf
def updateTagsId(newTagsId, oldTagsId:str, tag5Ids:str):
# new_result_df.tagsId, old_df.tagsId, new_result_df.tag5Ids
tag5_list = tag5Ids.split(",")
need_list = []
for i in oldTagsId.split(","):
if i not in tag5_list:
need_list.append(i)
need_list.append(newTagsId)
return ','.join(need_list)

class TagComputeBase():
def __init__(self, appName, tag_four_id):
self.appName = appName
self.tag_four_id = tag_four_id
self.url = "jdbc:mysql://192.168.88.166:3306/tfec_tags?useSSL=false&useUnicode=true&characterEncoding=utf8&user=root&password=123456"
tableName = 'tbl_basic_tag'
self.sql = f'select id, rule from {tableName} where id = {tag_four_id} or pid = {tag_four_id}'
self.result_index = "tfec_userprofile_result"

# 创建Spark开发环境并返回
def getSparkSession(self):
sparkSession = SparkSession.builder \
.appName(self.appName) \
.master("local[*]") \
.config("spark.sql.shuffle.partitions", 20) \
.getOrCreate()
return sparkSession

# 读取MySQL中的数据(传入的参数是SparkSession对象, 要使用Spark读取MySQL数据
def getTagInfo(self, sparkSession: SparkSession):
tag_df = sparkSession.read\
.format('jdbc')\
.option('url', self.url)\
.option('query', self.sql).load()
return tag_df

# 获取四级标签rule的对象
def getRule4Meta(self, tag_df: DataFrame):
rule4_str = tag_df.rdd.map(lambda row: row.rule).collect()[0]
rule4_dict = strParse(rule4_str)
# 将字典传入类, 封装成对象
rule_four_meta = Rule4Meta.dict_to_obj(rule4_dict)
return rule_four_meta

# 获取计算标签需要的数据
def getComputeData(self, sparkSeession: SparkSession, rule_meta: Rule4Meta):
es_df = sparkSeession.read\
.format('es')\
.option('es.nodes', rule_meta.esNodes)\
.option('es.resource', rule_meta.esIndex)\
.option('es.read.field.include', rule_meta.selectFields).load()
return es_df

# 读取五级标签的id和rule
def getTag5Info(self, tag_df: DataFrame):
tag5_df = tag_df.where(f'id!={self.tag_four_id}')
return tag5_df

# 标签计算(将两个df的数据join, 得到userId, 标签id
def tagCompute(self, es_df: DataFrame, tag5_df: DataFrame):
pass

# 合并更新新老结果(使用自定义udf函数实现-还需要获取老的df标签结果
def tagResultUpdate(self, new_result_df: DataFrame, sparkSession: SparkSession,
tag5_df: DataFrame, rule_meta: Rule4Meta ):
old_df = sparkSession.read.format('es') \
.option('es.nodes', rule_meta.esNodes) \
.option('es.resource', self.result_index) \
.load()
# 获取五级标签的集合
tag5_list = tag5_df.select(F.col('id').cast(StringType())).rdd.map(lambda row:row.id).collect()
# 给获取的五级标签的集合放到new_result_df中
new_rs_df = new_result_df.withColumn('tag5Ids', F.lit(','.join(tag5_list)))
# 将新老标签的数据合并
result_df = old_df\
.join(new_rs_df, on=old_df.userId==new_rs_df.userId, how='left')\
.select(new_rs_df.userId,updateTagsId(new_rs_df.tagsId, old_df.tagsId, new_rs_df.tag5Ids).alias('tagsId'))
return result_df
# 保存标签计算的结果
def saveResult(self, result_df: DataFrame, rule_meta: Rule4Meta):
result_df.write.format('es')\
.option('es.nodes', rule_meta.esNodes)\
.option('es.resource', self.result_index)\
.option('es.write.operation', 'upsert') \
.option('es.mapping.id', 'userId') \
.option('es.mapping.name', 'userId:userId, tagsId:tagsId') \
.mode('append') \
.save()
# 自动调用执行所有的流程
def execute(self):
# - 1、创建spark的运行环境,逻辑一样,参数不一样:appName
sparkSession = self.getSparkSession()
# - 2、读取mysql中的标签信息(四级和五级标签的id和rule),逻辑一样,参数不一样:tag4Id
tag_df = self.getTagInfo(sparkSession)
# - 3、解析四级标签的rule,逻辑一样,参数一样
rule_meta = self.getRule4Meta(tag_df)
# - 4、根据四级标签的rule读取es中的数据,逻辑一样,参数一样
es_df = self.getComputeData(sparkSession, rule_meta)
# - 5、读取计算需要用到的五级标签的rule和id,逻辑一样,参数一样
tag5_df = self.getTag5Info(tag_df)
# - 6、标签的计算,逻辑不一样,参数一样
new_result_df = self.tagCompute(es_df, tag5_df)
if new_result_df is not None:
# - 7、结果的更新,逻辑一样,参数一样
result_df = self.tagResultUpdate(new_result_df, sparkSession, tag5_df, rule_meta)
# - 8、结果的保存,逻辑一样,参数一样
self.saveResult(result_df, rule_meta)

统计类标签计算

计算最后一次消费时间距离当前时间的天数

  • 四级标签的rule-id:23
1
2
3
4
5
6
7
8
9
inType=Elasticsearch

esNodes=up01:9200

esIndex=tfec_tbl_orders

esType=_doc

selectFields=memberid,finishtime
  • 五级标签
1
2
3
4
5
+---+------+
| id| rule|
+---+------+
| 24| 0-7|
| 25| 8-14|
  • 计算逻辑
    • 需要计算最后一次消费时间距离当前时间的天数
    • 根据memberid进行分组group by,求最大值的finishtime
    • 需要将finishtime从unix_timestamp转化yyyyMMdd的字符格式
    • 把五级标签rule切分为start,end
    • 使用F.datediff(start,end)求两个日期之间的差值
    • 由于数据比较老我们把当前日期作提前处理F.datesub(F.currentdate,1095)

最终得到两个df, 然后合并就行了

  • es_pre_df(经过处理的es中的源数据)
1
2
3
4
5
+------+----+
|userId|days|
+------+----+
| 898| 5|
| 29| 5|
  • tag5_pre_df最后一次消费时间距离当前时间的规则,以及标签的id
1
2
3
4
5
+------+-----+---+
|tagsId|start|end|
+------+-----+---+
| 24| 0| 7|
| 25| 8| 14|

消费标签计算

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
from pyspark.sql.types import StringType

from UserProfile.offline.base.TagComputeBase import TagComputeBase

# 定义性别标签计算类, 继承自标签计算基类
class ConsumptionCycleTagCompute(TagComputeBase):
def tagCompute(self, es_df: DataFrame, tag5_df: DataFrame):
# 需要计算最后一次消费时间距离当前时间的天数
# es_df.show()
# +----------+--------+
# |finishtime|memberid|
# +----------+--------+
# |1594137600| 342|
# |1594310400| 405|
# |1593964800| 653|

# tag5_df.show()
# +---+------+
# | id| rule|
# +---+------+
# | 24| 0-7|
# | 25| 8-14|
# 求用户最大的时间戳, 也就是最近一次的消费时间
es_pre_df = es_df.groupBy(F.col('memberid').alias('userId')).agg(
F.max(F.col('finishtime')).alias('finishtime')
)

# es_pre_df.show()
# +------+----------+
# |userId|finishtime|
# +------+----------+
# | 898|1596211200|
# | 29|1596211200|

# 将时间戳类型的时间转换为日期格式
date_df = es_pre_df\
.select(F.col('userId'), F.from_unixtime(F.col('finishtime'), format='yyyy-MM-dd').alias('finishDate'))
# date_df.show()
# +------+----------+
# |userId|finishDate|
# +------+----------+
# | 898|2020-08-01|
# | 29|2020-08-01|
# | 92|2020-08-01|

# 由于消费者最后一次的消费时间跟现在距离过远, 需要将日期推到3年前-1095天,然后计算相隔天数
date_pre_df = date_df.select('userId', F.datediff(F.date_sub(F.current_date(), 1095), F.col('finishDate')).alias('days'))
# date_pre_df.show()
# +------+----+
# |userId|days|
# +------+----+
# | 898| 5|
# | 29| 5|
# 由于发现数据有点离谱, days普遍集中在5, 6 这两个数值, 所以为了严谨, 将1095改为1093使得days产生7-8,产生两个区间的标签id

# 将tag5_df 的rule(0-7)拆分成start 和 end 这种类型的时间
tag5_pre_df = tag5_df.select(F.col('id').alias('tagsId'), F.split('rule', '-')[0].alias('start'), F.split('rule', '-')[1].alias('end'))
# tag5_pre_df.show()
# +------+-----+---+
# |tagsId|start|end|
# +------+-----+---+
# | 24| 0| 7|
# | 25| 8| 14|

# 将两个标签的数据进行合并(date_pre_df.userId不要写成date_pre_df.select(F.col('userId'))
result_df = date_pre_df\
.join(tag5_pre_df, on=date_pre_df.days.between(tag5_pre_df.start, tag5_pre_df.end), how='left')\
.select(date_pre_df.userId.cast(StringType()).alias('userId'), tag5_pre_df.tagsId.cast(StringType()).alias('tagsId'))
return result_df

pass
if __name__ == '__main__':
appName = '消费周期标签计算任务'
tag4Id = 23
# 创建标签计算对象
consumptionCycleTagCompute = ConsumptionCycleTagCompute(appName, tag4Id)
consumptionCycleTagCompute.execute()

支付方式标签计算

计算每个人常用的支付方式

  • 四级标签id-29
1
2
3
4
5
6
7
8
9
inType=Elasticsearch

esNodes=up01:9200

esIndex=tfec_tbl_orders

esType=_doc

selectFields=memberid,paymentcode
  • 五级标签
1
2
3
4
5
6
30,alipay
31,wxpay
32,chinapay
33,kjtpay
34,cod
35,other
  • 计算逻辑
    • 计算每个人常用的支付方式
    • 根据memberid、paymentcode进行分组groupby,求count(paymentcode)每个人的每个支付的次数
    • 求每个人支付方式中次数最多的支付方式
    • 使用开窗计算:
1
2
3
4
5
es_pre_df = es_df\
.groupBy(F.col('memberid'), F.col('paymentcode'))\
.agg(F.count('paymentcode').alias('cnt'))\
.withColumn('rk', F.row_number().over(Window.partitionBy('memberid').orderBy(F.col('cnt').desc())))\
.where('rk = 1')

计算流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
from pyspark.sql import DataFrame, Window
import pyspark.sql.functions as F
from pyspark.sql.types import StringType

from UserProfile.offline.base.TagComputeBase import TagComputeBase

# 定义性别标签计算类, 继承自标签计算基类
class PaymentTagCompute(TagComputeBase):
def tagCompute(self, es_df: DataFrame, tag5_df: DataFrame):
# 计算每个人常用的支付方式
# es_df.show()
# +--------+-----------+
# |memberid|paymentcode|
# +--------+-----------+
# | 342| alipay|
# | 405| alipay|
es_pre_df = es_df\
.groupBy(F.col('memberid'), F.col('paymentcode'))\
.agg(F.count('paymentcode').alias('cnt'))\
.withColumn('rk', F.row_number().over(Window.partitionBy('memberid').orderBy(F.col('cnt').desc())))\
.where('rk = 1')

# es_pre_df.show()
# +--------+-----------+---+---+
# |memberid|paymentcode|cnt| rk|
# +--------+-----------+---+---+
# | 4| alipay|195| 1|
# | 26| alipay|176| 1|
# | 29| alipay|196| 1|

# tag5_df.show()
# +---+--------+
# | id| rule|
# +---+--------+
# | 30| alipay|
# | 31| wxpay|

# 合并两个df得到最终结果
result_df = es_pre_df.join(tag5_df,on=es_pre_df.paymentcode==tag5_df.rule, how='left')\
.select(es_pre_df.memberid.cast(StringType()).alias('userId'), tag5_df.id.cast(StringType()).alias('tagsId'))
return result_df

if __name__ == '__main__':
appName = '支付方式标签计算任务'
tag4Id = 29
# 创建标签计算对象
paymentTagCompute = PaymentTagCompute(appName, tag4Id)
paymentTagCompute.execute()