数据挖掘KMeans算法

KMeans介绍

k均值聚类算法(k-means clustering algorithm)是一种迭代求解的聚类分析算法,其步骤是,预将数据分为K组,则随机选取K个对象作为初始的聚类中心,然后计算每个对象与各个种子聚类中心之间的距离,把每个对象分配给距离它最近的聚类中心。聚类中心以及分配给它们的对象就代表一个聚类。每分配一个样本,聚类的聚类中心会根据聚类中现有的对象被重新计算。这个过程将不断重复直到满足某个终止条件。终止条件可以是没有(或最小数目)对象被重新分配给不同的聚类,没有(或最小数目)聚类中心再发生变化,误差平方和局部最小。

余弦相似度

什么是余弦相似度

余弦相似度算法:一个向量空间中两个向量夹角间的余弦值作为衡量两个个体之间差异的大小,余弦值接近1,夹角趋于0,表明两个向量越相似,余弦值接近于0,夹角趋于90度,表明两个向量越不相似

主要应用在数据查重场景中, 这里推荐一篇大佬的文章, 写的简单易懂

https://blog.csdn.net/u014539465/article/details/105353638/

文档1:有/问题/下课/找/我/ 1827777777

文档2:下课/有/问题/可以/找/我/ 1877777777

文档1:(TF, IDF)

有 1 0.5

问题 1 0.5

下课 1 0.5

找 1 0.5

我 1 0.5

18.. 10.5

KMeans算法原理

k-means其实包含两层内容:

  • K表示初始中心点个数(计划聚类数)
  • means求中心点到其他数据点距离的平均值, K自己设置(2,3,4,5,6,7,8)
  • 说白了就是计算给定数据中心点的位置(通过不断矫正某个簇的中心点距离其它点的距离的方差- 方差越小越说明接近中心, 来实现聚类算法)

  • 具体步骤如下:

    • 随机设置K个特征空间内的点作为初始的聚类中心

    • 对于其他每个点计算到K个中心的距离,未知的点选择最近的一个聚类中心点作为标记类别

    • 接着对着标记的聚类中心之后,重新计算出每个聚类的新中心点(平均值)

    • 如果计算得出的新中心点与原中心点一样(质心不再移动),那么结束,否则重新进行第二步过程

image-20230518170849066

KMeans算法特点

优点:速度快,简单

  • 对处理大数据集,该算法保持可伸缩性和高效率。
  • 当簇近似为高斯分布(也就是概论中学的正态分布)时,它的效果较好。

缺点:最终结果跟初始点选择相关,容易陷入局部最优

  • k均值算法中k是实现者给定的,这个k值的选定是非常难估计的。
  • k均值的聚类算法需要不断地进行样本分类调整,不断地计算调整后的新的聚类中心,当数据量大的时候,算法开销很大。
  • k均值是求得局部最优解的算法,所以对于初始化时选取的k个聚类的中心比较敏感,不同点的中心选取策略可能带来不同的聚类结果。
  • 对噪声点和孤立点数据敏感。

KMeans一般是其他聚类方法的基础算法,如谱聚类。

KMeans模型评估

  • SSE表示数据样本与它所属的簇中心之间的距离(差异度)平方之和。
    • img
    • 直观的来说,SSE越小,表示数据点越接近它们的中心,聚类效果越好。
    • 因为对误差取了平方,更加重视那些远离中心的点。
  • 聚类算法评估使用SC系数/轮廓系数
    • 轮廓系数: 取值范围[ -1,1] 取值越大越好
    • 聚成不同的类别 k = 3, k = 4,k=5,k=6, k=7 , 每次聚类之后, 都算一下轮廓系数, 轮廓系数最大的聚类的方式就是最佳的K值

KMeans模型实现

  • from pyspark.ml.clustering import KMeans 创建KMeans聚类对象
  • from pyspark.ml.evaluation import ClusteringEvaluator 创建ClusteringEvaluator 聚类评估器对象
  • 一般需要搭配VectorAssembler聚类使用, 给出参数features_column, prediction_column, key, seed就可以训练模型
  • 通过
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import VectorAssembler

