1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
| """ #TODO 0.准备Spark开发环境 #TODO 1.读取MySQL中的数据 #TODO 2.读取模型/标签相关的4级标签rule并解析--==标签id不一样== #TODO 3.根据解析出来的rule读取ES数据 #TODO 4.读取模型/标签相关的5级标签(根据4级标签的id作为pid查询)---==标签id不一样== #====5.根据ES数据和5级标签数据进行匹配,得出userId,tagsId---==实现代码不一样== #TODO 6.查询elasticsearch中的oldDF #TODO 7.合并newDF和oldDF #TODO 8.将最终结果写到ES """ import os
from pyspark.sql import SparkSession, DataFrame import pyspark.sql.functions as F from pyspark.sql.types import StringType
from UserProfile.offline.pojo.RuleMeta import Rule4Meta
# Point Spark at the local installation and tell it which Python interpreter
# the workers must use.
# BUGFIX: the original set PYSPARK_HOME, a variable Spark never reads; the
# variable that selects the worker interpreter is PYSPARK_PYTHON.
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/envs/pyspark_env/bin/python'
def strParse(rule4_str: str) -> dict:
    """Parse a level-4 tag rule string into a dict.

    The rule has the form ``key1=value1##key2=value2##...`` (e.g.
    ``inType=Elasticsearch##esNodes=host:9200##...``).

    BUGFIX: split each pair on the FIRST '=' only — the original
    ``split('=')[1]`` truncated values that themselves contain '=' and
    raised IndexError on a segment without '='.

    :param rule4_str: raw rule string read from MySQL
    :return: dict mapping rule keys to their (possibly empty) values
    """
    rule4_dict = {}
    for pair in rule4_str.split('##'):
        key, _, value = pair.partition('=')
        rule4_dict[key] = value
    return rule4_dict
@F.udf
def updateTagsId(newTagsId, oldTagsId, tag5Ids):
    """Merge a user's new tag with their existing tags.

    Removes every tag belonging to this tag tree (any id in ``tag5Ids``)
    from the old tag list, then appends the newly computed tag, so tags
    from other tag trees are preserved.

    BUGFIX: all three columns can be null after the old/new join (a user
    may exist on only one side, or have no tagsId yet); the original
    crashed with a TypeError in that case.

    :param newTagsId: tag id computed in this run, or null
    :param oldTagsId: comma-separated existing tag ids, or null
    :param tag5Ids:   comma-separated level-5 ids of this tag tree, or null
    :return: comma-separated merged tag id string
    """
    if not newTagsId:
        # User only exists in the old index: keep their tags unchanged.
        return oldTagsId
    if not oldTagsId:
        return newTagsId
    tag5_set = set(tag5Ids.split(',')) if tag5Ids else set()
    kept = [t for t in oldTagsId.split(',') if t not in tag5_set]
    kept.append(newTagsId)
    return ','.join(kept)
