本文主要讲了数据抽取分为全量和增量抽取,以及拉链表的具体实现。

拉链表的公式:(旧的拉链表 left join 增量数据) union all 增量数据。

数据流向:业务数据库-> ODS,ODS->DWD

Hive中解决乱码问题

在MySQL中执行如下命令

1
2
3
4
5
6
7
8
9
-- 注意 下面sql语句是需要在MySQL中执行  修改Hive存储的元数据信息(metadata)
use hive;
show tables;

alter table COLUMNS_V2 modify column COMMENT varchar(256) character set utf8;
alter table TABLE_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8;
alter table PARTITION_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8 ;
alter table PARTITION_KEYS modify column PKEY_COMMENT varchar(4000) character set utf8;
alter table INDEX_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8;

ODS搭建

通过Sqoop做数据抽取的工作,尽量保证数据仓库和MySQL同样的数据粒度。

任何增量导入的第一步一定是全量导入。

还有区分ODS和DWD等层,是通过建立不同的数据库实现。

全量导入

注意:

  • 这里如果是全量导入到ODS层,需要注意导入到ODS表的结构,是否是分区表,如果是分区表的话需要添加分区字段,而且分区字段需要起别名(为ODS表的分区字段名)

    • "select *, '2023-05-28' as dt from t_goods_evaluation where 1=1 and \\$CONDITIONS"

    例如:

    1
    2
    3
    4
    5
    6
    7
    8
    /usr/bin/sqoop import "-Dorg.apache.sqoop.splitter.allow_text_splitter=true" \
    --connect 'jdbc:mysql://192.168.88.80:3306/yipin?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true' \
    --username root \
    --password 123456 \
    --query "select *, '2023-05-28' as dt from t_goods_evaluation where 1=1 and create_time between '2010-01-01 00:00:00' and '2021-11-29 23:59:59' and \$CONDITIONS" \
    --hcatalog-database yp_ods \
    --hcatalog-table t_goods_evaluation \
    -m 1

全量覆盖导入

比如地区表,通常一个表的变化非常少,因为地名很少变嘛!(对于这种方式就用全量覆盖,通常一个月或者一个季度做一次)

1
2
3
4
5
6
7
8
/usr/bin/sqoop import "-Dorg.apache.sqoop.splitter.allow_text_splitter=true" \
--connect 'jdbc:mysql://192.168.88.80:3306/yipin?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true' \
--username root \
--password 123456 \
--query "select * from t_district where 1=1 and \$CONDITIONS" \
--hcatalog-database yp_ods \
--hcatalog-table t_district \
-m 1

Sqoop导入后如何验真?

  • 总量校验:select * from 表名;
  • 条件校验:select * from 表名 where id =1000;
  • 样本校验:select * from 表名 tablesample(bucket 1 out of 2 on rand(gender));按照性别将数据分成两个桶,取第一份数据。rand(gender) 也可以写成 rand(1)— 指定随机数种子

增量导入(2种方式)

增量导入仅新增

在query中,where后面需要写上create_time between ‘2023-05-28 00:00:00’ and ‘2023-05-28 23:59:59’

1
2
3
4
5
6
7
8
9
/usr/bin/sqoop import "-Dorg.apache.sqoop.splitter.allow_text_splitter=true" \
--connect 'jdbc:mysql://192.168.88.80:3306/yipin?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true' \
--username root \
--password 123456 \
--query "select *, '2023-05-28' as dt from t_store where
create_time between '2023-05-28 00:00:00' and '2023-05-28 23:59:59')and \$CONDITIONS" \
--hcatalog-database yp_ods \
--hcatalog-table t_store \
-m 1

增量导入新增+修改