from pyspark.sql import SparkSession

if __name__ == '__main__':
# 创建Spark运行环境
spark = SparkSession\
.builder\
.appName('Kmeans算法实现')\
.master('local[*]')\
.getOrCreate()
# 1, 加载数据
data_df = spark.read\
.format('csv')\
.option('header', True)\
.option('inferSchema', True)\
.load('/root/a.txt')
# data_df.show()
# +-----------+----+
# |Weightindex|PH值|
# +-----------+----+
# | 1.0| 1.0|
# | 2.0| 1.0|
# | 4.0| 3.0|
# | 5.0| 4.0|

# 2, 特征工程(将特征列封装到一个向量列当中
assembler = VectorAssembler(inputCols=['Weightindex','PH值'], outputCol='features')
# 2.1 创建模型(VectorAssmebler可以省略fit), 并计算数据
result_df = assembler.transform(data_df)
# result_df.show()
# +-----------+----+---------+
# |Weightindex|PH值| features|
# +-----------+----+---------+
# | 1.0| 1.0|[1.0,1.0]|
# | 2.0| 1.0|[2.0,1.0]|
# | 4.0| 3.0|[4.0,3.0]|
# | 5.0| 4.0|[5.0,4.0]|

# 3, 模型训练 (使用KMeans进行模型训练, 始终逃不过fit和transform
# 3.1 创建KMeans对象
kmeans = KMeans(featuresCol='features', predictionCol='prediction',k=2, seed=5)
# 3.2 创建模型,并计算
new_result_df = kmeans.fit(result_df).transform(result_df)
new_result_df.show()

# 4, 模型评估
evalutor = ClusteringEvaluator(predictionCol='prediction', featuresCol='features')
sc = evalutor.evaluate(new_result_df)
print(f"轮廓系数为: {sc}")

KMeans计算RFM标签

什么是RFM模型

上篇文章介绍过了, 这里就不多bb了!

讲一下思想:

  • 首先通过KMeans算法得到prediction列
  • 通过[numpy.sum(x) for x in centers]获取center_list中心点值的一个列表(列表中的元素和prediction中的0, 1, 2, 3簇标识一一对应)
  • 将刚才获取的列表转为字典(for i in range(0, len(center_list): new_dict[i] = center_list[i])顺序不会发生变化)
  • 对字典进行排序,按值排序(key=lambda x: x[1], reverse=True)
  • 取上面排过序的字典, 取字典的键, 并获取五级标签的id为值(价格敏感度高的排在前面, 且根据逻辑价格敏感度高的中心点的值越大)合成为新的字典new_dict
  • 创建udf函数, 在后续调用, 将prediction类传入函数中, 最终返回new_dict[prediction]
1
就是说: 按簇中心点的值排序, 值大的, 价格敏感度越高

RFM标签计算流程

  • 1、计算 R/F/M的值
    • R 用户id分组消费时间求最大值,
    • F 用户id分组 count计数 (去重计数)
    • M 用户id分组 金额求和
  • 2、R/F/M 分别打分
  • 3、使用KMeans算法进行聚类
  • 4、将聚类结果和5级标签的rule匹配
  • 5、给用户打上标签

RFM标签计算实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
from UserProfile.offline.base.TagComputeBase import TagComputeBase
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
import numpy as np

class TagCptByKmeans(TagComputeBase):
def tagCompute(self, es_df: DataFrame, tag5_df: DataFrame):
# es_df.show()
# +----------+--------+-----------+--------------------+
# |finishtime|memberid|orderamount| ordersn|
# +----------+--------+-----------+--------------------+
# |1594137600| 342| 1.0|jd_15062716252125282|
# |1594310400| 405| 3699.0|jd_15062720080896457|