class TagComputeBase():
    """Template base class for offline user-profile tag computation.

    ``execute`` wires the pipeline: read tag metadata from MySQL, parse the
    level-4 rule, load source data from Elasticsearch, let the subclass
    compute new (userId, tagsId) pairs via ``tagCompute``, merge them with
    the existing result index and upsert the result back to ES.
    """

    def __init__(self, appName, tag_four_id):
        """
        :param appName:     Spark application name
        :param tag_four_id: id of the level-4 tag this job computes
        """
        self.appName = appName
        self.tag_four_id = tag_four_id
        # NOTE(review): credentials are embedded in the JDBC URL — should be
        # moved to external configuration.
        self.url = "jdbc:mysql://192.168.88.166:3306/tfec_tags?useSSL=false&useUnicode=true&characterEncoding=utf8&user=root&password=123456"
        tableName = 'tbl_basic_tag'
        # One query fetches the level-4 row (id = tag_four_id) together with
        # its level-5 children (pid = tag_four_id).
        self.sql = f'select id, rule from {tableName} where id = {tag_four_id} or pid = {tag_four_id}'
        self.result_index = "tfec_userprofile_result"

    def getSparkSession(self):
        """Create (or reuse) a local SparkSession for this job."""
        sparkSession = SparkSession.builder \
            .appName(self.appName) \
            .master("local[*]") \
            .config("spark.sql.shuffle.partitions", 20) \
            .getOrCreate()
        return sparkSession

    def getTagInfo(self, sparkSession: SparkSession):
        """Read the level-4 tag row and its level-5 children from MySQL."""
        tag_df = sparkSession.read \
            .format('jdbc') \
            .option('url', self.url) \
            .option('query', self.sql) \
            .load()
        return tag_df

    def getRule4Meta(self, tag_df: DataFrame):
        """Extract the level-4 rule string and parse it into a Rule4Meta.

        BUGFIX: the original took ``collect()[0]`` of the whole frame, whose
        row order is not guaranteed — it could pick a level-5 row's rule.
        Filter on the level-4 id and fail fast when it is missing.

        :raises ValueError: when no row with id == tag_four_id exists
        """
        row = tag_df.where(f'id={self.tag_four_id}').select('rule').first()
        if row is None:
            raise ValueError(f'no rule found for tag id {self.tag_four_id}')
        rule4_dict = strParse(row.rule)
        rule_four_meta = Rule4Meta.dict_to_obj(rule4_dict)
        return rule_four_meta

    def getComputeData(self, sparkSession: SparkSession, rule_meta: Rule4Meta):
        """Load the source fields described by the level-4 rule from ES."""
        es_df = sparkSession.read \
            .format('es') \
            .option('es.nodes', rule_meta.esNodes) \
            .option('es.resource', rule_meta.esIndex) \
            .option('es.read.field.include', rule_meta.selectFields) \
            .load()
        return es_df

    def getTag5Info(self, tag_df: DataFrame):
        """Keep only the level-5 rows (every row except the level-4 id)."""
        tag5_df = tag_df.where(f'id!={self.tag_four_id}')
        return tag5_df

    def tagCompute(self, es_df: DataFrame, tag5_df: DataFrame):
        """Hook for subclasses: return a DataFrame(userId, tagsId), or None
        to skip the merge-and-save step."""
        pass

    def tagResultUpdate(self, new_result_df: DataFrame, sparkSession: SparkSession,
                        tag5_df: DataFrame, rule_meta: Rule4Meta):
        """Merge the newly computed tags with the existing result index.

        For each user, tags belonging to this tag tree (the level-5 ids) are
        replaced by the new value; tags from other tag trees are preserved.
        """
        old_df = sparkSession.read.format('es') \
            .option('es.nodes', rule_meta.esNodes) \
            .option('es.resource', self.result_index) \
            .load()
        tag5_list = tag5_df.select(F.col('id').cast(StringType())) \
            .rdd.map(lambda row: row.id).collect()
        new_rs_df = new_result_df.withColumn('tag5Ids', F.lit(','.join(tag5_list)))
        # BUGFIX: the original left join (old -> new) dropped users that only
        # appear in the new result and produced null userIds for users that
        # only appear in the old index.  Use a full outer join and select
        # null-safely so both sides survive.
        result_df = old_df \
            .join(new_rs_df, on=old_df.userId == new_rs_df.userId, how='full') \
            .select(
                F.coalesce(new_rs_df.userId, old_df.userId).alias('userId'),
                F.when(old_df.tagsId.isNull(), new_rs_df.tagsId)
                 .when(new_rs_df.tagsId.isNull(), old_df.tagsId)
                 .otherwise(updateTagsId(new_rs_df.tagsId, old_df.tagsId, new_rs_df.tag5Ids))
                 .alias('tagsId'))
        return result_df

    def saveResult(self, result_df: DataFrame, rule_meta: Rule4Meta):
        """Upsert the merged result into the ES result index, keyed by userId."""
        # NOTE(review): 'es.mapping.name' is not a documented es-hadoop write
        # option — confirm it is actually honored (kept as-is for safety).
        result_df.write.format('es') \
            .option('es.nodes', rule_meta.esNodes) \
            .option('es.resource', self.result_index) \
            .option('es.write.operation', 'upsert') \
            .option('es.mapping.id', 'userId') \
            .option('es.mapping.name', 'userId:userId, tagsId:tagsId') \
            .mode('append') \
            .save()

    def execute(self):
        """Run the full pipeline; skips merge/save when tagCompute returns None."""
        sparkSession = self.getSparkSession()
        tag_df = self.getTagInfo(sparkSession)
        rule_meta = self.getRule4Meta(tag_df)
        es_df = self.getComputeData(sparkSession, rule_meta)
        tag5_df = self.getTag5Info(tag_df)
        new_result_df = self.tagCompute(es_df, tag5_df)
        if new_result_df is not None:
            result_df = self.tagResultUpdate(new_result_df, sparkSession, tag5_df, rule_meta)
            self.saveResult(result_df, rule_meta)
|