首先梳理一下流程:

业务数据库 → HDFS → Hive数仓分层(ODS层) → ElasticSearch → 标签计算 → 读取元数据将计算结果写入到ElasticSearch

项目总结

幕布: https://www.mubu.com/doc/lV4_0JwoCW

项目背景

行业特点:

1.实时需求较少、离线需求较多

2.指标计算对精度要求较高

为了能更好地分析历史用户的行为习惯,将用户标签化,利用大数据技术分析用户的偏好和行为,可以帮助保险公司更好地开展精准营销、产品定制、风险评估、理赔处理、客户服务等业务,从而更好地满足客户需求,提升业务效率和客户体验,为公司提高收益

标签体系

一级标签: (行业)保险行业

二级标签: 子行业-三有保险用户画像

三级标签: 标签大类-投保客户类型、投保详情商业类型、理赔记录赔付类型、退保记录流式类型

四级标签: 标签的一个类别(对应一个计算任务):性别、身高、年龄、省市、区域、收入、民族、婚姻状况、教育程度、缴费期、购买保险的年龄、购买周期、保险类型、保单状态、保单持有日期等

五级标签: 四级标签对应的具体的值:标签规则就是标签计算的依据

按照类别细分

分别占75%, 50%, 25%

匹配类标签

1
2
3
4
性别、身高、年龄
省份、城市、区域
收入、民族、星座
婚姻状况、教育程度

统计类标签

1
2
3
缴费期、购买保险年龄
最近购买周期、保险类型
保单状态、保单持有日期

挖掘类标签

1
用户风险等级

技术选型

1
Spark2.4.8 + Elasticsearch7.10 + Hadoop2.7 + Hive2.1 + Flume1.10 + Zookeeper3.4 + Kafka2.11 + Dolphinscheduler2.0 + Mysql5.7

离线流程

Spark读取MySQL中存储的四级标签的元数据并解析为一个 rule对象

根据四级标签的元数据读取标签计算需要的数据列

从ES中读取五级标签的元数据,根据rule进行计算,保存结果回ES

用Presto查询结果并做BI展示

实时流程

Flume采集nginx日志数据和用户的行为日志文件到Kafka中

通过spark结构化流实时计算任务,并将结果写入MySQL中

用Presto查询结果

Hive建表DDL

创建HiveODS层的表

分区 - 分桶 - 切割 - 存储格式 - 存储位置 - 表属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
create database insurance;
use insurance_ods;

-- 投保客户表
CREATE TABLE insurance_ods.policy_client (
user_id STRING COMMENT '用户号',
name STRING COMMENT '姓名',
id_card STRING COMMENT '身份证号',
phone STRING COMMENT '手机号',
sex STRING COMMENT '性别',
height INT COMMENT '身高',
birthday STRING COMMENT '出生日期',
province STRING COMMENT '省份',
city STRING COMMENT '城市',
direction STRING COMMENT '区域',
income INT COMMENT '收入',
race STRING COMMENT '民族',
marriage_state STRING COMMENT '婚姻状况',
edu STRING COMMENT '学历',
sign STRING COMMENT '星座')
COMMENT '客户信息表'
-- 是否设置表分区,取决于后续是否区分全量和增量导入数据
partitioned by (df string)
row format delimited fields terminated by '\t';

-- 修改表创建新的分区
-- alter table insurance_ods.policy_client add if not exists partition(df='20221203') location '/user/hive/warehouse/insurance_ods.db/policy_client/20221203';
alter table insurance_ods.policy_client add if not exists partition(df='20221203');
alter table insurance_ods.policy_client add if not exists partition(df='20221202');


-- 退保记录表
CREATE TABLE insurance_ods.policy_surrender (
pol_no STRING COMMENT '保单号',
user_id STRING COMMENT '用户号',
buy_datetime STRING COMMENT '投保日期',
keep_days INT COMMENT '持有天数',
elapse_date STRING COMMENT '失效日期')
COMMENT '退保记录表'
partitioned by (df string)
row format delimited fields terminated by '\t';


-- 投保详情表
drop table insurance_ods.policy_benefit;
CREATE TABLE insurance_ods.policy_benefit (
pol_no STRING COMMENT '保单号',
user_id STRING COMMENT '用户号',
ppp STRING COMMENT '缴费期',
age_buy smallint COMMENT '投保年龄',
buy_datetime STRING COMMENT '投保日期',
insur_name STRING COMMENT '保险名称',
insur_code STRING COMMENT '保险代码',
pol_flag SMALLINT COMMENT '保单状态,1有效,0失效',
elapse_date STRING COMMENT '保单失效时间')
COMMENT '投保记录表'
partitioned by (df string)
row format delimited fields terminated by '\t';