# tag5_df.show()
# +---+----+
# | id|rule|
# +---+----+
# | 38| 1|
# | 39| 2|
# | 40| 3|

# 1、计算 R/F/M的值
# R 用户id分组消费时间求最大值,
rAgg = F.datediff(F.date_sub(F.current_date(), 1035),
F.from_unixtime(F.max(F.col('finishtime')), format='yyyy-MM-dd')).alias('r')
# F 用户id分组 count计数 (去重计数)
fAgg = F.countDistinct(F.col('ordersn')).alias('f')
# M 用户id分组 金额求和
mAgg = F.sum(F.col('orderamount')).alias('m')
rfm_df = es_df.groupBy('memberid').agg(rAgg, fAgg, mAgg)

# rfm_df.show()
# +--------+---+---+------------------+
# |memberid| r| f| m|
# +--------+---+---+------------------+
# | 747| 67|111|202788.09013915993|
# | 303| 67| 97|168033.97006225586|
# | 61| 67|230|373673.77001953125|

# 2、R/F/M 分别打分
rScore = F.when(F.col('r').between(1, 7), 5) \
.when(F.col('r').between(8, 14), 4) \
.when(F.col('r').between(15, 30), 3) \
.when(F.col('r').between(31, 60), 2) \
.otherwise(1).alias('rScore')
fScore = F.when(F.col('f').between(1, 100), 1) \
.when(F.col('f').between(101, 150), 2) \
.when(F.col('f').between(151, 200), 3) \
.when(F.col('f').between(201, 300), 4) \
.otherwise(5).alias('fScore')
mScore = F.when(F.col('m').between(1, 200000), 1) \
.when(F.col('m').between(200001, 300000), 2) \
.when(F.col('m').between(300001, 500000), 3) \
.when(F.col('m').between(500001, 800000), 4) \
.otherwise(5).alias('mScore')
rfm_s_df = rfm_df.select(F.col('memberid').alias('userId'), rScore, fScore, mScore)
# rfm_s_df.show()
# +------+------+------+------+
# |userId|rScore|fScore|mScore|
# +------+------+------+------+
# | 747| 1| 2| 2|
# | 303| 1| 1| 1|
# | 61| 1| 4| 3|
# 3、使用KMeans算法进行聚类
# 3-1、进行特征工程将r f m三个维度列合并一个列中(使用VectorAssembler创建对象, Transform
vectorAssembler = VectorAssembler(inputCols=['rScore', 'fScore','mScore'], outputCol='features')
assemble_df = vectorAssembler.transform(rfm_s_df)
# assemble_df.show()
# +------+------+------+------+-------------+
# |userId|rScore|fScore|mScore| features|
# +------+------+------+------+-------------+
# | 747| 1| 2| 2|[1.0,2.0,2.0]|
# | 303| 1| 1| 1|[1.0,1.0,1.0]|

# 3-2、使用KMeans算法进行模型训练
kmeans = KMeans(featuresCol='features', predictionCol='prediction', k=7, seed=3)
# 创建模型
model = kmeans.fit(assemble_df)
# 计算
prediction_df = model.transform(assemble_df)
prediction_df.show()
# +------+------+------+------+-------------+----------+
# |userId|rScore|fScore|mScore| features|prediction|
# +------+------+------+------+-------------+----------+
# | 747| 1| 2| 2|[1.0,2.0,2.0]| 0|
# | 303| 1| 1| 1|[1.0,1.0,1.0]| 4|
# | 61| 1| 4| 3|[1.0,4.0,3.0]| 3|

# 4、将聚类结果和5级标签的rule匹配(已经使用kemeans对数据进行聚类, 分成7个类,需要将每个类对应的值转换为价值标签
# 4-1、求每个类别的得分
centers = model.clusterCenters()
print(centers)

# 4-2 求每个中心点得分总和 (下面的等价于 for x in centers: centers_list.append(np.sum(x))
centers_list = [np.sum(x) for x in centers]
print(centers_list)
# [5.0, 4.0, 9.357142857142858, 8.0, 3.1707317073170733, 6.1, 7.75]

