本文主要解决搭建数仓的过程中遇到的一些自关联查询,多表联查,join错位问题。

DWB(DataWarehouse Base)

  • 名称:基础数据层、中间数据层
  • 功能:退化维度(Degenerate Dimension-DD)(降维)形成大宽表

功能

  • 通过退化维度操作之后,带来的显著效果是

    • 整个数仓中表的个数减少了;
    • 业务相关联的数据(跟你分析相关的)数据字段聚在一起了,形成一张宽表
    • 分析查询时的效率显著提高了:多表查询和单表查询的差异。
  • 带来的坏处是

    • 数据大量冗余、宽表的概念已经不符合3范式设计要求了。
    • 但是数仓建模的核心追求是,只要有利于分析,能够加快数据分析,都可以做。
  • 订单明细宽表 dwb_order_detail

  • 店铺明细宽表 dwb_shop_detail

  • 商品明细宽表 dwb_goods_detail

订单明细宽表fact_shop_order

注意

  • 1,如果表是一张拉链表,加上过滤条件 end_date=’9999-99-99’,把当前有效的数据查询出来
  • 2,对于fact_shop_order的end_date=’9999-99-99’过滤,应该放在where(on)条件中完成,先过滤,后join
  • 3,以yp_dwd.fact_shop_order为左表,其他表进行left join

伪代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
select
xxxxx
xxx
xxxxx
FROM yp_dwd.fact_shop_order o -- 订单明细表(订单主表)
--订单副表
LEFT JOIN yp_dwd.fact_shop_order_address_detail od on o.id=od.id and od.end_date='9999-99-99'
--订单组
LEFT JOIN yp_dwd.fact_shop_order_group og on og.order_id = o.id and og.end_date='9999-99-99'
--and og.is_pay=1 是否支付的过滤 0未支付 1 已支付
--订单组支付信息
LEFT JOIN yp_dwd.fact_order_pay op ON op.group_id = og.group_id and op.end_date='9999-99-99'
--退款信息
LEFT JOIN yp_dwd.fact_refund_order refund on refund.order_id=o.id and refund.end_date='9999-99-99'
--and refund.refund_state=5 退款状态 5表示退款已经完成
--结算信息
LEFT JOIN yp_dwd.fact_order_settle os on os.order_id = o.id and os.end_date='9999-99-99'
--商品快照
LEFT JOIN yp_dwd.fact_shop_order_goods_details ogoods on ogoods.order_id = o.id and ogoods.end_date='9999-99-99'
--订单评价表
LEFT JOIN yp_dwd.fact_goods_evaluation e on e.order_id=o.id and e.is_valid=1
--订单配送表
LEFT JOIN yp_dwd.fact_order_delievery_item d on d.shop_order_id=o.id and d.dispatcher_order_type=1 and d.is_valid=1
where o.end_date='9999-99-99';

Hive执行动态分区配置参数

1
2
3
4
5
6
7
8
9
10
11
--分区
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=10000;
set hive.exec.max.dynamic.partitions=100000;
set hive.exec.max.created.files=150000;
--hive压缩
set hive.exec.compress.intermediate=true;
set hive.exec.compress.output=true;
--写入时压缩生效
set hive.exec.orc.compression.strategy=COMPRESSION;

店铺明细宽表dwb_shop_detail

由于用到了省份,城市,县区字段。而这些字段都是在dim_district表中,所以需要用到自连接查询。

1
2
3
province_name string COMMENT '省份名称', 
city_name string COMMENT '城市名称',
area_name string COMMENT '县名称'

举个例子:

1
2
3
4
5
6
7
8
9
select * from yp_dwd.dim_district;
select distinct
t3.name,
t2.name,
t1.name -- t1必须做区表,t2做市表,t3做省表
from yp_dwd.dim_district t1
left join yp_dwd.dim_district t2 on t1.pid=t2.id
left join yp_dwd.dim_district t3 on t2.pid=t3.id
order by t3.name;

这里如果写left join必须以区表(最小单位的表为基本表-自己总结)

要不就会得到错误的结果。

伪代码实现

