Exists优化

  • exists性能比in高,能用exists就不要用in
  • EXISTS里面的子查询,可以使用外部查询的表字段,而且必须和外部表存在关联关系,否则会报错
  • exists中的子查询,select * / id / 1 都一样,重点是where条件
  • EXISTS有了关联以后,外部的每一行数据,在子查询中查出来的结果都是不一样的

CodeDmeo

使用in(效率低)

1
2
3
4
SELECT * from sqooptohive.emp2
WHERE area in (SELECT concat(province, city)
from sqooptohive.emp_add_hive
WHERE name like '张%');

使用exists

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
SELECT * from sqooptohive.emp2
WHERE EXISTS(
SELECT * from sqooptohive.emp_add_hive
where emp2.area=concat(province, city) AND name like '张%'
);

SELECT * from sqooptohive.emp2
WHERE EXISTS(
SELECT id from sqooptohive.emp_add_hive
where emp2.id=emp_add_hive.id AND city='sec-bad'
);

SELECT * from sqooptohive.emp2
WHERE EXISTS(
SELECT 1 from sqooptohive.emp_add_hive
where emp2.id=emp_add_hive.id AND city='sec-bad'
);

Sqoop导入的两种方式

1、直接导入到Hive表中; a.性能更高;b.bug更少(NULL值异常);c.Hive表可以使用OrcFile
2、先导入到HDFS,再Load到Hive表;

CodeDemo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
一般情况,没有HDFS中数仓目录的权限
第一步:sqoop导入数据到test目录下
sqoop import \
--connect jdbc:mysql://192.168.52.150:3306/test \
--username root \
--password 123456 \
--e "select * from emp_add where 1=1 and \$CONDITIONS" \
--target-dir /test/emp_add_hive \
--delete-target-dir \
--fields-terminated-by '\t' \
-m 1