-- 理赔信息表
drop table insurance_ods.claim_info;
CREATE TABLE insurance_ods.claim_info (
pol_no STRING COMMENT '保单号',
user_id STRING COMMENT '用户号',
buy_datetime STRING COMMENT '购买日期',
insur_code STRING COMMENT '保险代码',
claim_date STRING COMMENT '理赔日期',
claim_item STRING COMMENT '理赔责任',
claim_mnt DECIMAL(35,6) COMMENT '理赔金额')
COMMENT '理赔信息表'
partitioned by (df string)
row format delimited fields terminated by '\t';

-- 修改表创建新的分区
-- alter table insurance_ods.policy_client add if not exists partition(df='20221203') location '/user/hive/warehouse/insurance_ods.db/policy_client/20221203';
alter table insurance_ods.policy_benefit add if not exists partition(df='20221203');
alter table insurance_ods.policy_surrender add if not exists partition(df='20221203');
alter table insurance_ods.claim_info add if not exists partition(df='20221203');

抽取业务数据库中的数据到Hive的ODS

1
2
3
4
5
load data local inpath '/root/claim_info.txt' into table claim_info partition (df='20230815');
show partitions policy_surrender;
load data local inpath '/root/policy_client.txt' into table policy_client partition (df='20230815');
load data local inpath '/root/policy_surrender.txt' into table policy_surrender partition (df='20230815');
load data local inpath '/root/policy_benefit.txt' into table policy_surrender partition (df='20230815');

MySQL标签元数据建立