1
2
3
4
5
6
7
8
9
10
--店铺先和地址连接 得到店铺的adcode
select * from
yp_dwd.dim_store s
LEFT JOIN yp_dwd.dim_location lc on lc.correlation_id = s.id and lc.type=2
--在根据adcode去区域字典表中进行查询,先查出县
LEFT JOIN yp_dwd.dim_district d1 ON d1.code = lc.adcode
--根据县的pid再去区域字典表中查询出市
LEFT JOIN yp_dwd.dim_district d2 ON d2.id = d1.pid
--根据市的pid再去区域字典表中查询出省
LEFT JOIN yp_dwd.dim_district d3 ON d3.id = d2.pid

Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
INSERT into yp_dwb.dwb_shop_detail
SELECT
s.id,
s.address_info,
s.name as store_name,
s.is_pay_bond,
s.trade_area_id,
s.delivery_method,
s.store_type,
s.is_primary,
s.parent_store_id,
ta.name as trade_area_name,
d3.code as province_id,
d2.code as city_id,
d1.code as area_id,
d3.name as province_name,
d2.name as city_name,
d1.name as area_name
--店铺
FROM yp_dwd.dim_store s
--商圈
LEFT JOIN yp_dwd.dim_trade_area ta ON ta.id = s.trade_area_id and ta.end_date='9999-99-99'
--地区 注意type=2才表示地址是店铺地址
LEFT JOIN yp_dwd.dim_location lc on lc.correlation_id = s.id and lc.type=2 and lc.end_date='9999-99-99'
LEFT JOIN yp_dwd.dim_district d1 ON d1.code = lc.adcode
LEFT JOIN yp_dwd.dim_district d2 ON d2.id = d1.pid
LEFT JOIN yp_dwd.dim_district d3 ON d3.id = d2.pid
WHERE s.end_date='9999-99-99'
;

商品明细宽表dwb_goods_detail

如果只是单纯的进行自连接,像省市区那种就会造成join错乱(因为没有进行过滤-将错误的值显示为null),这是由于后端程序员设计数据库不规范导致的。

  • 有三张表class1 是小类(level=3三级标题-最小分类),class2 是中类(level=2二级标题), class3 是大类(level=1一级标题),.

  • class1的level必须是3(三级标题-最小分类) ,如果不是3而是level=1或level=2就混乱了(这是由于class1充当小类这张表的属性决定的。

  • class2的level必须是2(同上

再说一个相同意思但表达方式不同的说法:

  • level1可以是class1,class2,class3
  • level2可以是class1,class2
  • level3必须是class1(小类)
1
2
3
4
5
6
# 1、业务系统在设计商品分类表dim_goods_class时,准备采用的是3级存储。通过level值来表示。
1 大类
2 中类
3 小类

# 2、但是dim_goods_class数据集中实际等级效果只有两类,level=3的只有1个分类。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
select
CASE class1.level WHEN 3 -- 这里只有为3才有意义,因为要向上找(两级)level2,level1
THEN class1.name
ELSE NULL
END as min_class_name,
CASE WHEN class1.level=2 -- 这里只有level=2(可以是小类class1的,也可以是中类class2的level=2)才有意义,因为要向上找(一级)level1
THEN class1.name
WHEN class2.level=2
THEN class2.name
ELSE NULL
END as mid_class_name,
CASE WHEN class1.level=1 -- 这里是1才有意义,因为不找了(所以class1,class2,class3的level=1都行)
THEN class1.name
WHEN class2.level=1
THEN class2.name
WHEN class3.level=1
THEN class3.name
ELSE NULL
END as max_class_name
from yp_dwd.dim_goods_class class1 -- class1 是小类,class2 是中类, class3 是大类
left join yp_dwd.dim_goods_class class2 on class1.parent_id=class2.id
left join yp_dwd.dim_goods_class class3 on class2.parent_id=class3.id

Presto食用说明

什么是Presto,个人理解就是一个提速的工具(虽然不一定能提速吧!)

还有个巨大的BUG,用Presto插入数据hive中竟然查询不到。(日了狗了,Facebook真非死不可)

Presto查询引擎是一个M-S的架构,由一个coordinator节点,一个Discovery Server节点,多个Worker节点组成,注意Discovery Server通常内嵌在Coordinator节点中。

  • 主角色:Coordinator负责SQL的解析,生成执行计划,分发给Worker节点进行执行;

  • 从角色:Worker节点负责实时查询执行任务。Worker节点启动后向discovery Server服务注册,Coordinator 从discovery server获取可以工作的Worker节点。

  • 如果配置了hive connector,需要配置hive MetaSote服务为Presto提供元信息,worker节点和HDFS进行交互数据。

##优点
1)Presto与Hive对比,都能够处理PB级别的海量数据分析,但Presto是基于内存运算,减少没必要的硬盘IO,所以更快。
2)能够连接多个数据源,跨数据源连表查,如从Hive查询大量网站访问记录,然后从Mysql中匹配出设备信息。
3)部署也比Hive简单,因为Hive是基于HDFS的,需要先部署HDFS。