# 4-3 将中心点的得分和中心点所属于的类别存入到字典中
centers_dict = {}
for i in range(0, len(centers_list)):
centers_dict[i] = centers_list[i]
print(centers_dict)
# {0: 5.0, 1: 4.0, 2: 9.357142857142858, 3: 8.0, 4: 3.1707317073170733, 5: 6.1, 6: 7.75}

# 4-4 对字典进行排序, x: x[1]表示取字典中的value进行排序, reverse=True倒序
sorted_dict = dict(sorted(centers_dict.items(), key=lambda x: x[1], reverse=True))
print(sorted_dict)
# {2: 9.357142857142858, 3: 8.0, 6: 7.75, 5: 6.1, 0: 5.0, 1: 4.0, 4: 3.1707317073170733}
# 取出五级标签的rule(将五级标签的rule转换为dict
id_list = tag5_df.rdd.map(lambda row: row.id).collect()
print(id_list) # 顺序获取标签id, [38, 39, 40, 41, 42, 43, 44]
# key是sorted_dict的键 value对应五级rule, 通过zip将两个东西合并起来
rule_dict = dict(zip(sorted_dict.keys(), id_list))
print(rule_dict)
# 五级标签id与中心点类别: {4: 38, 3: 39, 2: 40, 6: 41, 0: 42, 1: 43, 5: 44}
@F.udf
def prediction2tag5Id(prediction):
return rule_dict[prediction]
final_result_df = prediction_df.select(F.col('userId').cast(StringType()).alias('userId'),
prediction2tag5Id(F.col('prediction')).cast(StringType()).alias('tagsId') )
return final_result_df

if __name__ == '__main__':
appName = '通过kmeans算法计算rfm标签'
tag4Id = 37
# 创建标签计算对象
tagCpt = TagCptByKmeans(appName, tag4Id)
tagCpt.execute()

KMeans计算RFE标签

什么是RFE模型

RFE:用户活跃度标签:

使用场景:用户频繁使用但是消费场景比较少,比如信息流应用(今日头条, 腾讯新闻, 微博)

RFE标签计算流程

  • 1、计算 R/F/E的值
  • 2、对R/F/E 分别打分
  • 3、使用KMeans算法进行聚类
  • 4、将聚类结果和5级标签的rule匹配
  • 5、给用户打上标签

RFE标签计算实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
from UserProfile.offline.base.TagComputeBase import TagComputeBase
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
import numpy as np

class TagCptByKmeans(TagComputeBase):
def tagCompute(self, es_df: DataFrame, tag5_df: DataFrame):
# es_df.show()
# +--------------+--------------------+-------------------+
# |global_user_id| loc_url| log_time|
# +--------------+--------------------+-------------------+
# | 806|http://search.esh...|2019-07-26 04:14:36|
# | 139|http://m.eshop.co...|2019-07-28 19:00:37|
# | 180|http://www.eshop....|2019-07-21 14:49:14|
# | 482|http://m.eshop.co...|2019-08-10 23:43:39|

# tag5_df.show()
# +---+----+
# | id|rule|
# +---+----+
# | 46| 1|
# | 47| 2|
# | 48| 3|
# | 49| 4|
# +---+----+

# 1、计算 R/F/E的值
r_v = F.datediff(F.date_sub(F.current_date(), 1450), F.max(F.col('log_time'))).alias('r_v')
f_v = F.count(F.col('loc_url')).alias('f_v')
e_v = F.countDistinct(F.col('loc_url')).alias('e_v')
rfe_v_df = es_df.groupby('global_user_id').agg(r_v, f_v, e_v)
# rfe_v_df.show()
# +--------------+---+---+---+
# |global_user_id|r_v|f_v|e_v|
# +--------------+---+---+---+
# | 107| 2|823|280|
# | 110| 2|680|249|
# | 137| 2|858|290|