1
2
3
4
5
6
7
8
9
10
11
12
/usr/bin/sqoop import "-Dorg.apache.sqoop.splitter.allow_text_splitter=true" \
--connect 'jdbc:mysql://192.168.88.80:3306/yipin?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true' \
--username root \
--password 123456 \
--query "select *, '2023-05-28' as dt from t_store where
(create_time between '2023-05-28 00:00:00' and '2023-05-28 23:59:59') -- 新增数据
or
(update_time between '2023-05-28 00:00:00' and '2023-05-28 23:59:59') -- 修改数据
and \$CONDITIONS" \
--hcatalog-database yp_ods \
--hcatalog-table t_store \
-m 1

Example:login_time between '2023-05-28 00:00:00' and '2023-05-28 23:59:59'

但是实际开发中要在Linux中

  • 定义时间TD_DATE = date -d '1 days ago' + "%Y-%m-%d"
  • 调用时间${TD_DATE}

例如:

1
2
3
4
5
6
7
8
/usr/bin/sqoop import "-Dorg.apache.sqoop.splitter.allow_text_splitter=true" \
--connect 'jdbc:mysql://192.168.88.80:3306/yipin?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true' \
--username root \
--password 123456 \
--query "select *, '${TD_DATE}' as dt from t_user_login where 1=1 and (login_time between '${TD_DATE} 00:00:00' and '${TD_DATE} 23:59:59') and \$CONDITIONS" \
--hcatalog-database yp_ods \
--hcatalog-table t_user_login \
-m 1

拉链表及实现

拉链表是缓慢渐变维的SCD2,showly changed dimension,是为了维护历史数据而产生的。

什么是维护历史数据,用户的居住地从杭州到北京,这段在杭州的时间就需要记录。(个人理解)

拉链表的公式:(旧的拉链表 left join 增量数据)union on 增量数据

解析:if (ozu.userid=null or d.endtime<’9999-12-31’,d.endtime,date_sub(ozu.starttime,1))

[是在left join中实现的]

以左表为基准进行join,

  • userid=nul说明增量表中没有这条数据,即这条数据没有被修改过
  • d.endtime < ‘9999-12-31’ 什么情况下会出现这种情况,我想一定是该拉链表中的endtime 已经被修改了,不是’9999-12-31’了,而是一个确定的时间,如(start_time - end_time)为2023-05-28 - 2023-05-29
  • 最后一句,如果前面的or事件都没有发生,则是原表的endtime 否则是增量表的starttime - 1即通过date_sub函数实现

p9XXt54.png

增量数据如何获取?

获取仅新增

1
2
3
select * from t_store 
where create_time
between '2023-05-28 00:00:00' and '2023-05-28 23:59:59')

获取新增和更新

1
2
3
4
select *from t_store where
(create_time between '2023-05-28 00:00:00' and '2023-05-28 23:59:59') -- 新增数据
or
(update_time between '2023-05-28 00:00:00' and '2023-05-28 23:59:59') -- 修改数据

这里模拟拉链表的实现

创建拉链表

1
2
3
4
5
6
7
8
9
10
-- 创建拉链表
create table dw_zipper(
userid string,
phone string,
nick string,
gender int,
addr string,
starttime string,
endtime string
) row format delimited fields terminated by '\t';

创建增量表

