数据挖掘KMeans算法 KMeans介绍
k均值聚类算法(k-means clustering algorithm)是一种迭代求解的聚类分析算法,其步骤是,预将数据分为K组,则随机选取K个对象作为初始的聚类中心,然后计算每个对象与各个种子聚类中心之间的距离,把每个对象分配给距离它最近的聚类中心。聚类中心以及分配给它们的对象就代表一个聚类。每分配一个样本,聚类的聚类中心会根据聚类中现有的对象被重新计算。这个过程将不断重复直到满足某个终止条件。终止条件可以是没有(或最小数目)对象被重新分配给不同的聚类,没有(或最小数目)聚类中心再发生变化,误差平方和局部最小。
余弦相似度 什么是余弦相似度
余弦相似度算法: 一个向量空间中两个向量夹角间的余弦值作为衡量两个个体之间差异的大小,余弦值接近1,夹角趋于0,表明两个向量越相似,余弦值接近于0,夹角趋于90度,表明两个向量越不相似
主要应用在数据查重场景中, 这里推荐一篇大佬的文章, 写的简单易懂
https://blog.csdn.net/u014539465/article/details/105353638/
文档1:有/问题/下课/找/我/ 1827777777
文档2:下课/有/问题/可以/找/我/ 1877777777
文档1:(TF, IDF)
有 1 0.5
问题 1 0.5
下课 1 0.5
找 1 0.5
我 1 0.5
18.. 10.5
KMeans算法原理
k-means其实包含两层内容:
K表示初始中心点个数(计划聚类数)
means求中心点到其他数据点距离的平均值, K自己设置(2,3,4,5,6,7,8)
KMeans算法特点
优点:速度快,简单
对处理大数据集,该算法保持可伸缩性和高效率。
当簇近似为高斯分布(也就是概论中学的正态分布)时,它的效果较好。
缺点:最终结果跟初始点选择相关,容易陷入局部最优
k均值算法中k是实现者给定的,这个k值的选定是非常难估计的。
k均值的聚类算法需要不断地进行样本分类调整,不断地计算调整后的新的聚类中心,当数据量大的时候,算法开销很大。
k均值是求得局部最优解的算法,所以对于初始化时选取的k个聚类的中心比较敏感,不同点的中心选取策略可能带来不同的聚类结果。
对噪声点和孤立点数据敏感。
KMeans一般是其他聚类方法的基础算法,如谱聚类。
KMeans模型评估
SSE表示数据样本与它所属的簇中心之间的距离(差异度)平方之和。
直观的来说,SSE越小,表示数据点越接近它们的中心,聚类效果越好。
因为对误差取了平方,更加重视那些远离中心的点。
聚类算法评估使用SC系数/轮廓系数
轮廓系数: 取值范围[ -1,1] 取值越大越好
聚成不同的类别 k = 3, k = 4,k=5,k=6, k=7 , 每次聚类之后, 都算一下轮廓系数, 轮廓系数最大的聚类的方式就是最佳的K值
KMeans模型实现
from pyspark.ml.clustering import KMeans 创建KMeans聚类对象
from pyspark.ml.evaluation import ClusteringEvaluator 创建ClusteringEvaluator 聚类评估器对象
一般需要搭配VectorAssembler聚类使用, 给出参数features_column, prediction_column, key, seed就可以训练模型
通过
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 from pyspark.ml.clustering import KMeansfrom pyspark.ml.evaluation import ClusteringEvaluatorfrom pyspark.ml.feature import VectorAssemblerfrom pyspark.sql import SparkSessionif __name__ == '__main__' : spark = SparkSession\ .builder\ .appName('Kmeans算法实现' )\ .master('local[*]' )\ .getOrCreate() data_df = spark.read\ .format ('csv' )\ .option('header' , True )\ .option('inferSchema' , True )\ .load('/root/a.txt' ) assembler = VectorAssembler(inputCols=['Weightindex' ,'PH值' ], outputCol='features' ) result_df = assembler.transform(data_df) kmeans = KMeans(featuresCol='features' , predictionCol='prediction' ,k=2 , seed=5 ) new_result_df = kmeans.fit(result_df).transform(result_df) new_result_df.show() evalutor = ClusteringEvaluator(predictionCol='prediction' , featuresCol='features' ) sc = evalutor.evaluate(new_result_df) print (f"轮廓系数为: {sc} " )
KMeans计算RFM标签 什么是RFM模型
上篇文章介绍过了, 这里就不多bb了!
讲一下思想:
首先通过KMeans算法得到prediction列
通过[numpy.sum(x) for x in centers]获取center_list中心点值的一个列表(列表中的元素和prediction中的0, 1, 2, 3簇标识一一对应)
将刚才获取的列表转为字典(for i in range(0, len(center_list): new_dict[i] = center_list[i])顺序不会发生变化)
对字典进行排序,按值排序(key=lambda x: x[1], reverse=True)
取上面排过序的字典, 取字典的键, 并获取五级标签的id为值(价格敏感度高的排在前面, 且根据逻辑价格敏感度高的中心点的值越大)合成为新的字典new_dict
创建udf函数, 在后续调用, 将prediction类传入函数中, 最终返回new_dict[prediction]
1 就是说 : 按簇中心点的值排序, 值大的, 价格敏感度越高
RFM标签计算流程
1、计算 R/F/M的值
R 用户id分组消费时间求最大值,
F 用户id分组 count计数 (去重计数)
M 用户id分组 金额求和
2、R/F/M 分别打分
3、使用KMeans算法进行聚类
4、将聚类结果和5级标签的rule匹配
5、给用户打上标签
RFM标签计算实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 from pyspark.sql import DataFrameimport pyspark.sql.functions as Ffrom pyspark.sql.types import StringTypefrom UserProfile.offline.base.TagComputeBase import TagComputeBasefrom pyspark.ml.clustering import KMeansfrom pyspark.ml.feature import VectorAssemblerimport numpy as npclass TagCptByKmeans (TagComputeBase ): def tagCompute (self, es_df: DataFrame, tag5_df: DataFrame ): rAgg = F.datediff(F.date_sub(F.current_date(), 1035 ), F.from_unixtime(F.max (F.col('finishtime' )), format ='yyyy-MM-dd' )).alias('r' ) fAgg = F.countDistinct(F.col('ordersn' )).alias('f' ) mAgg = F.sum (F.col('orderamount' )).alias('m' ) rfm_df = es_df.groupBy('memberid' ).agg(rAgg, fAgg, mAgg) rScore = F.when(F.col('r' ).between(1 , 7 ), 5 ) \ .when(F.col('r' ).between(8 , 14 ), 4 ) \ .when(F.col('r' ).between(15 , 30 ), 3 ) \ .when(F.col('r' ).between(31 , 60 ), 2 ) \ .otherwise(1 ).alias('rScore' ) fScore = F.when(F.col('f' ).between(1 , 100 ), 1 ) \ .when(F.col('f' ).between(101 , 150 ), 2 ) \ .when(F.col('f' ).between(151 , 200 ), 3 ) \ .when(F.col('f' ).between(201 , 300 ), 4 ) \ .otherwise(5 ).alias('fScore' ) mScore = F.when(F.col('m' ).between(1 , 200000 ), 1 ) \ .when(F.col('m' ).between(200001 , 300000 ), 2 ) \ .when(F.col('m' ).between(300001 , 500000 ), 3 ) \ .when(F.col('m' ).between(500001 , 800000 ), 4 ) \ .otherwise(5 ).alias('mScore' ) rfm_s_df = rfm_df.select(F.col('memberid' ).alias('userId' ), rScore, fScore, mScore) vectorAssembler = VectorAssembler(inputCols=['rScore' , 'fScore' ,'mScore' ], outputCol='features' ) assemble_df = vectorAssembler.transform(rfm_s_df) kmeans = KMeans(featuresCol='features' , predictionCol='prediction' , k=7 , seed=3 ) model = kmeans.fit(assemble_df) prediction_df = model.transform(assemble_df) prediction_df.show() centers = model.clusterCenters() print (centers) centers_list = [np.sum (x) for x in centers] print (centers_list) centers_dict = {} for i in range (0 , len (centers_list)): centers_dict[i] = centers_list[i] print (centers_dict) sorted_dict = dict (sorted (centers_dict.items(), key=lambda x: x[1 ], reverse=True )) print (sorted_dict) id_list = tag5_df.rdd.map (lambda row: row.id ).collect() print (id_list) rule_dict = dict (zip (sorted_dict.keys(), id_list)) print (rule_dict) @F.udf def prediction2tag5Id (prediction ): return rule_dict[prediction] final_result_df = prediction_df.select(F.col('userId' ).cast(StringType()).alias('userId' ), prediction2tag5Id(F.col('prediction' )).cast(StringType()).alias('tagsId' ) ) return final_result_df if __name__ == '__main__' : appName = '通过kmeans算法计算rfm标签' tag4Id = 37 tagCpt = TagCptByKmeans(appName, tag4Id) tagCpt.execute()
KMeans计算RFE标签 什么是RFE模型
RFE:用户活跃度标签:
使用场景:用户频繁使用但是消费场景比较少,比如信息流应用(今日头条, 腾讯新闻, 微博)
RFE标签计算流程
1、计算 R/F/E的值
2、对R/F/E 分别打分
3、使用KMeans算法进行聚类
4、将聚类结果和5级标签的rule匹配
5、给用户打上标签
RFE标签计算实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 from pyspark.sql import DataFrameimport pyspark.sql.functions as Ffrom pyspark.sql.types import StringTypefrom UserProfile.offline.base.TagComputeBase import TagComputeBasefrom pyspark.ml.clustering import KMeansfrom pyspark.ml.feature import VectorAssemblerimport numpy as npclass TagCptByKmeans (TagComputeBase ): def tagCompute (self, es_df: DataFrame, tag5_df: DataFrame ): r_v = F.datediff(F.date_sub(F.current_date(), 1450 ), F.max (F.col('log_time' ))).alias('r_v' ) f_v = F.count(F.col('loc_url' )).alias('f_v' ) e_v = F.countDistinct(F.col('loc_url' )).alias('e_v' ) rfe_v_df = es_df.groupby('global_user_id' ).agg(r_v, f_v, e_v) r_s = F.when(F.col('r_v' ).between(1 , 3 ), 5 ) \ .when(F.col('r_v' ).between(4 , 7 ), 4 ) \ .when(F.col('r_v' ).between(8 , 15 ), 3 ) \ .when(F.col('r_v' ).between(16 , 30 ), 2 ) \ .otherwise(1 ).alias('r_s' ) f_s = F.when(F.col('f_v' ).between(1 , 500 ), 1 ) \ .when(F.col('f_v' ).between(500 , 550 ), 2 ) \ .when(F.col('f_v' ).between(550 , 600 ), 3 ) \ .when(F.col('f_v' ).between(600 , 700 ), 4 ) \ .otherwise(5 ).alias('f_s' ) e_s = F.when(F.col('e_v' ).between(0 , 200 ), 1 ) \ .when(F.col('e_v' ).between(200 , 251 ), 2 ) \ .when(F.col('e_v' ).between(251 , 300 ), 3 ) \ .when(F.col('e_v' ).between(300 , 350 ), 4 ) \ .otherwise(5 ).alias('e_s' ) rfe_s_df = rfe_v_df.select(F.col('global_user_id' ).alias('userId' ),r_s, f_s, e_s) vector = VectorAssembler(inputCols=['r_s' , 'f_s' , 'e_s' ],outputCol='features' ) assembler_df = vector.transform(rfe_s_df) kmeans = KMeans(k=4 , featuresCol='features' , predictionCol='prediction' , seed=5 ) model = kmeans.fit(assembler_df) KMeans_df = model.transform(assembler_df) centers = model.clusterCenters() centers_list = [np.sum (x) for x in centers] print (centers_list) centers_dict = {} for i in range (0 , len (centers_list)): centers_dict[i] = centers_list[i] sorted_dict = dict (sorted (centers_dict.items(), key=lambda x: x[1 ], reverse=True )) print (sorted_dict) tag5_id_list = tag5_df.rdd.map (lambda row: row.id ).collect() prediction_dict = dict (zip (sorted_dict.keys(), tag5_id_list)) print (prediction_dict) @F.udf def prediction2TagsId (prediction ): return prediction_dict[prediction] result_df = KMeans_df.select(F.col('userId' ).cast(StringType()).alias('userId' ), prediction2TagsId(F.col('prediction' )).cast(StringType()).alias('tagsId' ) ) return result_df if __name__ == '__main__' : appName = '通过KMeans算法计算RFE标签' tag4Id = 45 tagCpt = TagCptByKmeans(appName, tag4Id) tagCpt.execute()
KMeans计算PSM标签 什么是PSM模型
PSM模型(Price Sensitivity Measurement),即价格敏感度模型,是当前价格测试模型中的一个常用模型,其特点为所有价格测试过程完全基于被访者的自然反应,没有任何竞争对手甚至自身产品的任何信息。通过该模型,可以得到产品的最优价格和合理的价格间。
时在实际业务中,会把用户分为3-5类,比如分为极度敏感、较敏感、一般敏感、较不敏感、极度不敏感。
PSM模型中相关概念名词说明如下:
ra: receivableAmount 应收金额
da: discountAmount 优惠金额
pa: practicalAmount 实收金额
tdon 优惠订单数 total discount order number
ton 总订单总数 total order number
ada 平均优惠金额 average discount Amount
ara 平均每单应收 average receivable Amount
tda 优惠总金额 total discount Amount
tra 应收总金额 total receivable Amount
tdonr 优惠订单占比(优惠订单数 / 订单总数) total discount order number ratio
adar 平均优惠金额占比(平均优惠金额 / 平均每单应收金额) average discount amount ratio
tdar 优惠总金额占比(优惠总金额 / 订单总金额) total discount amount ratio
psm = 优惠订单占比 + 平均优惠金额占比 + 优惠总金额占比
psmScore = tdonr + adar + tdar
标签计算流程
1、计算 psm 的值
2、使用KMeans算法进行聚类
3、将聚类结果和5级标签的rule匹配
4、给用户打上标签
标签计算实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 from pyspark.sql import DataFrameimport pyspark.sql.functions as Ffrom pyspark.sql.types import StringTypefrom UserProfile.offline.base.TagComputeBase import TagComputeBasefrom pyspark.ml.clustering import KMeansfrom pyspark.ml.feature import VectorAssemblerimport numpy as npclass TagCptByKmeans (TagComputeBase ): def tagCompute (self, es_df: DataFrame, tag5_df: DataFrame ): da = F.col('couponcodevalue' ).alias('da' ) pa = F.col('orderamount' ).alias('pa' ) ra = da + pa rdp_df = es_df.select(da, pa, ra.alias('ra' ), F.col('ordersn' ), F.col('memberid' ).alias('userId' )) tdon = F.sum (F.when(F.col('da' ) == 0 , 0 ).otherwise(1 )).alias('tdon' ) ton = F.count(F.col('ordersn' )).alias('ton' ) ada = F.avg(F.col('da' )).alias('ada' ) tda = F.sum (F.col('da' )).alias('tda' ) ara = F.avg(F.col('ra' )).alias('ara' ) tra = F.sum (F.col('ra' )).alias('tra' ) rdp_agg_df = rdp_df.groupby('userId' ).agg(tdon, ton, ada, tda, ara, tra) tdonr = (F.col('tdon' ) / F.col('ton' )).alias('tdonr' ) adar = (F.col('ada' ) / F.col('ara' )).alias('adar' ) tdar = (F.col('tda' ) / F.col('tra' )).alias('tdar' ) psm_df = rdp_agg_df.select('userId' , (tdonr + adar + tdar).alias('psm' )) assembler = VectorAssembler(inputCols=['psm' ], outputCol='features' ) psm_vector_df = assembler.transform(psm_df) kmeans = KMeans(k=5 , featuresCol='features' , predictionCol='prediction' , seed=5 ) model = kmeans.fit(psm_vector_df) result_df = model.transform(psm_vector_df) centers = model.clusterCenters() centers_list = [np.sum (x) for x in centers] print (centers_list) centers_dict = {} for i in range (0 , len (centers_list)): centers_dict[i] = centers_list[i] sorted_dict = dict (sorted (centers_dict.items(), key=lambda x: x[1 ], reverse=True )) print (sorted_dict) tag5_ids_list = tag5_df.rdd.map (lambda row: row.id ).collect() print (tag5_ids_list) tags_to_centers_dict = dict (zip (sorted_dict.keys(), tag5_ids_list)) print (tags_to_centers_dict) @F.udf def prediction2Id (prediction ): return tags_to_centers_dict[prediction] final_df = result_df.select(F.col('userId' ).cast(StringType()).alias('userId' ), prediction2Id(F.col('prediction' )).cast(StringType()).alias('tagsId' )) return final_df if __name__ == '__main__' : appName = '通过kmeans算法计算PSM标签' tag4Id = 50 tagCpt = TagCptByKmeans(appName, tag4Id) tagCpt.execute()