# 2、对R/F/E 分别打分
r_s = F.when(F.col('r_v').between(1, 3), 5) \
.when(F.col('r_v').between(4, 7), 4) \
.when(F.col('r_v').between(8, 15), 3) \
.when(F.col('r_v').between(16, 30), 2) \
.otherwise(1).alias('r_s')
f_s = F.when(F.col('f_v').between(1, 500), 1) \
.when(F.col('f_v').between(500, 550), 2) \
.when(F.col('f_v').between(550, 600), 3) \
.when(F.col('f_v').between(600, 700), 4) \
.otherwise(5).alias('f_s')
e_s = F.when(F.col('e_v').between(0, 200), 1) \
.when(F.col('e_v').between(200, 251), 2) \
.when(F.col('e_v').between(251, 300), 3) \
.when(F.col('e_v').between(300, 350), 4) \
.otherwise(5).alias('e_s')
rfe_s_df = rfe_v_df.select(F.col('global_user_id').alias('userId'),r_s, f_s, e_s)
# rfe_s_df.show()
# +------+---+---+---+
# |userId|r_s|f_s|e_s|
# +------+---+---+---+
# | 107| 5| 5| 3|
# | 110| 5| 4| 2|

# 3、使用KMeans算法进行聚类(先用vectorassembler 在用KMeans
vector = VectorAssembler(inputCols=['r_s', 'f_s', 'e_s'],outputCol='features')
assembler_df = vector.transform(rfe_s_df)
# 3.2 使用KMeans算法进行模型训练
kmeans = KMeans(k=4, featuresCol='features', predictionCol='prediction', seed=5)
# 创建模型
model = kmeans.fit(assembler_df)
KMeans_df = model.transform(assembler_df)
# 打印进过模型训练的df, 有prediction列
# KMeans_df.show()
# +------+---+---+---+-------------+----------+
# |userId|r_s|f_s|e_s| features|prediction|
# +------+---+---+---+-------------+----------+
# | 107| 5| 5| 3|[5.0,5.0,3.0]| 1|
# | 110| 5| 4| 2|[5.0,4.0,2.0]| 2|

# 4、将聚类结果和5级标签的rule匹配
# 4.1 首先算出聚类的中心点(查看中心点的得分, 算出得分的总和
# 注意centers是model中的一个方法, 只有模型才能调用
centers = model.clusterCenters()
# 将每组centers放到一个列表中
centers_list = [np.sum(x) for x in centers]
print(centers_list)
# [12.0, 13.00498338870432, 11.0, 12.0] -- 这里是一一对应prediction中的0, 1, 2
# 将列表元素 转为字典中,不改变顺序,因为将center中的值取出来是与prediction中的0, 1,2 对应的
# (生成的字典为: {'1': list中第一个元素, '2': 第二个.....}
centers_dict = {}
for i in range(0, len(centers_list)):
centers_dict[i] = centers_list[i]
# 对字典中的元素进行排序(按照值从大到小的顺序排序
sorted_dict = dict(sorted(centers_dict.items(), key=lambda x: x[1], reverse=True))
print(sorted_dict)
# {1: 13.00498338870432, 0: 12.0, 3: 12.0, 2: 11.0}
# 获取五级标签的id( 用户价值高的排在前面
tag5_id_list = tag5_df.rdd.map(lambda row: row.id).collect()
# 将按照值的大小排过顺序的中心点字典的键为新字典的键, 值为tag5的id(价值高的排前面) 两个进行一一对应, 生成一个新的字典
prediction_dict = dict(zip(sorted_dict.keys(), tag5_id_list))
print(prediction_dict)
# {1: 46, 0: 47, 3: 48, 2: 49}
# 5、给用户打上标签
@F.udf
def prediction2TagsId(prediction):
return prediction_dict[prediction]

result_df = KMeans_df.select(F.col('userId').cast(StringType()).alias('userId'),
prediction2TagsId(F.col('prediction')).cast(StringType()).alias('tagsId')
)