##缺点
1)虽然能够处理PB级别的海量数据分析,但不是代表Presto把PB级别都放在内存中计算的。而是根据场景,如count,avg等聚合运算,是边读数据边计算,再清内存,再读数据再计算,这种耗的内存并不高。但是连表查,就可能产生大量的临时数据,因此速度会变慢,反而Hive此时会更擅长。
2)为了达到实时查询,可能会想到用它直连MySql来操作查询,这效率并不会提升,瓶颈依然在MySql,此时还引入网络瓶颈,所以会比原本直接操作数据库要慢。

Presto页面:http://192.168.88.80:8090/ui/

安装JDK

可以手动安装oracle JDK

也可以使用yum在线安装 openjDK, 注意: 两台机器都要安装JDK

yum install java-1.8.0-openjdk* -y

安装完成后,查看jdk版本:

java -version

上传Presto安装包

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#创建安装目录
mkdir -p /export/server

#yum安装上传文件插件lrzsz
yum install -y lrzsz

#上传安装包到hadoop01的/export/server目录
presto-server-0.245.1.tar.gz

#解压、重命名
tar -xzvf presto-server-0.245.1.tar.gz
mv presto-server-0.245.1 presto

#创建配置文件存储目录
mkdir -p /export/server/presto/etc

添加配置文件

  • etc/config.properties

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    cd /export/server/presto

    vim etc/config.properties

    #---------添加如下内容---------------
    coordinator=true
    node-scheduler.include-coordinator=true
    http-server.http.port=8090
    query.max-memory=6GB
    query.max-memory-per-node=2GB
    query.max-total-memory-per-node=2GB
    discovery-server.enabled=true
    discovery.uri=http://192.168.88.80:8090
    #---------end-------------------

    #参数说明
    coordinator:是否为coordinator节点,注意worker节点需要写false
    node-scheduler.include-coordinator:coordinator在调度时是否也作为worker
    discovery-server.enabled:Discovery服务开启功能。presto通过该服务来找到集群中所有的节点。每一个Presto实例都会在启动的时候将自己注册到discovery服务; 注意:worker节点不需要配
    discovery.uri:Discovery server的URI。由于启用了Presto coordinator内嵌的Discovery服务,因此这个uri就是Presto coordinator的uri。
  • etc/jvm.config

1
2
3
4
5
6
7
8
9
10
vim etc/jvm.config

-server
-Xmx3G
-XX:+UseG1GC
-XX:G1HeapRegionSize=32M
-XX:+UseGCOverheadLimit
-XX:+ExplicitGCInvokesConcurrent
-XX:+HeapDumpOnOutOfMemoryError
-XX:+ExitOnOutOfMemoryError
  • etc/node.properties
1
2
3
4
5
6
mkdir -p /export/data/presto
vim etc/node.properties

node.environment=cdhpresto
node.id=presto-cdh01
node.data-dir=/export/data/presto
  • etc/catalog/hive.properties
1
2
3
4
5
6
mkdir -p etc/catalog
vim etc/catalog/hive.properties

connector.name=hive-hadoop2
hive.metastore.uri=thrift://192.168.88.80:9083
hive.max-partitions-per-writers=300

scp安装包到其他机器

1
2
3
4
5
6
7
8
#在hadoop02创建文件夹
mkdir -p /export/server

#在hadoop01远程cp安装包
cd /export/server
scp -r presto hadoop02:$PWD

#ssh的时候如果没有配置免密登录 需要输入密码scp 密码:123456

hadoop02配置修改

  • etc/config.properties

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    cd /export/server/presto
    vim etc/config.properties

    #----删除之前文件中的全部内容 替换为以下的内容 vim编辑器删除命令 8dd
    coordinator=false
    http-server.http.port=8090
    query.max-memory=6GB
    query.max-memory-per-node=2GB
    query.max-total-memory-per-node=2GB
    discovery.uri=http://192.168.88.80:8090
  • etc/jvm.config

    和hadoop01一样,不变,唯一注意的就是如果机器内存小,需要调整-Xmx参数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    vim etc/jvm.config

    -server
    -Xmx3G
    -XX:+UseG1GC
    -XX:G1HeapRegionSize=32M
    -XX:+UseGCOverheadLimit
    -XX:+ExplicitGCInvokesConcurrent
    -XX:+HeapDumpOnOutOfMemoryError
    -XX:+ExitOnOutOfMemoryError
  • etc/node.properties

    修改编号node.id

    1
    2
    3
    4
    5
    6
    mkdir -p /export/data/presto
    vim etc/node.properties

    node.environment=cdhpresto
    node.id=presto-cdh02
    node.data-dir=/export/data/presto
  • etc/catalog/hive.properties

    保持不变

    1
    2
    3
    4
    5
    vim etc/catalog/hive.properties

    connector.name=hive-hadoop2
    hive.metastore.uri=thrift://192.168.88.80:9083
    hive.max-partitions-per-writers=300

