数据湖简介

数仓和数据湖

数据仓库

数据仓库(英语:Data Warehouse,简称数仓、DW),是一个用于存储、分析、报告的数据系统。

  • 数据仓库的目的是构建面向分析的集成化数据环境,分析结果为企业提供决策支持(DecisionSupport)。
  • 数据仓库的特点是本身不产生数据,也不最终消费数据。
  • 每个企业根据自己的业务需求可以分成不同的层次。但是最基础的分层思想,理论上分为三个层:操作型数据层(ODS)、数据仓库层(DW)和数据应用层(DA)。

数据湖

数据湖是一个集中式数据存储库,用来存储大量的原始数据,使用平面架构来存储数据。

  • 数据湖一个以原始格式(通常是对象块或文件)存储数据的系统或存储库,通常是所有企业数据的单一存储。
  • 数据湖可以包括来自关系数据库的结构化数据(行和列)、半结构化数据(CSV、日志、XML、JSON)、非结构化数据(电子邮件、文档、pdf)和二进制数据(图像、音频、视频)。
  • 数据湖中数据,用于报告、可视化、高级分析和机器学习等任务。

数据仓库VS数据湖

湖仓一体

  • 湖仓一体(LakeHouse):是新出现的一种数据架构,它同时吸收了数据仓库和数据湖的优势,数据分析师和数据科学家可以在同一个数据存储中对数据进行操作,同时它也能为公司进行数据治理带来更多的便利性。
  • LakeHouse使用新的系统设计:直接在用于数据湖的低成本存储上实现与数据仓库中类似的数据结构和数据管理功能。

数据湖框架

目前市面上流行的三大开源数据湖方案分别为:Delta Lake、Apache Iceberg和Apache Hudi。

Hudi基本介绍

Hudi 概念

Hudi(Hadoop Upserts Deletes and Incrementals缩写):用于管理分布式文件系统DFS上大型分析数据集存储。一言以蔽之,Hudi是一种针对分析型业务的、扫描优化的数据存储抽象,它能够使DFS数据集在分钟级的时延内支持变更,也支持下游系统对这个数据集的增量处理。

Hudi 功能

  • Hudi是在大数据存储上的一个数据集,可以将Change Logs通过upsert的方式合并进Hudi;
  • Hudi对上可以暴露成一个普通Hive或Spark或Flink表,通过API或命令行可以获取到增量修改的信息,继续供下游消费;
  • Hudi保管修改历史,可以做时间旅行或回退;
  • Hudi内部有主键到文件级的索引,默认是记录到文件的布隆过滤器;

Hudi 特性

Apache Hudi使得用户能在Hadoop兼容的存储之上存储大量数据,同时它还提供两种原语,不仅可以批处理,还可以在数据湖上进行流处理。

  • Update/Delete记录:Hudi使用细粒度的文件/记录级别索引来支持Update/Delete记录,同时还提供写操作的事务保证。查询会处理最后一个提交的快照,并基于此输出结果。
  • 变更流:Hudi对获取数据变更提供了流的支持,可以从给定的时间点获取给定表中已updated/inserted/deleted的所有记录的增量流,并解锁新的查询类别。

Hudi 应用

  • Hudi对于Flink友好支持以后,可以使用Flink + Hudi构建实时湖仓一体架构,数据的时效性可以到分钟级,能很好的满足业务准实时数仓的需求。
  • 通过湖仓一体、流批一体,准实时场景下做到了:数据同源、同计算引擎、同存储、同计算口径。

Hudi 发展

  • Apache Hudi由Uber开发并开源,该项目在2016年开始开发,并于2017年开源,2019年1月进入 Apache 孵化器,且2020年6月成为Apache顶级项目,目前最新版本:0.12.2版本。
  • Hudi一开始支持Spark进行数据摄入(批量Batch和流式Streaming),从0.7.0版本开始,逐渐与Flink整合,主要在于Flink SQL整合,还支持Flink SQL CDC。

Hudi 体验

启动服务

Hudi是基于Hadoop的一个框架,首先要启动HDFS,Flink On Hudi,Flink,Flink SQL Client

启动HDFS集群,node1执行:

1
/export/server/hadoop/sbin/start-dfs.sh

单机版直接使用 start-dfs.sh 即可

将以下两个jar包放入到/export/server/flink/lib下(如果有多台Flink机器都要放)。