第二步,建Hive表:
drop table sqooptohive.emp_add_hive;
CREATE EXTERNAL TABLE sqooptohive.emp_add_hive
(
id INT,
hno STRING,
street STRING,
city STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';


第三步,加载hdfs文件到Hive表中:
-- inpath指的是 加载hdfs文件;
-- local inpath,指的是linux本地文件路径;
load data inpath '/test/emp_add_hive/part-m-00000'
overwrite into table sqooptohive.emp_add_hive;

注意:overwrite可选,代表的含义是,先删除后插入。

分桶

  • 作用:
    • 1、支持事务表;
    • 2、JOIN效率性能变高(Bucket-Mapjoin、SMB-Mapjoin);
    • 3、数据采样;
    • select * from table tablesample(bucket x out of y on column) as t;这句话的作用是按照字段column将数据分为y个桶, 并抽样第x个桶
1
2
3
4
5
6
7
8
9
10
11
12
底层:将数据按照 hash取余 划分为不同的文件,第一个桶的余数是0;

LOAD DATA不支持分桶表,可以通过中间表(不分桶),LOAD DATA到中间表,再用insert将中间表数据插入到分桶表;
Sqoop不支持导入到分桶表,ODS层一般不用分桶表。


数据采样用法:
-- 注意:表的别名,一定要写在采样的后面,否则报错
select * from table tablesample(bucket x out of y on column) as t;

可以通过执行计划来验证查询性能是否有提升
explain select ...

CodeDemo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
--启用强制分桶
set hive.enforce.bucketing=true;

DROP table sqooptohive.emp3;
CREATE TABLE emp3
(
id int,
name string,
deg STRING,
salary int,
dept string
)
CLUSTERED BY (id) sorted by (id DESC) INTO 5 BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS ORC
TBLPROPERTIES ('orc.compress'='SNAPPY');

INSERT into sqooptohive.emp3
SELECT * from sqooptohive.emp;

explain SELECT count(1), dept from sqooptohive.emp3 GROUP BY dept;

explain SELECT count(1), dept from sqooptohive.emp3 TABLESAMPLE(BUCKET 1 OUT OF 5 on id)
GROUP BY dept;

MapJoin,SMB,BucketMapJoin

  • MapJoin: 大表和小表Join
  • bucket-map join: 大表和中表Join
  • SMB-join: 大表和大表Join

MapJoin

1
select * from 学生表 join 班级表 where 学生表.class_id=班级表.id;-- 大表和小表Join
  • 开启条件:
    • 1、开启开关; set hive.auto.convert.join=true;
    • 2、小表数据量 <= 阈值(默认20MB)

BucketMapJoin

大表和中表Join

  • 开启条件:
    • 1、打开开关
      • set hive.auto.convert.join=true;
      • set hive.strict.checks.bucketing=true;
      • set hive.optimize.bucketmapjoin = true;
    • 2、分桶字段=join字段
    • 3、分桶数量要能被整除
    • 4、满足mapjoin的条件

SMB

开启条件:
1、打开开关

1
2
3
4
5
6
7
set hive.auto.convert.join=true;
set hive.optimize.bucketmapjoin = true;
set hive.strict.checks.bucketing=true;
set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.auto.convert.sortmerge.join.noconditionaltask=true;
set hive.optimize.bucketmapjoin.sortedmerge=true;

2、分桶字段=join字段=sort字段
3、分桶数量要相等
4、满足bucket-mapjoin的条件

Hive分区

  • 分区建表关键字:PARTITIONED BY
  • 插入分区关键字:PARTITION
  • 插入数据时,才会有动态分区和静态分区之分
  • 动态分区插入数据时,写入的分区数量无限,查出来多少个分区,就生成几个分区
  • 严格模式下,分区中至少要有一个静态分区;非严格模式下,分区可以都是动态分区
1
2
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;

上述代码为关闭严格模式代码

静态分区

向静态分区表中插入数据

  • 注意select后面的字段必须和要插入的表的字段保持一致
1
2
INSERT INTO TABLE emp_partition partition (year='2020')
SELECT * FROM emp WHERE dept='TP';

动态分区

向动态分区中插入数据

  • 注意select后面的字段必须和要插入的表的字段保持一致的基础上, 还需要添加一个分区字段(select表中必须有的字段)
1
2
3
INSERT INTO emp_dyn_partition PARTITION (year)
SELECT id, name, deg, salary, dept, year
from emp_partition;

注意事项

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
建表时,字段不能使用hive关键字
插入数据时,是按照顺序插入的,不是按照字段名插入的,插入时别名加不加都可以
加别名,主要是为了字段比较多时,方便快速定位,增加代码易读性
-- 动静混合,可以再严格模式下使用
set hive.exec.dynamic.partition.mode=strict;

-- 动静混用时,动态分区必须在静态分区之后,也就是动态分区必须作为静态分区的子节点
INSERT into emp_mul_dyn_partition PARTITION (year_info='2010', month_info='05', day_info)
SELECT id, name, deg, salary, dept,
-- year, 静态分区字段不能出现在select中
-- '03' month_info,
'21' day_info
from emp_partition;

SELECT * FROM emp_mul_dyn_partition;

-- 完整的分区配置项
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=10000;
set hive.exec.max.dynamic.partitions=100000;
set hive.exec.max.created.files=10000000;

详细描述分区理论(文言文)

1
2
3
作用:查询快
劣势:插入慢
分区不是越多越好,够用就好,一般按天分区;
  • 分区和分桶的区别:

    • 1、分区是按照目录划分的;分桶是按照文件划分的;
    • 2、分区按照字段值划分,容易出现数据倾斜;分桶是hash后取余划分,相对比较平均,可以用来采样;
    • 3、分区和分桶是可以同时使用的。
  • 静态分区和动态分区区别:

    • 1、静态分区和动态分区,建表时,是没有区别的;
    • 2、在插入数据时,才会出现静态分区和动态分区,不同的SQL中,year可能是静态分区,也可能是动态分区;
    • 3、静态分区插入数据时,要指定分区的字段名 和 字段值;一个SQL只能指定一个分区;
    • 4、动态分区插入数据时,只需要指定分区的 字段名 即可,一个SQL自动可以插入到多个分区下;
    • 5、静态分区的分区取值,来自于partition中指定的值;分区字段不能在select中出现,否则报错:字段数量不一致;
    • 6、动态分区的分区取值,来自于select中读取到的字段,自动分配生成分区;分区字段必须在select中出现,否则报错:字段数量不一致;

Hive索引

行组索引(row group index)

  • 作用:提升 数值类型 字段为的条件 > < = 查询性能
  • 开启条件:
    • 0、必须使用ORC文件格式
    • 1、开启hive索引;
    • 2、建表时,开启行组索引;
    • 3、插入数据时,按照索引字段排序;

CodeDemo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
-- 行组索引
CREATE table emp_row_index
(
id int,
name string,
deg string,
salary int,
dept string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
-- 0、必须使用ORC文件格式
STORED AS ORC
sorted by id
-- 1、建表时指定开启索引
-- 2、行组索引,默认开启
TBLPROPERTIES ('orc.create.index'='true');

-- 2、开启Hive索引
set hive.optimize.index.filter=true;

-- 3、插入数据时,按照索引字段排序
-- 按照哪个字段排序,哪个字段就是行组索引
INSERT into sqooptohive.emp_row_index
SELECT * FROM emp;
-- CLUSTER BY id;

explain SELECT * from sqooptohive.emp_row_index
WHERE id=1201;

explain SELECT * from sqooptohive.emp
WHERE id=1201;

布隆过滤索引(bloom filter index)

  • 作用:针对=值操作条件,对于> <等操作是不生效的;
  • 对字段的数据类型没有要求,不止是数值类型,字符类型、日期类型等都可以使用布隆过滤索引
  • 开启条件:
    • 0、必须使用ORC文件格式;
    • 1、开启hive索引;
    • 2、建表时,开启行组索引;(默认开启)
    • 3、建表时,指定布隆索引的字段名
    • 4、布隆索引可以指定多个,中间用逗号,分隔

CodeDemo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
- 布隆过滤索引
drop table emp_bloom_index;
CREATE table emp_bloom_index
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
-- 0、必须使用ORC文件格式
STORED AS ORC
-- 1、建表时指定开启索引
-- 2、指定布隆过滤索引的列
TBLPROPERTIES ('orc.create.index'='true', 'orc.bloom.filter.columns'='name')
AS
SELECT * FROM emp;


SELECT * from sqooptohive.emp_bloom_index
-- in就相当于 多个=条件
WHERE name in ('gopal', 'manisha');



-- 多个布隆过滤索引
drop table emp_bloom_index;
CREATE table emp_bloom_index
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
-- 0、必须使用ORC文件格式
STORED AS ORC
-- 1、建表时指定开启索引
-- 2、指定布隆过滤索引的列
-- 创建索引比行组索引方便,直接指定即可,不需要再去排序
-- 布隆索引可以创建多个,而行组索引受到排序的限制,只能有一个
TBLPROPERTIES ('orc.create.index'='true',
'orc.bloom.filter.columns'='name,dept')
AS
SELECT * FROM emp;


SELECT * from sqooptohive.emp_bloom_index
-- in就相当于 多个=条件
WHERE name in ('gopal', 'manisha') and dept='ORC';

数据倾斜

  • Join中的数据倾斜
  • GroupBy中的数据倾斜

表join数据倾斜

三个MapJoin

JoinSkew运行时优化,编译时优化

GroupBy数据倾斜

Aggr

  • 在map端预聚合(spark的reduceByKey),减少传输给reduce的数据量
  • 默认开启:hive.map.aggr

Skewindata

  • 默认关闭:hive.groupby.skewindata
  • 机制:在正常的MR前面,加了一个MR:随机数打散-负载均衡

压缩

  • 压缩分为Map端压缩, Reduce端压缩, Hive执行过程的压缩

  • Map压缩

    • 打开开关
    • 设置压缩算法
  • Reduce压缩

    • 打开开关
    • 设置压缩算法
    • 设置压缩类型(Reduce数据压缩为None-不压缩, Record-压缩为一行数据, Block-压缩成块)
  • Hive压缩

    • 中间数据压缩
    • 最终输出压缩
1
2
3
4
5
6
7
8
9
10
11
12
Map压缩
1、开关:mapreduce.map.output.compress
2、压缩算法:mapreduce.map.output.compress.codec =org.apache.hadoop.io.compress.SnappyCodec

Reduce压缩
1、开关:mapreduce.output.fileoutputformat.compress
2、压缩算法:mapreduce.output.fileoutputformat.compress.codec =org.apache.hadoop.io.compress.SnappyCodec
3、类型:org.apache.hadoop.io.compress.SnappyCodec.type =NONE、RECORD、BLOCK;推荐用BLOCK。

Hive压缩
1、中间数据压缩 开关:hive.exec.compress.intermediate
2、最终输出压缩 开关:hive.exec.compress.output

Hive并行执行,小文件合并

  • 并行执行
    • 没有依赖关系的MR任务可以并行执行,提高性能
  • 小文件合并
    • 打开开关
    • 设置小文件的平均文件大小, 比如两个文件一个1MB, 一个150MB平均75MB, 则不会进行小文件合并
    • 设置合并文件后的(压缩)文件大小
1
2
3
4
5
6
7
8
9
10
11
-- 没有依赖关系的MR任务可以并行执行,提高性能。
set hive.exec.parallel=true,可以开启并发执行,默认为false
set hive.exec.parallel.thread.number=16; //同一个sql允许的最大并行度,默认为8


-- 小文件合并
开关:hive.merge.mapfiles、hive.merge.mapredfiles
阈值:hive.merge.smallfiles.avgsize
默认16M,输出文件平均大小 小于此值,则触发合并
合并后大小:hive.merge.size.per.task
默认256M

其他优化

1
2
3
4
5
6
7
8
9
-- 矢量化查询:批量处理,前提ORC
set hive.vectorized.execution.enabled=true;
set hive.vectorized.execution.reduce.enabled = true;

-- 读取零拷贝:尽量内存计算,减少shuffle,前提ORC
set hive.exec.orc.zerocopy=true;

-- 关联优化器:内存型计算,减少shuffle
set hive.optimize.correlation=true;

常用的窗口函数

1
2
3
row_number()	: 成绩排名		12345678
rank(): 成绩排名 11345578
dense_rank() : 排名 1123445678

去重

三种去重方式

group by

假设根据id进行分组就相当于对id进行去重了, 后面的row_number窗口函数也是这个原理

  • 好处
    • 如果不加group by,相当于整张表是一个分组;
    • groupby的去重范围:group by的分组字段;
    • groupby的SQL中,不能出现dinstinct,除非被包含在聚合函数中;
    • groupby的去重,会影响到 分组&聚合函数 的结果;

CodeDemo

1
2
3
4
5
6
select 
store_name,
count(distinct id)
from yp_dwb.dwb_shop_detail
group by store_name
order by store_name;

distinct

  • 针对id进行去重
1
select distinct(id), name from table;
  • 针对id和name去重
1
select distinct id, name from table;

注意

对于分组后(有groupby/有聚合函数)的SQL,不能直接distinct,但是可以在聚合函数中distinct;
distinct小括号,加不加无所谓,如果后面加了其他字段,当前去重就失效了;去重范围:后面的所有字段。

1
2
select distinct(store_name), store_type
from yp_dwb.dwb_shop_detail order by store_name;

row_number()

根据什么分组就根据什么去重, over里面写partition by需要去重的字段

  • 好处
    • row_number()的去重不会受到其他字段的影响;
    • row_number()去重,可以根据需求灵活控制去重行数据的顺序;
    • row_number()不影响分组聚合的规则;

CodeDemo

1
2
3
4
5
6
7
8
9
10
11
12
with tmp as
(
select row_number() over(partition by store_name order by dt desc) num,
row_number() over(partition by store_type order by dt desc) type_num,
*
from yp_dwb.dwb_shop_detail
)
-- 重名店铺,只保留最后新加的那一个,所有字段
select *, count(XX), sum(XX) from tmp
where num=1
order by store_name
;