集群启停

注意,每台机器都需要启动

  • 前台启动

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    [root@hadoop01 ~]# cd ~
    [root@hadoop01 ~]# /export/server/presto/bin/launcher run


    [root@hadoop02 ~]# cd ~
    [root@hadoop02 ~]# /export/server/presto/bin/launcher run


    #如果出现下面的提示 表示启动成功
    2021-09-15T18:24:21.780+0800 INFO main com.facebook.presto.server.PrestoServer ======== SERVER STARTED ========

    #前台启动使用ctrl+c进行服务关闭
  • 后台启动

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    [root@hadoop01 ~]# cd ~
    [root@hadoop01 ~]# /export/server/presto/bin/launcher start
    Started as 89560

    [root@hadoop02 ~]# cd ~
    [root@hadoop02 ~]# /export/server/presto/bin/launcher start
    Started as 92288


    #后台启动使用jps 配合kill -9命令 关闭进程
  • web UI页面

    http://192.168.88.80:8090/ui/

Presto–Datagrip连接

配置URL地址

1
jdbc:presto://192.168.88.80:8090/hive

然后配置驱动就行了。

Presto–常规优化

  • 数据存储优化

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    --1)合理设置分区
    与Hive类似,Presto会根据元信息读取分区数据,合理的分区能减少Presto数据读取量,提升查询性能。

    --2)使用列式存储
    Presto对ORC文件读取做了特定优化,因此在Hive中创建Presto使用的表时,建议采用ORC格式存储。相对于Parquet,Presto对ORC支持更好。
    Parquet和ORC一样都支持列式存储,但是Presto对ORC支持更好,而Impala对Parquet支持更好。在数仓设计时,要根据后续可能的查询引擎合理设置数据存储格式。

    --3)使用压缩
    数据压缩可以减少节点间数据传输对IO带宽压力,对于即席查询需要快速解压,建议采用Snappy压缩。

    --4)预先排序
    对于已经排序的数据,在查询的数据过滤阶段,ORC格式支持跳过读取不必要的数据。比如对于经常需要过滤的字段可以预先排序。

    INSERT INTO table nation_orc partition(p) SELECT * FROM nation SORT BY n_name;
    如果需要过滤n_name字段,则性能将提升。
    SELECT count(*) FROM nation_orc WHERE n_name=’AUSTRALIA’;

  • SQL优化

    • 列裁剪
    • 分区裁剪
    • group by优化
      • 按照数据量大小降序排列
    • order by使用limit
    • 用regexp_like代替多个like语句
    • join时候大表放置在左边
  • 替换非ORC格式的Hive表