这里的规则都是自己定义的.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (1, '保险', '行业', '', '', 1, -1, '2023-08-15 14:17:52', '2023-08-15 14:17:52', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (2, '三有保险用户画像', '项目或子行业', '', '', 2, 1, '2023-08-15 14:17:52', '2023-08-15 14:17:52', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (3, '投保客户类型', '项目下标签属性类型', '', '', 3, 2, '2023-08-15 14:17:52', '2023-08-15 14:17:52', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (4, '投保详情商业类型', '项目下标签属性类型', '', '', 3, 2, '2023-08-15 14:17:52', '2023-08-15 14:17:52', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (5, '理赔记录赔付类型', '项目下标签属性类型', '', '', 3, 2, '2023-08-15 14:17:52', '2023-08-15 14:17:52', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (6, '退保记录流式类型', '项目下标签属性类型', '', '', 3, 2, '2023-08-15 14:17:52', '2023-08-15 14:17:52', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (7, '性别', '标签', 'inType=Elasticsearch##esNodes=up01:9200##esIndex=up_policy_client##esType=_doc##selectFields=user_id,sex', '会员用户性别', 4, 3, '2023-08-15 14:17:52', '2023-08-15 14:17:52', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (8, '年龄', '标签', 'inType=Elasticsearch##esNodes=up01:9200##esIndex=up_policy_client##esType=_doc##selectFields=user_id,birthday', '会员用户出生日期:birthday', 4, 3, '2023-08-15 14:17:52', '2023-08-15 14:17:52', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (9, '收入', '标签', 'inType=Elasticsearch##esNodes=up01:9200##esIndex=up_policy_client##esType=_doc##selectFields=user_id,income', '会员用户收入水平', 4, 3, '2023-08-15 14:17:53', '2023-08-15 14:17:53', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (10, '保单状态', '标签', 'inType=Elasticsearch##esNodes=up01:9200##esIndex=up_policy_benefit##esType=_doc##selectFields=user_id,pol_flag', '会员用户的保单状态', 4, 4, '2023-08-15 14:17:53', '2023-08-15 14:17:53', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (11, '区域', '标签', 'inType=Elasticsearch##esNodes=up01:9200##esIndex=up_policy_client##esType=_doc##selectFields=user_id,direction', '会员用户所在的区域', 4, 3, '2023-08-15 14:17:53', '2023-08-15 14:17:53', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (12, '购买周期', '标签', 'inType=Elasticsearch##esNodes=up01:9200##esIndex=up_policy_benefit##esType=_doc##selectFields=user_id,buy_datetime', '保单购买的时间区间:7日、2周、1月、2月、3月、4月、5月、6月、半年以上', 4, 4, '2023-08-15 14:17:53', '2023-08-15 14:17:53', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (13, '缴费期', '标签', 'inType=Elasticsearch##esNodes=up01:9200##esIndex=up_policy_benefit##esType=_doc##selectFields=user_id,ppp', '保单购买缴费期限:10年、15年、20年、30年', 4, 4, '2023-08-15 14:17:53', '2023-08-15 14:17:53', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (14, '男', '性别属性值', '男', '男性用户群体', 5, 7, '2023-08-15 14:17:53', '2023-08-15 14:17:53', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (15, '女', '性别属性值', '女', '女性用户群体', 5, 7, '2023-08-15 14:17:53', '2023-08-15 14:17:53', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (16, '年龄段', '年龄属性值', '19500101-19591231', '50后', 5, 8, '2023-08-15 14:17:53', '2023-08-15 14:17:53', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (17, '年龄段', '年龄属性值', '19600101-19691231', '60后', 5, 8, '2023-08-15 14:17:53', '2023-08-15 14:17:53', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (18, '年龄段', '年龄属性值', '19700101-19791231', '70后', 5, 8, '2023-08-15 14:17:53', '2023-08-15 14:17:53', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (19, '年龄段', '年龄属性值', '19800101-19891231', '80后', 5, 8, '2023-08-15 14:17:53', '2023-08-15 14:17:53', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (20, '年龄段', '年龄属性值', '19900101-19991231', '90后', 5, 8, '2023-08-15 14:17:53', '2023-08-15 14:17:53', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (21, '年龄段', '年龄属性值', '20000101-20091231', '00后', 5, 8, '2023-08-15 14:17:53', '2023-08-15 14:17:53', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (22, '年龄段', '年龄属性值', '20100101-20191231', '10后', 5, 8, '2023-08-15 14:17:53', '2023-08-15 14:17:53', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (23, '年龄段', '年龄属性值', '20200101-20291231', '20后', 5, 8, '2023-08-15 14:17:53', '2023-08-15 14:17:53', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (24, '收入', '低收入', '0-60000', '收入高、中、低', 5, 9, '2023-08-15 14:17:54', '2023-08-15 14:17:54', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (25, '收入', '中等收入', '60000-120000', '收入高、中、低', 5, 9, '2023-08-15 14:17:54', '2023-08-15 14:17:54', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (26, '收入', '中上等收入', '120000-180000', '收入高、中、低', 5, 9, '2023-08-15 14:17:54', '2023-08-15 14:17:54', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (27, '收入', '高收入', '180000-240000', '收入高、中、低', 5, 9, '2023-08-15 14:17:54', '2023-08-15 14:17:54', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (28, '收入', '超高收入', '240000-1000000', '收入高、中、低', 5, 9, '2023-08-15 14:17:54', '2023-08-15 14:17:54', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (29, '收入', '富翁', '1000000-100000000', '收入高、中、低', 5, 9, '2023-08-15 14:17:54', '2023-08-15 14:17:54', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (30, '保单状态', '有效', '1', '保单是否有效', 5, 10, '2023-08-15 14:17:54', '2023-08-15 14:17:54', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (31, '保单状态', '无效', '0', '保单是否有效', 5, 10, '2023-08-15 14:17:54', '2023-08-15 14:17:54', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (32, '购买周期', '7天', '0-7', '购买周期:保单购买时间距离当前时间天数差', 5, 12, '2023-08-15 14:17:54', '2023-08-15 14:17:54', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (33, '购买周期', '2周', '8-14', '购买周期:保单购买时间距离当前时间天数差', 5, 12, '2023-08-15 14:17:54', '2023-08-15 14:17:54', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (34, '购买周期', '1月', '15-30', '购买周期:保单购买时间距离当前时间天数差', 5, 12, '2023-08-15 14:17:54', '2023-08-15 14:17:54', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (35, '购买周期', '2月', '31-60', '购买周期:保单购买时间距离当前时间天数差', 5, 12, '2023-08-15 14:17:54', '2023-08-15 14:17:54', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (36, '购买周期', '3月', '61-90', '购买周期:保单购买时间距离当前时间天数差', 5, 12, '2023-08-15 14:17:54', '2023-08-15 14:17:54', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (37, '购买周期', '4月', '91-120', '购买周期:保单购买时间距离当前时间天数差', 5, 12, '2023-08-15 14:17:54', '2023-08-15 14:17:54', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (38, '购买周期', '5月', '121-150', '购买周期:保单购买时间距离当前时间天数差', 5, 12, '2023-08-15 14:17:54', '2023-08-15 14:17:54', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (39, '购买周期', '6月', '151-180', '购买周期:保单购买时间距离当前时间天数差', 5, 12, '2023-08-15 14:17:55', '2023-08-15 14:17:55', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (40, '购买周期', '半年以上', '180-36500', '购买周期:保单购买时间距离当前时间天数差', 5, 12, '2023-08-15 14:17:55', '2023-08-15 14:17:55', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (41, '缴费期', '10年', '10', '保单的缴费期', 5, 13, '2023-08-15 14:17:55', '2023-08-15 14:17:55', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (42, '缴费期', '15年', '15', '保单的缴费期', 5, 13, '2023-08-15 14:17:55', '2023-08-15 14:17:55', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (43, '缴费期', '20年', '20', '保单的缴费期', 5, 13, '2023-08-15 14:17:55', '2023-08-15 14:17:55', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (44, '缴费期', '30年', '30', '保单的缴费期', 5, 13, '2023-08-15 14:17:55', '2023-08-15 14:17:55', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (47, '民族', null, 'inType=Elasticsearch##esNodes=up01:9200##esIndex=policy_client##esType=_doc##selectFields=user_id,race', null, 4, 3, '2023-08-15 15:33:59', '2023-08-15 15:33:59', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (50, '汉族', null, '汉族', null, 5, 47, '2023-08-15 15:39:56', '2023-08-15 15:39:56', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (51, '少数民族', null, '壮族、满族、回族、蒙古族、维吾尔族、苗族、彝族、藏族、侗族、布依族、瑶族、白族、土家族、哈尼族、哈萨克族、傣族、黎族、傈僳族、佤族、畲族、高山族、拉祜族、水族、东乡族、纳西族、景颇族、柯尔克孜族、土族、达斡尔族、仫佬族、羌族、布朗族、撒拉族、毛南族、仡佬族、锡伯族、阿昌族、普米族、塔吉克族、怒族、乌孜别克族、俄罗斯族、鄂温克族、德昂族、保安族、裕固族、京族、塔塔尔族、独龙族、鄂伦春族、赫哲族、门巴族、珞巴族和基诺', null, 5, 47, '2023-08-15 15:39:57', '2023-08-15 15:39:57', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (52, '学历', null, 'inType=Elasticsearch##esNodes=up01:9200##esIndex=policy_client##esType=_doc##selectFields=user_id,edu', null, 4, 3, '2023-08-15 15:40:44', '2023-08-15 15:40:44', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (68, '小学', null, '小学', null, 5, 52, '2023-08-15 16:12:28', '2023-08-15 16:12:28', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (69, '初中', null, '初中', null, 5, 52, '2023-08-15 16:12:28', '2023-08-15 16:12:28', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (71, '高中', null, '高中', null, 5, 52, '2023-08-15 16:12:28', '2023-08-15 16:12:28', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (72, '大专', null, '大专', null, 5, 52, '2023-08-15 16:12:28', '2023-08-15 16:12:28', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (73, '本科', null, '本科', null, 5, 52, '2023-08-15 16:12:28', '2023-08-15 16:12:28', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (74, '硕士', null, '研究生', null, 5, 52, '2023-08-15 16:12:28', '2023-08-15 16:12:28', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (75, '博士', null, '博士', null, 5, 52, '2023-08-15 16:12:28', '2023-08-15 16:12:28', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (76, '购买保险年龄', null, 'inType=Elasticsearch##esNodes=up01:9200##esIndex=policy_surrender##esType=_doc##selectFields=user_id,buy_datetime', null, 4, 3, '2023-08-15 16:14:12', '2023-08-15 16:14:12', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (77, '少年', null, '0-18', null, 5, 76, '2023-08-15 16:17:22', '2023-08-15 16:17:22', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (78, '青年', null, '18-28', null, 5, 76, '2023-08-15 16:17:22', '2023-08-15 16:17:22', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (79, '中年', null, '29-65', null, 5, 76, '2023-08-15 16:17:22', '2023-08-15 16:17:22', 2, null);
INSERT INTO tags_new.tbl_basic_tag (id, name, industry, rule, business, level, pid, ctime, utime, state, remark) VALUES (80, '老年', null, '66-120', null, 5, 76, '2023-08-15 16:17:22', '2023-08-15 16:17:22', 2, null);

通过Hive创建ElasticSearch

创建ElasticSearch是通过Hive创建的.

  • 需要先加载es-hadoop插件
  • 需要设置表的存储位置以及表属性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
add jar hdfs:///libs/es-hadoop/elasticsearch-hadoop-7.10.2.jar;
drop table insurance.claim_info;
-- 用户表
create external table if not exists insurance.claim_info(
pol_no string comment '保单号',
user_id string comment '用户号',
buy_datetime string comment '购买日期',
insur_code string comment '保险代码',
claim_date string comment '理赔日期',
claim_item string comment '理赔责任',
claim_mnt decimal(35, 6) comment '理赔金额',
df string
) comment '用户es外部表'
stored by 'org.elasticsearch.hadoop.hive.EsStorageHandler'
tblproperties('es.resource'='claim_info/_doc',
'es.nodes'='up01:9200',
'es.index.auto.create'='TRUE',
'es.index.refresh_interval' = '-1',
'es.index.number_of_replicas' = '0',
'es.batch.write.retry.count' = '6',
'es.batch.write.retry.wait' = '60s',
'es.mapping.name' = 'pol_no:pol_no,user_id:user_id, buy_datetime:buy_datetime,insur_code:insur_code,claim_date:claim_date,claim_item:claim_item,claim_mnt:claim_mnt,df:df' );

insert overwrite table insurance.claim_info select pol_no,
user_id,
buy_datetime,
insur_code,
claim_date,
claim_item,
claim_mnt,
df from insurance_ods.claim_info;

drop table insurance.policy_client;
CREATE external table if not exists insurance.policy_client (
user_id STRING COMMENT '用户号',
name STRING COMMENT '姓名',
id_card STRING COMMENT '身份证号',
phone STRING COMMENT '手机号',
sex STRING COMMENT '性别',
height INT COMMENT '身高',
birthday STRING COMMENT '出生日期',
province STRING COMMENT '省份',
city STRING COMMENT '城市',
direction STRING COMMENT '区域',
income INT COMMENT '收入',
race STRING COMMENT '民族',
marriage_state STRING COMMENT '婚姻状况',
edu STRING COMMENT '学历',
sign STRING COMMENT '星座')
comment '用户es外部表'
stored by 'org.elasticsearch.hadoop.hive.EsStorageHandler'
tblproperties('es.resource'='policy_client/_doc',
'es.nodes'='up01:9200',
'es.index.auto.create'='TRUE',
'es.index.refresh_interval' = '-1',
'es.index.number_of_replicas' = '0',
'es.batch.write.retry.count' = '6',
'es.batch.write.retry.wait' = '60s',
'es.mapping.name' = 'user_id:user_id,name:name,id_card:id_card,phone:phone,sex:sex,height:height,birthday:birthday,province:province,city:city,direction:direction,income:income,race:race,marriage_state:marriage_state,edu:edu,sign:sign' );

add jar hdfs:///libs/es-hadoop/elasticsearch-hadoop-7.10.2.jar;
insert overwrite table insurance.policy_client select user_id,
name,
id_card,
phone,
sex,
height,
birthday,
province,
city,
direction,
income,
race,
marriage_state,
edu,
sign
df from insurance_ods.policy_client;

drop table insurance.policy_surrender;
CREATE external table if not exists insurance.policy_surrender (
pol_no STRING COMMENT '保单号',
user_id STRING COMMENT '用户号',
buy_datetime STRING COMMENT '投保日期',
keep_days INT COMMENT '持有天数',
elapse_date STRING COMMENT '失效日期')
comment '用户es外部表'
stored by 'org.elasticsearch.hadoop.hive.EsStorageHandler'
tblproperties('es.resource'='policy_surrender/_doc',
'es.nodes'='up01:9200',
'es.index.auto.create'='TRUE',
'es.index.refresh_interval' = '-1',
'es.index.number_of_replicas' = '0',
'es.batch.write.retry.count' = '6',
'es.batch.write.retry.wait' = '60s',
'es.mapping.name' = 'pol_no:pol_no,user_id:user_id, buy_datetime:buy_datetime,keep_days:keep_days,elapse_date:elapse_date' );

insert overwrite table insurance.policy_surrender select pol_no,
user_id,
buy_datetime,
keep_days,
elapse_date from insurance_ods.policy_surrender;

drop table insurance.policy_benefit;
CREATE TABLE insurance.policy_benefit (
pol_no STRING COMMENT '保单号',
user_id STRING COMMENT '用户号',
ppp STRING COMMENT '缴费期',
age_buy smallint COMMENT '投保年龄',
buy_datetime STRING COMMENT '投保日期',
insur_name STRING COMMENT '保险名称',
insur_code STRING COMMENT '保险代码',
pol_flag SMALLINT COMMENT '保单状态,1有效,0失效',
elapse_date STRING COMMENT '保单失效时间')
comment '投保记录表'
stored by 'org.elasticsearch.3hadoop.hive.EsStorageHandler'
tblproperties('es.resource'='policy_benefit/_doc',
'es.nodes'='up01:9200',
'es.index.auto.create'='TRUE',
'es.index.refresh_interval' = '-1',
'es.index.number_of_replicas' = '0',
'es.batch.write.retry.count' = '6',
'es.batch.write.retry.wait' = '60s',
'es.mapping.name' = 'pol_no:pol_no,user_id:user_id, ppp:ppp,age_buy:age_buy,buy_datetime:buy_datetime,insur_name:insur_name,insur_code:insur_code,pol_flag:pol_flag,elapse_date:elapse_date' );

insert overwrite table insurance.policy_benefit select pol_no,
user_id,
ppp,
age_buy,
buy_datetime,
insur_name,
insur_code,
pol_flag,
elapse_date from insurance_ods.policy_benefit;

PySpark代码

注意: 前方大量SparkDSL风格写法,喜欢使用SparkSQL风格的小伙伴慎入

封装基类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import os

from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as F
from pyspark.sql.types import StringType

from RuleMeta import Rule4Meta

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_HOME'] = '/root/anaconda3/envs/pyspark_env/bin/python'

# 定义函数将字符串解析为对象
def str2Dict(tags: str):
all_list = tags.split('##')
tags_dict = {}
for i in all_list:
tags_dict[i.split('=')[0]] = i.split('=')[1]
return tags_dict

@F.udf
def updateTagsId(new_tags_id:str, old_tags_id:str, tags5Id:str):
if new_tags_id is None:
return old_tags_id
if old_tags_id is None:
return new_tags_id
old_tags_list = old_tags_id.split(',')
new_tags5Id_list = tags5Id.split(',')

# print(new_tags_id) # 69
# print(old_tags_id) # 50
# print(tags5Id) # 68,69,70,71,73,74,75
# print(type(tags5Id))
# print('---------------------------------------------')
# print(old_tags_list) # ['50']
# print(new_tags5Id_list) # ['68', '69', '70', '71', '73', '74', '75']
# old_tags_id (1, 6, 28) tags5Id (1, 2, 3) 得到最终(6, 28)也就是将之前的标签剔除掉
res_tags = []
for i in old_tags_list:
if i not in new_tags5Id_list:
res_tags.append(i)
# 将新标签的id与旧的标签进行合并
res_tags.append(new_tags_id)
#
# result_tag = []
# for i in res_tags:
# result_tag.append(str(i))
# 返回最终的tagsId
return ','.join(res_tags)



class ComputeBase():
def __init__(self, appName, tag4_id):
self.appName = appName
self.tag4_id = tag4_id
tableName = 'tbl_basic_tag'
self.url = "jdbc:mysql://192.168.88.166:3306/tags_new?useSSL=false&useUnicode=true&characterEncoding=utf8&user=root&password=123456"
self.sql = f'select id, rule from {tableName} where id = {tag4_id} or pid = {tag4_id}'
self.result_index = "insurance_result"

# 创建Spark开发环境并返回
def getSparkSession(self):
spark = SparkSession.builder\
.appName(self.appName)\
.master('local[*]')\
.config('spark.sql.shuffle.partitions', 20)\
.getOrCreate()
return spark

# 读取MySQL中的数据, 返回tag_df(四级标签和五级标签的元数据
def getMysqlData(self, SparkSession:SparkSession):
tag_df = SparkSession.read.format('jdbc')\
.option('url', self.url)\
.option('query', self.sql)\
.load()
return tag_df

# 获取四级标签的rule对象
def getTag4Rule(self, tags_df: DataFrame):
# 获取四级标签的rule字符串
rule_str = tags_df.rdd.map(lambda row: row.rule).collect()[0]
# 将字符串转为字典
rule_dict = str2Dict(rule_str)
# 将字典转为对象
rule_meta = Rule4Meta.dict_to_obj(rule_dict)
# 返回rule对象
return rule_meta

# 获取标签计算需要的数据, 也就是从es中读取源数据 es_df
def getComputeData(self, rule_mete: Rule4Meta, SparkSession:SparkSession):
es_df = SparkSession.read.format('es')\
.option('es.nodes', rule_mete.esNodes)\
.option('es.resource', rule_mete.esIndex) \
.option('es.mapping.date.rich', 'false') \
.option('es.read.field.include', rule_mete.selectFields)\
.load()
return es_df

# 获取五级标签的元数据(tag5_df)
def getTag5Df(self, tags_df):
tag5_df = tags_df.where(f'id!={self.tag4_id}')
return tag5_df

# 标签计算(有些标签计算逻辑不相同, 因此后面需要重写该方法)
def tagCompute(self, es_df:DataFrame,tag5_df:DataFrame):
pass

# 合并新老结果
def tagResultUpdate(self, new_result_df, SparkSession:SparkSession, tag5_df: DataFrame, rule_meta:Rule4Meta):
# 首先读取旧的标签的结果(userId, tagsId)
old_result_df = SparkSession.read.format('es')\
.option('es.nodes', rule_meta.esNodes)\
.option('es.resource', self.result_index)\
.load()
old_result_pre_df = old_result_df.select(F.col('userId'), F.col('tagsId').cast(StringType()).alias('tagsId'))
# 获取五级标签的集合(注意是rule的第一个元素
tag5_list = tag5_df.select(F.col('id').cast(StringType())).rdd.map(lambda row:row.id).collect()
# 给获取的五级标签的集合放到new_result_df中(userId, tagsId, tags5Id)
new_df = new_result_df.withColumn('tags5Id', F.lit(','.join(tag5_list)))
# 使用自定义udf函数(传入两个udf, 最终实现返回一个最终的df
res_df = old_result_pre_df\
.join(new_df, on=new_df.userId==old_result_df.userId, how='left')\
.select(old_result_df.userId.alias('userId'), updateTagsId(new_df.tagsId, old_result_pre_df.tagsId, new_df.tags5Id).alias('tagsId'))

return res_df

# 将标签计算的结果写入到es中
def saveResult(self, result_df: DataFrame, rule_meta: Rule4Meta):
result_df.write.format('es')\
.option('es.nodes', rule_meta.esNodes)\
.option('es.resource', self.result_index)\
.option('es.write.operation', 'upsert') \
.option('es.mapping.id', 'userId') \
.option('es.mapping.name', 'userId:userId, tagsId:tagsId') \
.mode('append') \
.save()

# 自动调用所有的方法
def execute(self):
# 创建Spark运行环境
spark = self.getSparkSession()
# 读取MySQL中的元数据
tags_df = self.getMysqlData(spark)
# 获取四级标签的rule对象
rule_meta = self.getTag4Rule(tags_df)
# 获取标签计算需要的数据(es_df)
es_df = self.getComputeData(rule_meta,spark)
# 获取五级标签的元数据
tag5_df = self.getTag5Df(tags_df)
# 标签计算
res_df = self.tagCompute(es_df,tag5_df)
if res_df is not None:
# 合并新老标签结果
new_res_df = self.tagResultUpdate(res_df, spark, tag5_df, rule_meta)
# new_res_df.show()
# 将结果写入es
self.saveResult(new_res_df,rule_meta)

学历标签计算任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
from pyspark.sql.types import StringType

from TagComputeBase import ComputeBase

# 定义学历标签计算类, 继承自标签计算基类
class DegreeTagCompute(ComputeBase):
def tagCompute(self, es_df: DataFrame, tag5_df: DataFrame):
# es_df.show()
# +----+-------+
# | edu|user_id|
# +----+-------+
# |大专| 22-540|
# |高中|292-162|
# |本科|332-107|

# tag5_df.show()
# +---+----+
# | id|rule|
# +---+----+
# | 68|小学|
# | 69|初中|
# | 70|中专|
# | 71|高中|
# | 73|本科|
res_df = es_df\
.join(tag5_df, on=es_df.edu == tag5_df.rule, how='left')\
.select(F.col('user_id').cast(StringType()).alias('userId'), tag5_df.id.cast(StringType()).alias('tagsId'))


# res_df.show()
return res_df
if __name__ == '__main__':
appName = '学历标签计算任务'
tag4Id = 52
# 创建标签计算对象
degreeTagCompute = DegreeTagCompute(appName, tag4Id)
degreeTagCompute.execute()

民族标签计算任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
from pyspark.sql.types import StringType

from TagComputeBase import ComputeBase

# 定义民族标签计算类, 继承自标签计算基类
class RaceTagCompute(ComputeBase):
def tagCompute(self, es_df: DataFrame, tag5_df: DataFrame):
# es_df.show()
# +--------+-------+
# | race|user_id|
# +--------+-------+
# | 满族| 22-540|
# | 汉族|292-162|
# | 汉族|332-107|

# tag5_df.show()
tag5_pre_df = tag5_df.select('id', F.explode(F.split('rule', '、')).alias('rule'))
result_df = es_df.join(tag5_pre_df,on=tag5_pre_df.rule==es_df.race, how='left')\
.select(F.col('user_id').cast(StringType()).alias('userId'), F.col('id').alias('tagsId'))
return result_df
if __name__ == '__main__':
appName = '民族标签计算任务'
tag4Id = 47
# 创建标签计算对象
raceTagCompute = RaceTagCompute(appName, tag4Id)
raceTagCompute.execute()

购保年龄标签计算任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
from pyspark.sql.types import StringType

from TagComputeBase import ComputeBase

# 定义购保标签计算类, 继承自标签计算基类
class BuyInsuranceAgeTagCompute(ComputeBase):
def tagCompute(self, es_df: DataFrame, tag5_df: DataFrame):
# es_df.show()
# +------------+-------+
# |buy_datetime|user_id|
# +------------+-------+
# | 2022-02-11| 1-422|
# | 2019-07-17| 1-424|

# tag5_df.show()
# +---+------+
# | id| rule|
# +---+------+
# | 77| 0-18|
# | 78| 18-28|

tag5_pre_df = tag5_df.select(F.col('id').cast(StringType()).alias('id'), F.split('rule', '-')[0].alias('start'), F.split('rule', '-')[1].alias('end'))
# tag5_pre_df.show()
# +------+-----+---+
# |id |start|end|
# +------+-----+---+
# | 77| 0| 18|
# | 78| 18| 28|


# 从es中读取客户的生日
spark = self.getSparkSession()
birthday_df = spark.read.format('es')\
.option('es.nodes', 'up01:9200')\
.option('es.resource', 'policy_client')\
.option('es.mapping.date.rich', 'false')\
.option('es.read.field.include', 'user_id,birthday')\
.load()
# birthday_df.show()
# +----------+-------+
# | birthday|user_id|
# +----------+-------+
# |1981-07-04| 22-540|
# |1960-09-27|292-162|

es_pre_df = es_df\
.join(birthday_df, on=birthday_df.user_id==es_df.user_id,how='left')\
.select(es_df.user_id.cast(StringType()).alias('userId'), (F.datediff(F.col('buy_datetime'), F.col('birthday'))/365).alias('days'))
# es_pre_df.show()
# +------+------------------+
# |userId| days|
# +------+------------------+
# | 1-111| 58.78356164383562|
# | 1-150|60.087671232876716|
# | 1-156| 30.96986301369863|

es_final_df = es_pre_df\
.groupby('userId')\
.agg(F.round(F.max('days'), 0).alias('days'))
# es_final_df.show()
# +------+----+
# |userId|days|
# +------+----+
# | 1-111|59.0|
# | 1-150|60.0|
# | 1-156|31.0|
# | 1-166|20.0|

# 进行join连接, 判断用户购买保险的年龄是多大, 并打上标签
res_df = es_final_df\
.join(tag5_pre_df, on=es_final_df.days.between(tag5_pre_df.start, tag5_pre_df.end), how='left')\
.select(F.col('userId').cast(StringType()).alias('userId'), F.col('id').cast(StringType()).alias('tagsId'))
res_df.show()
return res_df
if __name__ == '__main__':
appName = '购买保险年龄标签计算任务'
tag4Id = 76
# 创建标签计算对象
buyTagCompute = BuyInsuranceAgeTagCompute(appName, tag4Id)
buyTagCompute.execute()

封装对象任务

(不用手写ES的Nodes端口号主机号等)其实主要怕是写错

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from dataclasses import dataclass

# 使用装饰器的方式来创建类,是为了不手动写全参构造函数
@dataclass
class Rule4Meta():
# 定义四级标签的封装类
inType: str
esIndex: str
esNodes: str
esType: str
selectFields: str

def dict_to_obj(rule_four_dict: dict):
# 传入四级标签的rule(类型是dict)返回一个对象
return Rule4Meta(
rule_four_dict.get('inType',''),
rule_four_dict.get('esIndex', ''),
rule_four_dict.get('esNodes',''),
rule_four_dict.get('esType', ''),
rule_four_dict.get('selectFields', '')
)

民族标签计算(不使用基类)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
from pyspark.sql import SparkSession
import os

from pyspark.sql.types import StringType

from RuleMeta import Rule4Meta
import pyspark.sql.functions as F

# 准备Spark开发环境
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_HOME'] = '/root/anaconda3/envs/pyspark_env/bin/python'

if __name__ == '__main__':
# 创建Spark运行环境
spark = SparkSession\
.builder\
.master("local[*]")\
.appName('获取MySQL的数据到ES中')\
.config("spark.sql.shuffle.partitions", 10)\
.getOrCreate()

# 配置MySQL的相关参数
url = "jdbc:mysql://192.168.88.166:3306/tags_new?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false&user=root&password=123456"
tableName = "tbl_basic_tag"
tag4Id = 47
sql = f"select id,rule from {tableName} where id = {tag4Id} or pid = {tag4Id}"

# 通过Spark读取四级和五级标签规则的元数据
tags_df = spark.read\
.format('jdbc')\
.option('url', url)\
.option('query', sql).load()
# tags_df.show()
# +---+-------------------------------------+
# | id| rule|
# +---+-------------------------------------+
# | 47| inType=Elasticsea...|
# | 50| 汉族|
# | 51|壮族、满族、回族、蒙古族、维吾尔族...|
# +---+-------------------------------------+

# 如果不加后面的0, 取出来的是个列表, 列表中有一个字符串
tags_str = tags_df.rdd.map(lambda row: row.rule).collect()[0]

# 将字符串转为字典
def str2Dict(tags: str):
all_list = tags.split('##')
tags_dict = {}
for i in all_list:
tags_dict[i.split('=')[0]] = i.split('=')[1]
return tags_dict

tags_dict = str2Dict(tags_str)
# 将tags_str解析为rule对象
meta_obj = Rule4Meta.dict_to_obj(tags_dict)

# 读取es中的元数据
es_df = spark.read.format('es')\
.option('es.resource', meta_obj.esIndex)\
.option('es.nodes', meta_obj.esNodes)\
.option('es.read.field.include', meta_obj.selectFields).load()
# es_df.show()
# +--------+-------+
# | race|user_id|
# +--------+-------+
# | 满族| 22-540|
# | 汉族|292-162|

# 读取五级标签的id以及rule
tag5_df = tags_df.where(f'id!={tag4Id}')
# tag5_df.show()
# +---+------------------------------+
# | id| rule|
# +---+------------------------------+
# | 50| 汉族|
# | 51|壮族、满族、回族、蒙古族、维吾尔族...|
# +---+------------------------------+

# 使用炸裂函数处理
tag5_pre_df = tag5_df.select(F.col('id'), F.explode(F.split('rule', '、')).alias('race'))
# tag5_pre_df.show()
# +---+--------+
# | id| race|
# +---+--------+
# | 50| 汉族|
# | 51| 壮族|
# | 51| 满族|

# 给用户打标签, 并写入到es中
result_df = es_df\
.join(tag5_pre_df, on=es_df.race == tag5_pre_df.race, how='left')\
.select(es_df.user_id.cast(StringType()).alias('userId'), tag5_pre_df.id.cast(StringType()).alias('tagsId'))

result_df.show()
# 将最终结果写入到es中
result_index = 'insurance_result'
result_df.write.format('es') \
.option('es.nodes', meta_obj.esNodes) \
.option('es.resource', result_index) \
.option('es.write.operation', 'upsert') \
.option('es.mapping.id', 'userId') \
.option('es.mapping.name', "userId:userId,tagsId:tagsId") \
.mode('append') \
.save()