return result_df

if __name__ == '__main__':
appName = '通过KMeans算法计算RFE标签'
tag4Id = 45
# 创建标签计算对象
tagCpt = TagCptByKmeans(appName, tag4Id)
tagCpt.execute()

KMeans计算PSM标签

什么是PSM模型

PSM模型(Price Sensitivity Measurement),即价格敏感度模型,是当前价格测试模型中的一个常用模型,其特点为所有价格测试过程完全基于被访者的自然反应,没有任何竞争对手甚至自身产品的任何信息。通过该模型,可以得到产品的最优价格和合理的价格间。

时在实际业务中,会把用户分为3-5类,比如分为极度敏感、较敏感、一般敏感、较不敏感、极度不敏感

PSM模型中相关概念名词说明如下:

ra: receivableAmount 应收金额

da: discountAmount 优惠金额

pa: practicalAmount 实收金额

tdon 优惠订单数 total discount order number

ton 总订单总数 total order number

ada 平均优惠金额 average discount Amount

ara 平均每单应收 average receivable Amount

tda 优惠总金额 total discount Amount

tra 应收总金额 total receivable Amount

tdonr 优惠订单占比(优惠订单数 / 订单总数) total discount order number ratio

adar 平均优惠金额占比(平均优惠金额 / 平均每单应收金额) average discount amount ratio

tdar 优惠总金额占比(优惠总金额 / 订单总金额) total discount amount ratio

psm = 优惠订单占比 + 平均优惠金额占比 + 优惠总金额占比

psmScore = tdonr + adar + tdar

标签计算流程

  • 1、计算 psm 的值
  • 2、使用KMeans算法进行聚类
  • 3、将聚类结果和5级标签的rule匹配
  • 4、给用户打上标签

标签计算实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
from UserProfile.offline.base.TagComputeBase import TagComputeBase
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
import numpy as np

class TagCptByKmeans(TagComputeBase):
def tagCompute(self, es_df: DataFrame, tag5_df: DataFrame):
# es_df.show()
# +---------------+--------+-----------+--------------------+
# |couponcodevalue|memberid|orderamount| ordersn|
# +---------------+--------+-----------+--------------------+
# | 0.0| 342| 1.0|jd_15062716252125282|
# | 0.0| 405| 3699.0|jd_15062720080896457|
# | 0.0| 653| 2699.0|jd_15062817103347299|

# tag5_df.show()
# +---+----+
# | id|rule|
# +---+----+
# | 51| 1|
# | 52| 2|
# | 53| 3|
# | 54| 4|
# | 55| 5|
# +---+----+

# 计算PSM的值
# 优惠金额
da = F.col('couponcodevalue').alias('da')
# 实收金额
pa = F.col('orderamount').alias('pa')
# 应收金额
ra = da + pa

rdp_df = es_df.select(da, pa, ra.alias('ra'), F.col('ordersn'), F.col('memberid').alias('userId'))
# rdp_df.show()
# +---+-------+-------+--------------------+------+
# | da| pa| ra| ordersn|userId|
# +---+-------+-------+--------------------+------+
# |0.0| 1.0| 1.0|jd_15062716252125282| 342|
# |0.0| 3699.0| 3699.0|jd_15062720080896457| 405|