Presto–内存调优

  • 内存管理机制–内存分类

    Presto管理的内存分为两大类:user memory和system memory

    • user memory用户内存

      1
      跟用户数据相关的,比如读取用户输入数据会占据相应的内存,这种内存的占用量跟用户底层数据量大小是强相关的
    • system memory系统内存

      1
      执行过程中衍生出的副产品,比如tablescan表扫描,write buffers写入缓冲区,跟查询输入的数据本身不强相关的内存。
  • 内存管理机制–内存池

    内存池中来实现分配user memory和system memory

    内存池为常规内存池GENERAL_POOL、预留内存池RESERVED_POOL。

    1
    2
    3
    4
    5
    6
    7
    8
    1、GENERAL_POOL:在一般情况下,一个查询执行所需要的user/system内存都是从general pool中分配的,reserved pool在一般情况下是空闲不用的。

    2、RESERVED_POOL:大部分时间里是不参与计算的,但是当集群中某个Worker节点的general pool消耗殆尽之后,coordinator会选择集群中内存占用最多的查询,把这个查询分配到reserved pool,这样这个大查询自己可以继续执行,而腾出来的内存也使得其它的查询可以继续执行,从而避免整个系统阻塞。

    注意:
    reserved pool到底多大呢?这个是没有直接的配置可以设置的,他的大小上限就是集群允许的最大的查询的大小(query.total-max-memory-per-node)。
    reserved pool也有缺点,一个是在普通模式下这块内存会被浪费掉了,二是大查询可以用Hive来替代。因此也可以禁用掉reserved pool(experimental.reserved-pool-enabled设置为false),那系统内存耗尽的时候没有reserved pool怎么办呢?它有一个OOM Killer的机制,对于超出内存限制的大查询SQL将会被系统Kill掉,从而避免影响整个presto。

  • 内存相关参数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    1、user memory用户内存参数
    query.max-memory-per-node:单个query操作在单个worker上user memory能用的最大值
    query.max-memory:单个query在整个集群中允许占用的最大user memory

    2、user+system总内存参数
    query.max-total-memory-per-node:单个query操作可在单个worker上使用的最大(user + system)内存
    query.max-total-memory:单个query在整个集群中允许占用的最大(user + system) memory

    当这些阈值被突破的时候,query会以insufficient memory(内存不足)的错误被终结。

    3、协助阻止机制
    在高内存压力下保持系统稳定。当general pool常规内存池已满时,操作会被置为blocked阻塞状态,直到通用池中的内存可用为止。此机制可防止激进的查询填满JVM堆并引起可靠性问题。

    4、其他参数
    memory.heap-headroom-per-node:这个内存是JVM堆中预留给第三方库的内存分配,presto无法跟踪统计,默认值是-Xmx * 0.3

    5、结论
    GeneralPool = 服务器总内存 - ReservedPool - memory.heap-headroom-per-node - Linux系统内存

    常规内存池内存大小=服务器物理总内存-服务器linux操作系统内存-预留内存池大小-预留给第三方库内存
  • 内存优化建议

    • 常见的报错解决

      1
      2
      3
      4
      5
      6
      7
      8
      1、Query exceeded per-node total memory limit of xx
      适当增加query.max-total-memory-per-node。

      2、Query exceeded distributed user memory limit of xx
      适当增加query.max-memory。

      3、Could not communicate with the remote task. The node may have crashed or be under too much load
      内存不够,导致节点crash,可以查看/var/log/message。
    • 建议参数设置

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      1、query.max-memory-per-node和query.max-total-memory-per-node是query操作使用的主要内存配置,因此这两个配置可以适当加大。
      memory.heap-headroom-per-node是三方库的内存,默认值是JVM-Xmx * 0.3,可以手动改小一些。

      1) 各节点JVM内存推荐大小: 当前节点剩余内存*80%

      2) 对于heap-headroom-pre-node第三方库的内存配置: 建议jvm内存的%15左右

      3) 在配置的时候, 不要正正好好, 建议预留一点点, 以免出现问题

      数据量在35TB , presto节点数量大约在30台左右 (128GB内存 + 8核CPU)

      注意:
      1、query.max-memory-per-node小于query.max-total-memory-per-node。
      2、query.max-memory小于query.max-total-memory。
      3、query.max-total-memory-per-node 与memory.heap-headroom-per-node 之和必须小于 jvm max memory,也就是jvm.config 中配置的-Xmx。

Hive Map join优化

  • Map Side Join

    1
    2
    3
    4
    set hive.auto.convert.join=true;

    #如果参与连接的N个表(或分区)中的N-1个的总大小小于512MB,则直接将join转为Map端join,默认值为20MB
    set hive.auto.convert.join.noconditionaltask.size=512000000;
  • Bucket-Map Join

    1
    2
    3
    4
    5
    1)set hive.optimize.bucketmapjoin = true;

    2)一个表的bucket数是另一个表bucket数的整数倍

    3)bucket分桶字段 == join的字段
  • Sort Merge Bucket Join(SMB Join)

    SMB是针对Bucket Map Join的一种优化。条件类似却有些不一样。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    1)
    set hive.optimize.bucketmapjoin = true;
    set hive.auto.convert.sortmerge.join=true;
    set hive.optimize.bucketmapjoin.sortedmerge = true;
    set hive.auto.convert.sortmerge.join.noconditionaltask=true;

    2)
    Bucket 列 == Join 列 == sort 列

    #hive并不检查两个join的表是否已经做好bucket且sorted,需要用户自己去保证join的表数据sorted, 否则可能数据不正确。

    3)
    bucket数相等


    #注意:
    a、可以设置参数hive.enforce.sorting 为true,开启强制排序。插数据到表中会进行强制排序。
    b、表创建时必须是CLUSTERED BY+SORTED BY