Jar包 资源地址
hudi-flink1.14-bundle-0.12.1.jar /export/server/hudi-0.12.1/packaging/hudi-flink-bundle/target/
flink-sql-connector-hive-3.1.2_2.12-1.14.5.jar

启动standalone集群服务,node1执行:

1
/export/server/flink/bin/start-cluster.sh

启动Flink SQL Cli命令行,node1执行:

1
/export/server/flink/bin/sql-client.sh embedded shell

设置CheckPoint:

1
set execution.checkpointing.interval=30sec;

可以在启动FlinkSQL客户端后面加-i,表示启动之前先运行一下脚本

sql-client.sh -i:指定文件启动flink-sql
可以将通用的sql放在一个初始的sql文件中,启动flink的同时执行语句
文件中可以写多个sql,用分号分割
vim sql-client.sql

1
2
-- 启动sql-client 
sql-client.sh -i sql-client.sql

插入数据

创建t1表,在Flink SQL Cli执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE TABLE t1(
uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi', -- 连接器指定hudi
'path' = 'hdfs://flinknode0:8020/hudi/t1', -- 数据存储地址
'table.type' = 'MERGE_ON_READ' -- 表类型,默认COPY_ON_WRITE,可选MERGE_ON_READ
);

使用values插入数据,执行:

1
2
3
4
5
6
7
8
9
INSERT INTO t1 VALUES
('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

查看hdfs文件系统,hudi文件夹下生成名为t1的文件夹。地址:

[http://node1:9870/explorer.html#/hudi](http://node1:9870/explorer.html \l /hudi ) (注意对应的域名解析为node1)

在Flink SQL Cli查看表内容:

1
select * from t1;

更新数据

更新主键为id1的数据内容,执行:

1
2
insert into t1 values ('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');
select * from t1;

流式查询

流式查询(Streaming Query)需要设置read.streaming.enabled = true。再设置read.start-commit,如果想消费所有数据,设置值为earliest。

使用参数如下:

参数名称 是否必填 默认值 备注
read.streaming.enabled false false 设置为true,开启stream query
read.start-commit false the latest commit Instant time的格式为:’yyyyMMddHHmmss’
read.streaming_skip_compaction false false 是否不消费compaction commit,消费compaction commit可能会出现重复数据
clean.retain_commits false 10 cleaner 最多保留的历史 commits 数,大于此数量的历史 commits 会被清理掉,changelog 模式下,这个参数可以控制 changelog 的保留时间,例如 checkpoint 周期为 5 分钟一次,默认最少保留 50 分钟的时间。
注意:如果开启read.streaming.skip_compaction,但stream reader的速度比clean.retain_commits慢,就可能会造成没有读取数据就先进行了合并,合并后不再读取,从而造成数据丢失

在Flink Sql 执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
CREATE TABLE t2(
uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi', -- 连接器指定为hudi
'path' = 'hdfs://flinknode0:8020/hudi/t2', -- 数据存储地址
'table.type' = 'MERGE_ON_READ', -- 表类型,默认COPY_ON_WRITE,可选MERGE_ON_READ
'read.streaming.enabled' = 'true', -- 默认值false,设置为true,开启stream query
'read.start-commit' = '20210316134557', -- start-commit之前提交的数据不显示,默认值the latest commit,instant time的格式为:‘yyyyMMddHHmmss’
'read.streaming.check-interval' = '4' -- 检查间隔,默认60s
);


INSERT INTO t2 VALUES
('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

select * from t2;

Apache Hudi 核心概念剖析

Hudi表的三个主要组件:

  • 有序的时间轴元数据,类似于数据库事务日志;
  • 分层布局的数据文件:实际写入表中的数据;
  • 索引(多种实现方式):映射包含指定记录的数据集。

时间轴(⭐️)

Hudi把随着时间流逝,对表的一系列CRUD(增删改查)操作叫做Timeline,Timeline中某一次的操作,叫做Instant。Hudi的核心就是在所有的表中维护了一个包含在不同的即时时间对数据集操作(比如新增、修改或删除)的时间轴(Timeline)。

在Timeline上,每个commit被抽象为一个HoodieInstant,一个instant记录了一次提交 (commit) 的行为(action)、时间戳(time)、和状态(state)。

  • Instant Action: 指的是对Hudi表执行的操作类型,目前包括COMMITS、CLEANS、
    DELTA_COMMIT、COMPACTION、ROLLBACK、SAVEPOINT这6种操作类型。

    • Commits:表示一批记录原子性的写入到一张表中。
    • Cleans:清除表中不再需要的旧版本文件。
    • Delta_commit:增量提交指的是将一批记录原子地写入MergeOnRead类型表,其中一些/所有数据都可以写入增量日志。
    • Compaction:将行式文件转化为列式文件。
    • Rollback:Commits或者Delta_commit执行不成功时回滚数据,删除期间产生的任意文件。
    • Savepoint:将文件组标记为“saved”,cleans执行时不会删除对应的数据。
  • Instant Time:本次操作发生的时间,通常是时间戳(例如:20190117010349),它按照
    动作开始时间的顺序单调递增;

  • Instant State:表示在指定的时间点(Instant Time)对Hudi表执行操作(Instant Action)后,表所处的状态,目前包括REQUESTED(已调度但未初始化)、INFLIGHT(当前正在执行)、COMPLETED(操作执行完成)这3种状态

元数据表

Hudi元数据表可以显著提高查询的读/写性能,元数据表的主要目的是处理对“列表文件”操作的需求以时间轴(Timeline)的形式将数据集上的各项操作元数据维护起来,以支持数据集的瞬态视图,这部分元数据存储于根目录下的元数据目录。

  • Commits:一个单独的commit包含对数据集上一批数据的一次写入操作的相关信息。Hudi用

单调递增的时间戳来标识commits,标定的是一次写入操作的开始。

  • Cleans:用于清除数据集中不再被查询所用到的旧文件的后台活动。
  • Compactions:用于协调Hudi内部的数据结构差异的后台活动。例如,将更新操作由基于行存的日志文件归集到列存数据上。

文件管理(⭐️)

Hudi为了实现数据的CRUD(增删改查),需要能够唯一标识一条记录,Hudi将把数据集中的唯一字段(record key ) +数据所在分区(partitionPath) 联合起来当做数据的唯一键。其数据集的组织目录结构与Hive表示非常相似,一份数据集对应着一个根目录。数据集被打散为多个分区,分区字段以文件夹形式存在,该文件夹包含该分区的所有文件。在根目录下,每个分区都有唯一的分区路径,每个分区目录下有多个文件。

以HDFS存储来看,一个Hudi表的存储文件分为.hoodie文件与数据文件两类:

  • .hoodie 文件:由于CRUD(增删改查)的零散性,每一次的操作都会生成一个文件,这些小文件越来越多后,会严重影响HDFS的性能,Hudi设计了一套文件合并机制。.hoodie文件夹中存放了对应的文件合并操作相关的日志文件。
  • par1、par2等相关的路径是实际的数据文件,按分区存储,par1、par2等即分区名。
  • .hoodie文件

Instant State操作的状态:发起(REQUESTED),进行中(INFLIGHT),还是已完成(COMPLETED)

  • 数据文件

Hudi的base file (parquet文件) 在footer的meta记录了record key组成的BloomFilter,用于在file based index的实现中实现高效率的key contains检测。

Hudi的log(avro文件)是自己编码的,通过积攒数据buffer以LogBlock为单位写出,每个LogBlock 包含magic number、size、content、footer等信息,用于数据读、校验和过滤。

Index索引

hudi支持以下索引选项,可以使用hoodie.index.type选择这些选项:

  • Bloom Index(默认):使用由记录键构建的Bloom过滤器,还可以选择使用记录键范围修改候选文件。
  • 简单索引:将索引键从存储表中提取出来,与update/delete 操作的新数据对应的键进行join。
  • HBase索引:管理外部 Apache HBase 表中的索引映射。
  • 自带实现:可以扩展此公共API以实现自定义索引。

Bloom Index 和 简 单 索 引 都 有 全 局 选 项 : hoodie.index.type=GLOBAL_BLOOM hoodie.index.type=GLOBAL_SIMPLE。HBase索引本质上是一个全局索引。

全局索引之间的区别:

  • 全局索引:全局索引在表的所有分区中强制执行键的唯一性,即保证表中对于给定的记录键只存在一条记录。全局索引提供了更强的保证,但更新/删除成本随着表的大小而增长,所以更适合小表。
  • 非全局索引:仅在表的某一个分区内强制要求键保持唯一,非全局索引依靠写入器为同一个记录的 update/delete 提供一致的分区路径,同时大幅提高了效率,更适用于大表。因为分区后索引查找操作可以更好,并且扩展更加方便。

存储类型

数据计算模型

  • 批式模型(Batch)

批式模型就是使用MapReduce、Hive、Spark等典型的批计算引擎,以小时任务或者天任务的形式来做数据计算。

  • 流式模型(Stream)

流式模型,典型的就是使用Flink来进行实时的数据计算。

  • 增量模型(Incremental)

针对批式和流式的优缺点,Uber提出了增量模型(Incremental Mode),相对批式来讲,更加实时;相对流式而言,更加经济。增量模型,简单来讲,是以mini batch的形式来跑准实时任务。

Hudi在增量模型中支持了两个最重要的特性:

  • Upsert:这个主要是解决批式模型中,数据不能插入、更新的问题,有了这个特性,可以往 Hudi中写入增量数据,而不是每次进行完全的覆盖。(Hudi自身维护了key->file的映射,所以当upsert时很容易找到key对应的文件)
  • Incremental Query:增量查询,减少计算的原始数据量。以Uber中司机和乘客的数据流Join 为例,每次抓取两条数据流中的增量数据进行批式的Join即可,相比流式数据而言,成本要降低几个数量级。

Hudi的表类型(⭐️)

Hudi提供两类型表:写时复制(Copy on Write,COW)表和读时合并(Merge On Read,MOR)表。

  • Copy On Write

Copy On Write简称COW。它是在数据写入的时候,复制一份原来的拷贝,在其基础上添加新数据仅使用列文件格式(例如parquet)存储数据。

  • 更新update:在更新记录时,Hudi会先找到包含更新数据的文件,然后再使用更新值(最新的数据)重写该文件,包含其他记录的文件保持不变。当突然有大量写操作时会导致重写大量文件,从而导致极大的I/O开销。
  • 读取read:在读取数据时,通过读取最新的数据文件来获取最新的更新,此存储类型适用于少量写入和大量读取的场景。

数据副本在超过一定的个数限制后,将被删除(hoodie.cleaner.commits.retained )参数配置,保留几个历史版本,不包含最后一个版本,默认10个)。这种类型的表,没有compact instant,因为写入时相当于已经compact了。

  • Merge On Read
  • 更新Update:在更新记录时, 仅更新到增量文件(Avro)中,然后进行异步(或同步)的compaction, 最后创建列式文件(parquet)的新版本。此存储类型适合频繁写的工作负载,因为新记录是以追加的模式写入增量文件中。
  • 读取Read:在读取数据集时,需要先将delta log增量文件与旧文件进行合并,然后生成列式文件成功后,再进行查询。

Merge On Read简称MOR,使用列式(例如parquet)+ 基于行(例如avro)的文件格式组合来存储数据。更新记录到增量文件(log文件)中,然后进行同步或异步压缩以生成列文件(parquet文件)的新版本。

hoodie.compact.inline:开启是否一个事务完成后执行压缩操作,默认不开启

hoodie.compact.inline.max.delta.commits:设置提交多少次合并log文件到新的parquet文件,默认是5次

hoodie.cleaner.commits.retained:保存有多少parquet文件,即控制FileSlice文件个数。

  • COW vs MOR
权衡 写时复制COW 读时合并MOR
写数据延迟 更高 更低
更新代价(I/O) 更高(重写整个Parquet文件) 更低(追加到增量日志)
文件大小 更小(高更新代价(I/O)) 更大(低更新代价)
写放大 更高 更低(取决于压缩策略)
适用场景 写少读多 写多读少

Hudi查询类型(⭐️)

Hudi支持三种 不同的查询表的方 式:Snapshot Queries 、Incremental Queries 和Read Optimized Queries。

  • Snapshot Queries(快照查询)

查询某个增量提交操作中数据集的最新快照,先进行动态合并最新的基本文件(parquet)和增量文件(log)来提供近实时数据集(通常会存在几分钟的延迟)。即读取所有partiiton下每个FileGroup最新的FileSlice中的文件,Copy On Write表读parquet文件,Merge On Read表读parquet+log文件。

  • Incremental Queries(增量查询)

仅查询新写入数据集的文件,需要指定一个Commit/Compaction的即时时间(位于Timeline上的某个Instant)作为条件,来查询此条件之后的新数据。这需要提供变更流来启用增量数据管道。

  • Read Optimized Queries(读优化查询)

直接查询基本文件(已存在的数据集的最新快照),其实就是列式文件(Parquet)。并保证与非Hudi列式数据集相比,具有相同的列式查询性能。

可查看给定的commit/compact即时操作的表的最新快照。

通常读优化查询数据的最新程度取决于压缩策略。

表类型 支持的查询类型
写入时复制(Copy On Write) 快照查询(Snapshot Queries)+ 增量查询(Incremental Queries)
读取时合并(Merge On Read) 快照查询(Snapshot Queries)+ 增量查询(Incremental Queries)+ 读优化查询(Read Opitimized Queries)

写入类型

在Hudi数据湖框架中支持三种方式写入数据:UPSERT(插入更新)INSERT(插入)BULK INSERT(批插入)

UPSERT 插入

这是默认操作。在该操作中,数据先通过index打标(INSERT/UPDATE),即通过查找索引,将输入记录标记为插入或更新。再运行启发式算法以确定如何最好地将这些记录放到存储上。

目前比较通用的启发式算法一般有模拟退火算法(SA)、遗传算法(GA)、蚁群算法(ACO)、人工神经网络(ANN)等。

INSERT 插入

这种操作与 UPSERT 操作非常类似,只是跳过了查找索引这一步,使得它在性能上要比UPSERT 要快很多。如果只是需要 Hudi 的事务写/增量拉取数据/存储管理的能力,并且可以容忍重复数据,那么可以选择 INSERT 操作。

区别:跟Upsert相比,不去重,效率高。

BULK 插入

bulk_insert可以减少数据序列化以及合并操作,于此同时,该数据写入方式会跳过数据去重,所以用户需要保证数据的唯一性。

bulk_insert在批量写入模式中是更加有效率的。默认情况下,批量执行模式按照分区路径对输入记录进行排序,并将这些记录写入Hudi,该方式可以避免频繁切换文件句柄导致的写性能下降。

可以通过hoodie.bulkinsert.sort.mode配置项来设置上述模式(NONE,GLOBAL_SORT , PARTITION_SORT),默认值为GLOBAL_SORT。

Insert 效率排名: NONE > PARTITION_SORT > GLOBAL_SORT

upsert效率排名: GLOBAL_SORT> PARTITION_SORT > NONE

Flink写入Hudi

bulk_insert

1、用于快速导入快照数据到hudi

2、参数

参数名称 是否必须 默认值 参数说明
write.operation true upsert 设置为 bulk_insert 以开启bulk_insert功能
write.tasks false 4 bulk_insert 并行度, the number of files >= write.bucket_assign.tasks
write.bulk_insert.shuffle_by_partition false true 写入前是否根据分区字段进行数据重分布。启用此选项将减少小文件的数量,但可能存在数据倾斜的风险
write.bulk_insert.sort_by_partition false true 写入前是否根据分区字段对数据进行排序。启用此选项将在写任务写多个分区时减少小文件的数量
write.sort.memory false 128 排序算子的可用托管内存。默认为 128 MB

3、示例

1
2
3
4
5
6
7
with (
'connector' = 'hudi',
'path' = 'hdfs://node1:8020/test/stu_sink_hudi',
'table.type' = 'MERGE_ON_READ',
'write.option' = 'bulk_insert',
'write.precombine.field' = 'age'
);

Index bootstrap

1、该方式用于快照数据+增量数据的导入。如果快照数据已经通过bulk_insert导入到hudi,那么用户就可以近实时插入增量数据并且通过index bootstrap功能来确保数据不会重复。

2、参数

参数名称 是否必须 默认值 参数说明
index.bootstrap.enabled true false 当启用index bootstrap功能时,会将Hudi表中记录的索引一次性加载到Flink状态中
index.partition.regex false * 优化参数,设置正则表达式来过滤分区。 默认情况下,所有分区都被加载到Flink状态

3、使用方法

  • CREATE TABLE创建一条与Hudi表对应的语句。 注意这个table.type配置必须正确。
  • 设置index.bootstrap.enabled = true来启用index bootstrap功能
  • 在flink-conf.yaml文件中设置Flink checkpoint的容错机制,设置配置项execution.checkpointing.tolerable-failed-checkpoints = n(取决于Flink checkpoint执行时间)
  • 等待直到第一个checkpoint成功,表明index bootstrap完成。
  • 在index bootstrap完成后,用户可以退出并保存savepoint(或直接使用外部 checkpoint)。
  • 重启任务,并且设置index.bootstrap.enable 为 false

Changelog Mode

1、Hudi可以保留消息的所有中间变化(I / -U / U / D),然后通过flink的状态计算消费,从而拥有一个接近实时的数据仓库ETL管道(增量计算)。

2、参数

参数名称 是否必须 默认值 参数说明
changelog.enabled false false 它在默认情况下是关闭的,为了拥有upsert语义,只有合并的消息被确保保留,中间的更改可以被合并。 设置为true以支持使用所有更改

Insert Mode

1、将write.insert.deduplicate设置为false,则跳过重复数据删除

效果:每次刷新行为直接写入一个新的 parquet文件(MOR表也直接写入parquet文件)。

2、参数

参数名称 是否必须 默认值 参数说明
write.insert.deduplicate false true “插入模式”默认启用重复数据删除功能。 关闭此选项后,每次刷新行为直接写入一个新的 parquet文件

Hudi on Hive

1、原理:

将Hudi表的数据映射为Hive外部表,基于该外部表,Hive可以方便的进行实时视图,读优化视图以及增量视图的查询。

2、配置

添加jar包:将以下两个jar包放入到/export/server/hive/lib目录下(只放node1

Jar包 地址
hudi-hadoop-mr-bundle-0.12.1.jar /export/software/hudi-0.12.1/packaging/hudi-hadoop-mr-bundle/target
hudi-hive-sync-bundle-0.12.1.jar /export/software/hudi-0.12.1/packaging/hudi-hive-sync-bundle/target

● 修改hive-site.xml配置文件,添加参数

1
2
3
4
5
6
7
8
9
<property>
<name>hive.default.aux.jars.path</name>
<value>file:///export/server/hive/lib/hudi-hadoop-mr-bundle-0.12.1.jar,file:///export/server/hive/lib/hudi-hive-sync-bundle-0.12.1.jar</value>
</property>

<property>
<name>hive.aux.jars.path</name>
<value>file:///export/server/hive/lib/hudi-hadoop-mr-bundle-0.12.1.jar,file:///export/server/hive/lib/hudi-hive-sync-bundle-0.12.1.jar</value>
</property>

3 案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
CREATE TABLE if not exists hudi_student_info (
`id` varchar(20),
`name` varchar(20),
`birth` varchar(20),
`sex` varchar(10),
`update_time` TIMESTAMP(3),
PRIMARY KEY (`id`) NOT ENFORCED
) with(
'connector'='hudi',
'path'= 'hdfs://192.168.88.161:8020/hudi/student_info', -- 数据存储目录
'hoodie.datasource.write.recordkey.field'= 'id', -- 主键
'write.precombine.field'= 'update_time', -- 自动precombine的字段
'write.tasks'= '1',
'compaction.tasks'= '1',
'write.rate.limit'= '2000', -- 限速
'table.type'= 'MERGE_ON_READ', -- 默认COPY_ON_WRITE,可选MERGE_ON_READ
'compaction.async.enabled'= 'true', -- 是否开启异步压缩
'compaction.trigger.strategy'= 'num_commits', -- 按次数压缩
'compaction.delta_commits'= '1', -- 默认为5
'changelog.enabled'= 'true', -- 开启changelog变更
'read.tasks' = '1',
'read.streaming.enabled'= 'true', -- 开启流读
'read.start-commit'='earliest', -- 开始读取的位点
'read.streaming.check-interval'= '3', -- 检查间隔,默认60s
'hive_sync.enable'= 'true', -- 开启自动同步hive
'hive_sync.mode'= 'hms', -- 自动同步hive模式,默认jdbc模式
'hive_sync.metastore.uris'= 'thrift://192.168.88.161:9083', -- hive metastore地址
'hive_sync.table'= 'hive_student_info', -- hive 新建表名
'hive_sync.db'= 'flink_demo', -- hive 新建数据库名
'hive_sync.username'= '', -- HMS 用户名
'hive_sync.password'= '', -- HMS 密码
'hive_sync.support_timestamp'= 'true'-- 兼容hive timestamp类型
);

4 Hudi元数据字段含义

1
2
3
4
5
6
7
`_hoodie_commit_time`    string, 提交时间
`_hoodie_commit_seqno` string, 提交的序列号
`_hoodie_record_key` string, 数据的主键
`_hoodie_partition_path` string, 分区名(如果是分区表的时候)
`_hoodie_file_name` string, 文件名
`_hoodie_operation` string, 操作名
注意: _hoodie_record_key + _hoodie_partition_path 共同构成唯一键

5 ro表和rt表

  • rt表支持快照查询和增量查询,查询rt表将会查询表基本列数据和增量日志数据的合并视图,立马可以查询到修改后的数据。
  • ro表则只查询表中基本列数据并不会去查询增量日志里的数据。

select * from t1;