# 优惠订单数
tdon = F.sum(F.when(F.col('da') == 0, 0).otherwise(1)).alias('tdon')
# 总订单数
ton = F.count(F.col('ordersn')).alias('ton')
# 平均优惠金额
ada = F.avg(F.col('da')).alias('ada')
# 优惠总金额
tda = F.sum(F.col('da')).alias('tda')
# 每单平均应收
ara = F.avg(F.col('ra')).alias('ara')
# 应收总金额
tra = F.sum(F.col('ra')).alias('tra')
rdp_agg_df = rdp_df.groupby('userId').agg(tdon, ton, ada, tda, ara, tra)
# rdp_agg_df.show()
# +------+----+---+-------------------+------+------------------+------------------+
# |userId|tdon|ton| ada| tda| ara| tra|
# +------+----+---+-------------------+------+------------------+------------------+
# | 898| 4|129| 5.426356589147287| 700.0|1822.9205431087996|235156.75006103516|
# | 29| 10|246| 7.723577235772358|1900.0| 1648.066057530845| 405424.2501525879|
# 用户敏感度 = 优惠订单占比 + 平均优惠金额占比 + 优惠总金额占比
# 优惠订单占比(优惠订单数/订单总数)
tdonr = (F.col('tdon') / F.col('ton')).alias('tdonr')
# 平均优惠金额占比(平均优惠金额/平均每单应收金额)
adar = (F.col('ada') / F.col('ara')).alias('adar')
# 优惠总金额占比(优惠总金额/订单总金额)
tdar = (F.col('tda') / F.col('tra')).alias('tdar')
psm_df = rdp_agg_df.select('userId', (tdonr + adar + tdar).alias('psm'))
# psm_df.show()
# +------+--------------------+
# |userId| psm|
# +------+--------------------+
# | 898| 0.03696122764997928|
# | 29| 0.05002330415034498|
# | 92|0.004834992278551832|

# 使用KMeans算法进行聚类
# 特征筛选
assembler = VectorAssembler(inputCols=['psm'], outputCol='features')
psm_vector_df = assembler.transform(psm_df)
# KMeans对psm进行聚类, 并求中心点的分值, 分值越高越代表对价格的敏感度越高
kmeans = KMeans(k=5, featuresCol='features', predictionCol='prediction', seed=5)
# 创建KMeans模型
model = kmeans.fit(psm_vector_df)
# 进行计算
result_df = model.transform(psm_vector_df)
# result_df.show()
# +------+--------------------+--------------------+----------+
# |userId| psm| features|prediction|
# +------+--------------------+--------------------+----------+
# | 898| 0.03696122764997928|[0.03696122764997...| 2|
# | 29| 0.05002330415034498|[0.05002330415034...| 3|
# | 92|0.004834992278551832|[0.00483499227855...| 1|

# 将聚类的值和五级标签的rule匹配
# 获取聚类中心点的值(必须通过model.clusterCenters调用, 获取一个列表嵌套多个array, array中的是簇的每个点的值
centers = model.clusterCenters()
centers_list = [np.sum(x) for x in centers]
print(centers_list)
# 为了不改变prediction中的顺序, 将列表中的元素整合成一个字典
centers_dict = {}
for i in range(0, len(centers_list)):
centers_dict[i] = centers_list[i]
# 按照字典中的值对字典进行降序排序(后面的标签id也是敏感度高的在最上面
sorted_dict = dict(sorted(centers_dict.items(), key=lambda x: x[1], reverse=True))
print(sorted_dict)
# 取出五级标签的id
tag5_ids_list = tag5_df.rdd.map(lambda row: row.id).collect()
print(tag5_ids_list)

# 将五级标签的id和字典中的值一一对应的连接起来(通过zip函数形成新的字典
tags_to_centers_dict = dict(zip(sorted_dict.keys(), tag5_ids_list))
print(tags_to_centers_dict)

@F.udf
def prediction2Id(prediction):
return tags_to_centers_dict[prediction]

# 挑选字段, 并返回最终的df
final_df = result_df.select(F.col('userId').cast(StringType()).alias('userId'),
prediction2Id(F.col('prediction')).cast(StringType()).alias('tagsId'))

# final_df.show()
# +------+------+
# |userId|tagsId|
# +------+------+
# | 898| 53|
# | 29| 52|
# | 92| 55|

return final_df

if __name__ == '__main__':
appName = '通过kmeans算法计算PSM标签'
tag4Id = 50
# 创建标签计算对象
tagCpt = TagCptByKmeans(appName, tag4Id)
tagCpt.execute()