1
2
3
4
5
6
7
8
9
10
-- 创建增量表,用于模拟生产环境中的增量数据(即通过sqoop的query使用updatetime获取增量数据
create table ods_zipper_update(
userid string,
phone string,
nick string,
gender int,
addr string,
starttime string,
endtime string
) row format delimited fields terminated by '\t';

获取最新的拉链表

1
2
3
4
5
6
7
8
9
10
11
12
select
userid,
phone,
nick,
gender,
addr,
starttime,
if (ozu.userid=null or d.endtime<'9999-12-31',d.endtime,date_sub(ozu.starttime,1)) as endtime
from dw_zipper d
left join ods_zipper_update ozu on d.userid = ozu.userid
union all
select * from ods_zipper_update;

将获取的新拉链表覆盖旧的

1
2
3
4
5
6
7
8
9
10
11
12
13
insert overwrite table zipper
select
d.userid,
d.phone,
d.nick,
d.gender,
d.addr,
d.starttime,
if (ozu.userid=null or d.endtime<'9999-12-31',d.endtime,date_sub(ozu.starttime,1)) as endtime
from dw_zipper d
left join ods_zipper_update ozu on d.userid = ozu.userid
union all
select * from ods_zipper_update;

DWD

DWD 层数据的三种导入方式:

  • 方式一:拉链导入
    • 适合场景:增量及更新同步表
    • 表设计要求:start_date开始时间、end_date结束时间
      • start_date 表示数据有效的开始时间 可以作为表的分区字段来使用
      • end_date 表示数据失效的时间,默认数据都是9999-99-99 表示一直有效 。当有更新的时候,通过拉链表操作修改end_date。
    • 典型代表:fact_shop_order 订单表、fact_order_settle订单结算表等
  • 方式二:全量覆盖导入
    • 适合场景:不考虑历史数据是否存在,每次导入直接覆盖
    • 表设计要求:没啥要求,也不用分区,也不用拉链。
    • 典型代表:dim_district区域字典表、dim_date时间维度表
  • 方式三:增量导入
    • 适合场景:仅考虑每次的增量数据同步
    • 表设计要求:
      • 分区表partitioned by (dt string),分区字段往往是时间日期。
      • 一个日期一个分区,一次增量导入。
    • 典型代表:fact_goods_evaluation订单评价表、fact_user_login登录记录表。

向分区表中添加字段,如果报错

1
2
3
4
5
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=10000;
set hive.exec.max.dynamic.partitions=100000;
set hive.exec.max.created.files=150000;

增量导入(拉链导入)

​ 因为DWD层的事实表是拉链表,且是一个动态分区表,将ODS层的数据导入DWD也就是导入到拉链表及动态分区表的一个过程。

建表SQL(拉链表)-订单事实表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
DROP TABLE if EXISTS yp_dwd.fact_shop_order;
CREATE TABLE yp_dwd.fact_shop_order(
id string COMMENT '根据一定规则生成的订单编号',
order_num string COMMENT '订单序号',
buyer_id string COMMENT '买家的userId',
store_id string COMMENT '店铺的id',
order_from string COMMENT '此字段可以转换 1.安卓\; 2.ios\; 3.小程序H5 \; 4.PC',
order_state int COMMENT '订单状态:1.已下单\; 2.已付款, 3. 已确认 \;4.配送\; 5.已完成\; 6.退款\;7.已取消',
create_date string COMMENT '下单时间',
finnshed_time timestamp COMMENT '订单完成时间,当配送员点击确认送达时,进行更新订单完成时间,后期需要根据订单完成时间,进行自动收货以及自动评价',
is_settlement tinyint COMMENT '是否结算\;0.待结算订单\; 1.已结算订单\;',
is_delete tinyint COMMENT '订单评价的状态:0.未删除\; 1.已删除\;(默认0)',
evaluation_state tinyint COMMENT '订单评价的状态:0.未评价\; 1.已评价\;(默认0)',
way string COMMENT '取货方式:SELF自提\;SHOP店铺负责配送',
is_stock_up int COMMENT '是否需要备货 0:不需要 1:需要 2:平台确认备货 3:已完成备货 4平台已经将货物送至店铺 ',
create_user string,
create_time string,
update_user string,
update_time string,
is_valid tinyint COMMENT '是否有效 0: false\; 1: true\; 订单是否有效的标志',
end_date string COMMENT '拉链结束日期')
COMMENT '订单表'
partitioned by (start_date string)
row format delimited fields terminated by '\t'
stored as orc
tblproperties ('orc.compress' = 'SNAPPY');

首次全量导入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
------------------因为采用了动态分区插入技术 因此需要设置相关参数---------------
--分区
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=10000;
set hive.exec.max.dynamic.partitions=100000;
set hive.exec.max.created.files=150000;
--hive压缩
set hive.exec.compress.intermediate=true;
set hive.exec.compress.output=true;
--写入时压缩生效
set hive.exec.orc.compression.strategy=COMPRESSION;

--首次全量导入
INSERT overwrite TABLE yp_dwd.fact_shop_order PARTITION (start_date)
SELECT
id, order_num, buyer_id, store_id,
case order_from -- 这一步就是所谓的数据转换吧
when 1 then 'android'
when 2 then 'ios'
when 3 then 'miniapp'
when 4 then 'pcweb'
else 'other' end as order_from,
order_state,
create_date,
finnshed_time,
is_settlement,
is_delete,
evaluation_state,
way,
is_stock_up,
create_user,
create_time,
update_user,
update_time,
is_valid,
'9999-99-99' end_date,
dt as start_date -- 既充当动态分区字段,又充当拉链表的开始时间
FROM yp_ods.t_shop_order;

ODS层抽取新增、更新数据(Sqoop抽取)

1
2
3
4
select * from t_shop_order where
create_time between '2023-05-29 00:00:00' and '2023-05-29 23:59:59' -- 新增
or
update_time between '2023-05-29 00:00:00' and '2023-05-29 23:59:59'; -- 修改, 总计: 2条(修改1条, 新增1条)
1
2
3
4
5
6
7
8
/usr/bin/sqoop import "-Dorg.apache.sqoop.splitter.allow_text_splitter=true" \
--connect 'jdbc:mysql://192.168.88.80:3306/yipin?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true' \
--username root \
--password 123456 \
--query "select *, '2021-11-30' as dt from t_shop_order where 1=1 and (create_time between '2021-11-30 00:00:00' and '2021-11-30 23:59:59') or (update_time between '2021-11-30 00:00:00' and '2021-11-30 23:59:59') and \$CONDITIONS" \
--hcatalog-database yp_ods \
--hcatalog-table t_shop_order \
-m 1

创建中间临时表,用于保存拉链结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
DROP TABLE if EXISTS yp_dwd.fact_shop_order_tmp;
CREATE TABLE yp_dwd.fact_shop_order_tmp(
id string COMMENT '根据一定规则生成的订单编号',
order_num string COMMENT '订单序号',
buyer_id string COMMENT '买家的userId',
store_id string COMMENT '店铺的id',
order_from string COMMENT '此字段可以转换 1.安卓\; 2.ios\; 3.小程序H5 \; 4.PC',
order_state int COMMENT '订单状态:1.已下单\; 2.已付款, 3. 已确认 \;4.配送\; 5.已完成\; 6.退款\;7.已取消',
create_date string COMMENT '下单时间',
finnshed_time timestamp COMMENT '订单完成时间,当配送员点击确认送达时,进行更新订单完成时间,后期需要根据订单完成时间,进行自动收货以及自动评价',
is_settlement tinyint COMMENT '是否结算\;0.待结算订单\; 1.已结算订单\;',
is_delete tinyint COMMENT '订单评价的状态:0.未删除\; 1.已删除\;(默认0)',
evaluation_state tinyint COMMENT '订单评价的状态:0.未评价\; 1.已评价\;(默认0)',
way string COMMENT '取货方式:SELF自提\;SHOP店铺负责配送',
is_stock_up int COMMENT '是否需要备货 0:不需要 1:需要 2:平台确认备货 3:已完成备货 4平台已经将货物送至店铺 ',
create_user string,
create_time string,
update_user string,
update_time string,
is_valid tinyint COMMENT '是否有效 0: false\; 1: true\; 订单是否有效的标志',
end_date string COMMENT '拉链结束日期')
COMMENT '订单表'
partitioned by (start_date string)
row format delimited fields terminated by '\t'
stored as orc
tblproperties ('orc.compress' = 'SNAPPY')

拉链操作,将结果写入临时表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
insert overwrite table yp_dwd.fact_shop_order_tmp partition (start_date)
select *
from (
--1、ods表的新分区数据(有新增和更新的数据)
select id,
order_num,
buyer_id,
store_id,
case order_from
when 1
then 'android'
when 2
then 'ios'
when 3
then 'miniapp'
when 4
then 'pcweb'
else 'other'
end
as order_from,
order_state,
create_date,
finnshed_time,
is_settlement,
is_delete,
evaluation_state,
way,
is_stock_up,
create_user,
create_time,
update_user,
update_time,
is_valid,
'9999-99-99' end_date,
'2021-11-30' as start_date
from yp_ods.t_shop_order
where dt='2021-11-30'

union all

-- 2、历史拉链表数据,并根据up_id判断更新end_time有效期
select
fso.id,
fso.order_num,
fso.buyer_id,
fso.store_id,
fso.order_from,
fso.order_state,
fso.create_date,
fso.finnshed_time,
fso.is_settlement,
fso.is_delete,
fso.evaluation_state,
fso.way,
fso.is_stock_up,
fso.create_user,
fso.create_time,
fso.update_user,
fso.update_time,
fso.is_valid,
--3、更新end_time:如果没有匹配到变更数据,或者当前已经是无效的历史数据,则保留原始end_time过期时间;否则变更end_time时间为前天(昨天之前有效)
if (tso.id is null or fso.end_date<'9999-99-99', fso.end_date, date_add(tso.dt, -1)) end_time,
fso.start_date
from yp_dwd.fact_shop_order fso left join (select * from yp_ods.t_shop_order where dt='2021-11-30') tso
on fso.id=tso.id
) his
order by his.id, start_date;

查询临时表验证,并将结果覆盖拉链表

1
2
3
4
5
6
7
8
9
select *
from yp_dwd.fact_shop_order_tmp where id='dd1910223851672f32'; -- 查询

--可以看到,这条订单有两条数据
--第一条根据end_date信息可以表名是历史状态数据
--第一条根据end_date信息是9999-99-99表明是当前有效的状态数据

INSERT OVERWRITE TABLE yp_dwd.fact_shop_order partition (start_date) -- 覆盖
SELECT * from yp_dwd.fact_shop_order_tmp;

增量同步

  • 第一次导入(全量)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
INSERT overwrite TABLE yp_dwd.fact_goods_evaluation PARTITION(dt)
select
id,
user_id,
store_id,
order_id,
geval_scores,
geval_scores_speed,
geval_scores_service,
geval_isanony,
create_user,
create_time,
update_user,
update_time,
is_valid,
substr(create_time, 1, 10) as dt
from yp_ods.t_goods_evaluation;
  • 增量导入操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
INSERT overwrite TABLE yp_dwd.fact_goods_evaluation PARTITION(dt)
select
id,
user_id,
store_id,
order_id,
geval_scores,
geval_scores_speed,
geval_scores_service,
geval_isanony,
create_user,
create_time,
update_user,
update_time,
is_valid,
substr(create_time, 1, 10) as dt
from yp_ods.t_goods_evaluation
where dt='xxxx-xx-xx';

全量覆盖

  • 建表操作

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    DROP TABLE if EXISTS yp_dwd.dim_district;
    CREATE TABLE yp_dwd.dim_district(
    id string COMMENT '主键ID',
    code string COMMENT '区域编码',
    name string COMMENT '区域名称',
    pid string COMMENT '父级ID',
    alias string COMMENT '别名')
    COMMENT '区域字典表'
    row format delimited fields terminated by '\t'
    stored as orc
    tblproperties ('orc.compress' = 'SNAPPY');
  • 全量覆盖操作

    1
    2
    3
    INSERT overwrite TABLE yp_dwd.dim_district
    select * from yp_ods.t_district
    WHERE code IS NOT NULL AND name IS NOT NULL;