Flink SQL基本介绍

实操

FlinkSQL启动!!

1
2
3
4
# 先启动集群
/export/server/flink-1.15.2/bin/start-cluster.sh
# 再启动客户端
/export/server/flink-1.15.2/bin/sql-client.sh

设置输出模式

设置输出模式:我一般用的是tableau模式.

1
2
3
4
5
6
# 表格模式(table mode)在内存中实体化结果,并将结果用规则的分页表格可视化展示出来。执行如下命令启用:
SET sql-client.execution.result-mode=table;
# 变更日志模式(changelog mode)不会实体化和可视化结果,而是由插入(+)和撤销(-)组成的持续查询产生结果流:
SET sql-client.execution.result-mode=changelog;
# Tableau模式(tableau mode)更接近传统的数据库,会将执行的结果以制表的形式直接打在屏幕之上:
SET sql-client.execution.result-mode=tableau;

全心全意写SQL

执行 SQL 查询:

1
2
3
SELECT 'Hello World';

SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;

SQL 查询案例

  • 案例场景:计算每一种商品(sku_id 唯一标识)的售出个数、总销售额、平均销售额、最低价、最高价
  • 数据准备:数据源为商品的销售流水(sku_id:商品,price:销售价格),然后写入到 Kafka 的指定 topic(sku_id:商品,count_result:售出个数、sum_result:总销售额、avg_result:平均销售额、min_result:最低价、max_result:最高价)当中
  • SQL Client中演示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
//1.创建一个数据源(输入)表,这里的数据源是 flink 自带的一个随机 mock 数据的数据源。
CREATE TABLE source_table (
sku_id STRING,
price BIGINT
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.sku_id.length' = '1',
'fields.price.min' = '1',
'fields.price.max' = '1000000'
);

//2.创建一个数据汇(输出)表,输出到 kafka 中
CREATE TABLE sink_table (
sku_id STRING,
count_result BIGINT,
sum_result BIGINT,
avg_result DOUBLE,
min_result BIGINT,
max_result BIGINT,
PRIMARY KEY (`sku_id`) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'test',
'properties.bootstrap.servers' = 'node1.itcast.cn:9092',
'key.format' = 'json',
'value.format' = 'json'
);

CREATE TABLE sink_table (
sku_id STRING,
count_result BIGINT,
sum_result BIGINT,
avg_result DOUBLE,
min_result BIGINT,
max_result BIGINT,
PRIMARY KEY (`sku_id`) NOT ENFORCED
) WITH (
'connector' = 'print'
);

//3.执行一段 group by 的聚合 SQL 查询
insert into sink_table
select sku_id,
count(*) as count_result,
sum(price) as sum_result,
avg(price) as avg_result,
min(price) as min_result,
max(price) as max_result
from source_table
group by sku_id ;

理论(Theory)

标准SQL分类(回顾)

  • DML(Data Manipulation Language):数据操作语言,用来定义数据库中的记录
  • DCL(Data Control Language):数据控制语言,用来定义访问权限和安全级别
  • DQL(Data Query Language):数据查询语言,用来查询记录
  • DDL(Data Definition Language):数据定义语言,用来定义数据库中的对象
  • FlnkSQL比DataStreamAPI、DataSetAPI实现简单、方便
  • TableAPI和SQL是流批通用的,代码可以完全复用
  • TableAPI和SQL可以使用Calcite的SQL优化器,可以实现自动程序优化更容易写出执行效率高的应用

Flink1.9版本引入了阿里巴巴的Blink实现流批一体

SQL解析与优化引擎Apache Calcite

Apache Calcite是一款使用Java编程语言编写的开源动态数据管理框架,它具备很多常用的数据库管理需要的功能,比如:SQL解析、SQL校验、SQL查询优化、SQL生成以及数据连接查询,目前使用Calcite作为SQL解析与优化引擎的有Hive、Drill、Flink、Phoenix和Storm。

Calcite中提供了RBO(Rule-Based Optimization:基于规则)和CBO(Cost-Based Optimization:基于代价)两种优化器,在保证语义的基础上,生成执行成本最低的SQL逻辑树。

TableEnvironment API

TableEnvironment:Table API & SQL 的都集成在一个统一上下文(即 TableEnvironment)中,其地位等同于 DataStream API 中的 StreamExecutionEnvironment 的地位

1
2
3
4
TableEnvironment::executeSql:用于 SQL API 中,可以执行一段完整 DDL,DML SQL。举例,方法入参可以是 CREATE TABLE xxx,INSERT INTO xxx SELECT xxx FROM xxx。
TableEnvironment::from(xxx):用于 Table API 中,可以以强类型接口的方式运行。方法入参是一个表名称。
TableEnvironment::sqlQuery:用于 SQL API 中,可以执行一段查询 SQL,并把结果以 Table 的形式返回。举例,方法的入参是 SELECT xxx FROM xxx
Table::executeInsert:用于将 Table 的结果插入到结果表中。方法入参是写入的目标表。

TableEnvironment 的功能

  • Catalog 管理:Catalog 可以理解为 Flink 的 MetaStore,类似 Hive MetaStore 对在 Hive 中的地位,关于 Flink Catalog 的详细内容后续进行介绍
    表管理:在 Catalog 中注册表
  • SQL 查询:(这 TMD 还用说,最基本的功能啊),就像 DataStream 中提供了 addSource、map、flatmap 等接口
  • UDF 管理:注册用户定义(标量函数:一进一出、表函数:一进多出、聚合函数:多进一出)函数
  • UDF 扩展:加载可插拔 Module(Module 可以理解为 Flink 管理 UDF 的模块,是可插拔的,可以自定义 Module,去支持奇奇怪怪的 UDF 功能)
  • DataStream 和 Table(Table API & SQL 的查询结果)之间进行转换:1.13 版本的只有流任务支持,批任务不支持。1.14 支持流批

创建方式

方式一:通过 EnvironmentSettings 创建 TableEnvironment

1
2
3
4
5
6
7
// 1. 就是设置一些环境信息
final EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
// 2. 创建 TableEnvironment
final TableEnvironment tEnv = TableEnvironment.create(settings);
// 如果是 in_streaming_mode,则最终创建出来的 TableEnvironment 实例为 StreamTableEnvironmentImpl
// 如果是 in_batch_mode,则最终创建出来的 TableEnvironment 实例为 TableEnvironmentImpl
// 虽然两者都继承了 TableEnvironment 接口,但是 StreamTableEnvironmentImpl 支持的功能更多一些。可以直接去看看接口实验一下,这里就不进行详细介绍。

方式二:通过已有的 StreamExecutionEnvironment 创建 TableEnvironment

1
2
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

SQL中的表

外部表与视图

一个表的全名(标识)会由三个部分组成:Catalog 名称.数据库名称.表名称

如果 Catalog 名称或者数据库名称没有指明,就会使用当前默认值 default

  • 外部表 TABLE:描述的是外部数据,例如文件(HDFS)、消息队列(Kafka)等。依然拿离线 Hive SQL 举个例子,离线中一个表指的是 Hive 表,也就是所说的外部数据。
  • 视图 VIEW:从已经存在的表中创建,视图一般是一个 SQL 逻辑的查询结果。对比到离线的 Hive SQL 中,在离线的场景(Hive 表)中 VIEW 也都是从已有的表中去创建的。

注意:对于视图不会真的产生一个中间表供下游多个查询去引用,即多个查询不共享这个 Table 的结果,可以理解为是一种中间表的简化写法,不会先产出一个中间表结果,然后将这个结果在下游多个查询中复用,后续的多个查询会将这个 Table 的逻辑执行多次。类似于 with tmp as (DML) 的语法

临时表与永久表

  • 临时表:通常保存于内存中并且仅在创建它们的 Flink session(可以理解为一次 Flink 任务的运行)持续期间存在。这些表对于其它 session(即其他 Flink 任务或非此次运行的 Flink 任务)是不可见的。
  • 永久表:需要外部 Catalog(例如 Hive Metastore)来持久化表的元数据。一旦永久表被创建,它将对任何连接到这个 Catalog 的 Flink session 可见且持续存在,直至从 Catalog 中被明确删除。

如果临时表和永久表使用了相同的名称(Catalog名.数据库名.表名)。那么在这个 Flink session 中,你的任务访问到这个表时,访问到的永远是临时表(即相同名称的表,临时表会屏蔽永久表)。

Flink SQL数据类型

Java八大基本数据类型:byte→short→int→long float→double char boolean

String类型:引用类型,底层是字符数组char[]

原子数据类型

字符串类型
CHAR
CHAR(n)
定长字符串,就和 Java 中的 Char 一样,n 代表字符的定长
取值范围 [1-2,147,483,647]。如果不指定 n,则默认为 1。
VARCHAR
VARCHAR(n)
STRING
可变长字符串,就和 Java 中的 String 一样,n 代表字符的最大长度
取值范围 [1-2,147,483,647]。如果不指定 n,则默认为 1。STRING 等同于 VARCHAR(2147483647)。
二进制类型
BINARY
BINARY(n)
定长二进制字符串,n 代表定长
取值范围 [1, 2,147,483,647]。如果不指定 n,则默认为 1。
VARBINARY
VARBINARY(n)
BYTES
可变长二进制字符串,n 代表字符的最大长度
取值范围 [1, 2,147,483,647]。如果不指定 n,则默认为 1。BYTES 等同于 VARBINARY(2147483647)。
数值类型
DECIMAL
DECIMAL(p)
DECIMAL(p, s)
DEC、DEC(p)
DEC(p, s)
NUMERIC
NUMERIC(p)
NUMERIC(p, s)
固定长度和精度的数值类型,就和 Java 中的 BigDecimal 一样
p 代表数值位数(长度),取值范围 [1, 38]
s 代表小数点后的位数(精度),取值范围 [0, p]
如果不指定,p 默认为 10,s 默认为 0。
TINYINT -128 to 127 的 1 字节大小的有符号整数,就和 Java 中的 byte 一样。
SMALLINT -32,768 to 32,767 的 2 字节大小的有符号整数,就和 Java 中的 short 一样。
INT
INTEGER
-2,147,483,648 to 2,147,483,647 的 4 字节大小的有符号整数,就和 Java 中的 int 一样。
BIGINT -9,223,372,036,854,775,808 to 9,223,372,036,854,775,807 的 8 字节大小的有符号整数,就和 Java 中的 long 一样。
FLOAT 4 字节大小的单精度浮点数值,就和 Java 中的 float 一样。
DOUBLE
DOUBLE PRECISION
8 字节大小的双精度浮点数值,就和 Java 中的 double 一样。
特殊类型
NULL类型 NULL
Raw类型 RAW(‘class’, ‘snapshot’) 。只会在数据发生网络传输时进行序列化,反序列化操作,可以保留其原始数据。以 Java 举例,class 参数代表具体对应的 Java 类型,snapshot 代表类型在发生网络传输时的序列化器
布尔类型 BOOLEAN
时间类型
DATE 年-月-日组成的 不带时区含义 的日期类型,取值范围 [0000-01-01, 9999-12-31]
TIME
TIME(p)
小时:分钟:秒[.小数秒]组成的 不带时区含义 的的时间的数据类型,精度高达纳秒
取值范围 [00:00:00.000000000到23:59:59.9999999]。其中 p 代表小数秒的位数,取值范围 [0, 9],如果不指定 p,默认为 0。
TIMESTAMP
TIMESTAMP(p)
TIMESTAMP WITHOUT TIME ZONE
TIMESTAMP(p) WITHOUT TIME ZONE
年-月-日 小时:分钟:秒[.小数秒] 组成的不带时区含义的时间类型
取值范围 [0000-01-01 00:00:00.000000000, 9999-12-31 23:59:59.999999999]。其中 p 代表小数秒的位数,取值范围 [0, 9],如果不指定 p,默认为 6。
TIMESTAMP WITH TIME ZONE
TIMESTAMP(p) WITH TIME ZONE
年-月-日 小时:分钟:秒[.小数秒] 时区 组成的 带时区含义 的时间类型
取值范围 [0000-01-01 00:00:00.000000000 +14:59, 9999-12-31 23:59:59.999999999 -14:59]。其中 p 代表小数秒的位数,取值范围 [0, 9],如果不指定 p,默认为 6。
TIMESTAMP_LTZ
TIMESTAMP_LTZ(p)
年-月-日 小时:分钟:秒[.小数秒] 时区 组成的 带时区含义 的时间类型
取值范围 [0000-01-01 00:00:00.000000000 +14:59, 9999-12-31 23:59:59.999999999 -14:59]。其中 p 代表小数秒的位数,取值范围 [0, 9],如果不指定 p,默认为 6。
INTERVAL YEAR TO MONTH
INTERVAL DAY TO SECOND
interval 的涉及到的种类比较多。INTERVAL 主要是用于给 TIMESTAMP、TIMESTAMP_LTZ 添加偏移量的。举例,比如给 TIMESTAMP 加、减几天、几个月、几年。

TIMESTAMP_LTZTIMESTAMP WITH TIME ZONE 的区别在于:TIMESTAMP WITH TIME ZONE 的时区信息是携带在数据中的,举例:其输入数据应该是 2022-01-01 00:00:00.000000000 +08:00;TIMESTAMP_LTZ 的时区信息不是携带在数据中的,而是由 Flink SQL 任务的全局配置决定的,我们可以由 table.local-time-zone 参数来设置时区。

INTERVAL演示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
Flink SQL> CREATE TABLE sink_table2 (
result_interval_year TIMESTAMP(3),
result_interval_year_p TIMESTAMP(3),
result_interval_year_p_to_month TIMESTAMP(3),
result_interval_month TIMESTAMP(3),
result_interval_day TIMESTAMP(3),
result_interval_day_p1 TIMESTAMP(3),
result_interval_day_p1_to_hour TIMESTAMP(3),
result_interval_day_p1_to_minute TIMESTAMP(3),
result_interval_day_p1_to_second_p2 TIMESTAMP(3),
result_interval_hour TIMESTAMP(3),
result_interval_hour_to_minute TIMESTAMP(3),
result_interval_hour_to_second TIMESTAMP(3),
result_interval_minute TIMESTAMP(3),
result_interval_minute_to_second_p2 TIMESTAMP(3),
result_interval_second TIMESTAMP(3),
result_interval_second_p2 TIMESTAMP(3)
) WITH (
'connector' = 'print'
);

Flink SQL> INSERT INTO sink_table2
SELECT
-- Flink SQL 支持的所有 INTERVAL 子句如下,总体可以分为 `年-月`、`日-小时-秒` 两种

-- 1. 年-月。取值范围为 [-9999-11, +9999-11],其中 p 是指有效位数,取值范围 [1, 4],默认值为 2。比如如果值为 1000,但是 p = 2,则会直接报错。
-- INTERVAL YEAR
f1 + INTERVAL '10' YEAR as result_interval_year
-- INTERVAL YEAR(p)
, f1 + INTERVAL '100' YEAR(3) as result_interval_year_p
-- INTERVAL YEAR(p) TO MONTH
, f1 + INTERVAL '10-03' YEAR(3) TO MONTH as result_interval_year_p_to_month
-- INTERVAL MONTH
, f1 + INTERVAL '13' MONTH as result_interval_month
-- 2. 日-小时-秒。取值范围为 [-999999 23:59:59.999999999, +999999 23:59:59.999999999],其中 p1\p2 都是有效位数,p1 取值范围 [1, 6],默认值为 2;p2 取值范围 [0, 9],默认值为 6
-- INTERVAL DAY
, f1 + INTERVAL '10' DAY as result_interval_day
-- INTERVAL DAY(p1)
, f1 + INTERVAL '100' DAY(3) as result_interval_day_p1
-- INTERVAL DAY(p1) TO HOUR
, f1 + INTERVAL '10 03' DAY(3) TO HOUR as result_interval_day_p1_to_hour
-- INTERVAL DAY(p1) TO MINUTE
, f1 + INTERVAL '10 03:12' DAY(3) TO MINUTE as result_interval_day_p1_to_minute
-- INTERVAL DAY(p1) TO SECOND(p2)
, f1 + INTERVAL '10 00:00:00.004' DAY TO SECOND(3) as result_interval_day_p1_to_second_p2
-- INTERVAL HOUR
, f1 + INTERVAL '10' HOUR as result_interval_hour
-- INTERVAL HOUR TO MINUTE
, f1 + INTERVAL '10:03' HOUR TO MINUTE as result_interval_hour_to_minute
-- INTERVAL HOUR TO SECOND(p2)
, f1 + INTERVAL '00:00:00.004' HOUR TO SECOND(3) as result_interval_hour_to_second
-- INTERVAL MINUTE
, f1 + INTERVAL '10' MINUTE as result_interval_minute
-- INTERVAL MINUTE TO SECOND(p2)
, f1 + INTERVAL '05:05.006' MINUTE TO SECOND(3) as result_interval_minute_to_second_p2
-- INTERVAL SECOND
, f1 + INTERVAL '3' SECOND as result_interval_second
-- INTERVAL SECOND(p2)
, f1 + INTERVAL '300' SECOND(3) as result_interval_second_p2
FROM (SELECT TO_TIMESTAMP_LTZ(1640966476500, 3) as f1)

复合数据类型

数组类型
ARRAY
t ARRAY
数组最大长度为 2,147,483,647。t 代表数组内的数据类型。举例 ARRAY、ARRAY,其等同于 INT ARRAY、STRING ARRAY
Map类型
MAP<kt, vt> Map 类型就和 Java 中的 Map 类型一样,key 是没有重复的。举例 Map<STRING, INT>、Map<BIGINT, STRING>
集合类型
MULTISET
t MULTISET
就和 Java 中的 List 类型一样,允许重复的数据。举例 MULTISET,其等同于 INT MULTISET
对象类型
ROW<n0 t0, n1 t1, …>
ROW<n0 t0 ‘d0’, n1 t1 ‘d1’, …>
ROW(n0 t0, n1 t1, …)
ROW(n0 t0 ‘d0’, n1 t1 ‘d1’, …)
就和 Java 中的自定义对象一样。举例:ROW(myField INT, myOtherField BOOLEAN),其等同于 ROW<myField INT, myOtherField BOOLEAN>

案例演示:

  • 样例数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
{
"id":1238123899121,
"name":"itcast",
"date":"1990-10-14",
"obj":{
"time1":"12:12:43",
"str":"sfasfafs",
"lg":2324342345
},
"arr":[
{
"f1":"f1str11",
"f2":134
},
{
"f1":"f1str22",
"f2":555
}
],
"time":"12:12:43",
"timestamp":"1990-10-14 12:12:43",
"map":{
"flink":123
},
"mapinmap":{
"inner_map":{
"key":234
}
}
}
  • 开启netcat
1
{"id":1238123899121,"name":"itcast","date":"1990-10-14","obj":{"time1":"12:12:43","str":"sfasfafs","lg":2324342345},"arr":[{"f1":"f1str11","f2":134},{"f1":"f1str22","f2":555}],"time":"12:12:43","timestamp":"1990-10-14 12:12:43","map":{"flink":123},"mapinmap":{"inner_map":{"key":234}}}
  • 创建映射表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
CREATE TABLE json_source (
id BIGINT,
name STRING,
`date` DATE,
obj ROW<time1 TIME,str STRING,lg BIGINT>,
arr ARRAY<ROW<f1 STRING,f2 INT>>,
`time` TIME,
`timestamp` TIMESTAMP(3),
`map` MAP<STRING,BIGINT>,
mapinmap MAP<STRING,MAP<STRING,INT>>,
proctime as PROCTIME()
) WITH (
'connector' = 'socket',
'hostname' = 'node1',
'port' = '9999',
'format' = 'json'
);
  • 查询结果
1
select id, name,`date`,obj.str,arr[3].f1,`map`['flink'],mapinmap['inner_map']['key'] from json_source;

Flink SQL应用于流

流批处理的异同

输入表 处理逻辑 结果表
批处理 静态表:输入数据有限、是有界集合 批式计算:每次执行查询能够访问到完整的输入数据,然后计算,输出完整的结果数据 静态表:数据有限
流处理 动态表:输入数据无限,数据实时增加,并且源源不断 流式计算:执行时不能够访问到完整的输入数据,每次计算的结果都是一个中间结果 动态表:数据无限

要将 SQL 应用于流式任务的三个要解决的核心点:

1.SQL 输入表:分析如何将一个实时的,源源不断的输入流数据表示为 SQL 中的输入表。

2.SQL 处理计算:分析将 SQL 查询逻辑翻译成什么样的底层处理技术才能够实时的处理流式输入数据,然后产出流式输出数据。

3.SQL 输出表:分析如何将 SQL 查询输出的源源不断的流数据表示为一个 SQL 中的输出表。

两种技术方案:

1.动态表:源源不断的输入、输出流数据映射到 动态表

2.连续查询:实时处理输入数据,产出输出数据的实时处理技术

动态表

如果把有界数据集当作表,那么无界数据集(流)就是一个随着时间变化持续写入数据的表,Flink中使用动态表表示流,使用静态表表示传统的批处理中的数据集。

流是Flink DataStream中的概念,动态表是Flink SQL中的概念,两者都是无界数据集。动态表在Flink中抽象为Table API

连续查询

将SQL查询应用于动态表,会持续执行而不会终止,因为数据会持续的产生,所以连续查询不会给出一个最终结果,而是持续不断地更新结果,实际上给出的总是中间结果。

流上的SQL查询运算与批处理中的SQL查询运算在语义上完全相同,对于相同的数据集计算结果也是相同的。

执行过程

从概念上来说:

1.流转换为动态表

2.在动态表上执行连续查询,生成新的动态表

3.生成的动态表转换回流

从开发上来说:

1.将DataStream注册为Table

2.在Table上应用SQL查询语句,结果为一个新的Table

3.将Table转换为DataStream

第一步-流转换为表

第二步-更新和追加查询

1.向结果表中插入新记录、更新旧的记录

2.只会向结果表中插入新记录

同时包含插入(Insert)、更新(Update)的查询必须维护更多的State,消耗更多的CPU、内存资源。

流上使用SQL的限制:

1.需要维护的状态太大

2.计算更新的成本太高

第三步-表转换为流

动态表分为三种类型:

1.只有更新行为,表中的结果被持续更新

2.只有插入行为,没有UPDATA和DELETE行为的结果表

3.既有更新行为又有插入行为的结果表

不同的表类型会转换为不同的流对外输出。

Append流

只支持写入行为,输出的结果只有 INSERT 操作的数据。

Retract流

Retract 流包含两种类型的 message: add messages 和 retract messages 。

  • 将 INSERT 操作编码为 add message
  • 将 DELETE 操作编码为 retract message
  • 将 UPDATE 操作编码为更新先前行的 retract message 和更新(新)行的 add message,从而将动态表转换为 retract 流。

Retract 流写入到输出结果表的数据有 -,+ 两种,分别 - 代表撤回旧数据,+ 代表输出最新的数据。这两种数据最终都会写入到输出的数据引擎中。

如果下游还有任务去消费这条流的话,要注意需要正确处理 -,+ 两种数据,防止数据计算重复或者错误。

Upsert流

Upsert 流包含两种类型的 message: upsert messages 和 delete messages。转换为 upsert 流的动态表需要唯一键(唯一键可以由多个字段组合而成)。

  • 将 INSERT 和 UPDATE 操作编码为 upsert message
  • 将 DELETE 操作编码为 delete message

Upsert流写入到输出结果表的数据每次输出的结果都是当前根据唯一键的最新结果数据,不会有 Retract流中的 - 回撤数据。

如果下游还有一个任务去消费这条流的话,消费流的算子需要知道唯一键(即 user),以便正确地根据唯一键(user)去拿到每一个 user 当前最新的状态。其与 retract 流的主要区别在于 UPDATE 操作是用单个 message 编码的,因此效率更高。

Flink 中的四大基石

Flink之所以能这么流行,离不开它最重要的四个基石:Checkpoint、State、Time、Window。

三种时间属性

  • 事件时间:必须是由数据本身携带的数据,这个时间标志的是事件产生的时间。
  • 处理时间:指的是算子计算这个数据的时候产生的时间。
  • 到达时间:指的是数据从数据源进入到计算引擎的时间。

指定时间属性

  • CREATE TABLE DDL 创建表的时候指定
  • 可以在 DataStream 中指定,在后续的 DataStream 转为 Table 中使用

SQL时间案例

处理时间案例

处理时间语义下,使用当前机器的系统时间作为处理时间。它是时间的最简单概念。它既不需要提取时间戳,也不需要生成watermark

  • 语法:CREATE TABLE DDL 指定时间戳的方式
1
2
3
4
5
6
7
8
CREATE TABLE user_actions (
user_name STRING,
data STRING,
-- 使用下面这句来将 user_action_time 声明为处理时间
user_action_time AS PROCTIME()
) WITH (
...
);
  • 读取’order.csv’文件的数据,在原本的Schema上添加一个虚拟的时间戳列,时间戳列由PROCTIME()函数计算产生。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
-- 创建映射表
create table InputTable (
`userid` varchar,
`timestamp` bigint,
`money` double,
`category` varchar,
`pt` AS PROCTIME()
) with (
'connector' = 'filesystem',
'path' = 'file:///export/data/input/order.csv',
'format' = 'csv'
);
-- 描述表
desc InputTable ;

事件时间案例

Event Time时间语义使用一条数据实际发生的时间作为时间属性,在Table API & SQL中这个字段通常被称为rowtime。这种模式下多次重复计算时,计算结果是确定的。

Event Time时间语义可以保证流处理和批处理的统一。

Event Time时间语义下,我们需要设置每条数据发生时的时间戳,并提供一个Watermark

  • 语法:CREATE TABLE DDL 指定时间戳的方式
1
2
3
4
5
6
7
8
9
10
CREATE TABLE user_actions (
user_name STRING,
data STRING,
user_action_time TIMESTAMP(3),
-- 使用下面这句来将 user_action_time 声明为事件时间,并且声明 watermark 的生成规则,即 user_action_time 减 5 秒
-- 事件时间列的字段类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型
WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
...
);
  • 实际应用中时间戳一般都是秒或者是毫秒(BIGINT 类型)需要转换类型
1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE TABLE user_actions (
user_name STRING,
data STRING,
-- 1. 这个 ts 就是常见的毫秒级别时间戳
ts BIGINT,
-- 2. 将毫秒时间戳转换成 TIMESTAMP_LTZ 类型
time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
-- 3. 使用下面这句来将 user_action_time 声明为事件时间,并且声明 watermark 的生成规则,即 user_action_time 减 5 秒
-- 事件时间列的字段类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型
WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (
...
);
  • 读取order.csv’文件的数据,定义现有事件时间字段上的 watermark 生成表达式,该表达式将事件时间字段标记为事件时间属性。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
-- 创建映射表
-- 这种方式只是增加了一个 rt 的TIMESTAMP列
create table InputTable2 (
`userid` varchar,
`timestamp` bigint,
`money` double,
`category` varchar,
rt AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`))
) with (
'connector' = 'filesystem',
'path' = 'file:///export/data/input/order.csv',
'format' = 'csv'
);
-- 描述表
desc InputTable2;
-- 事件时间需要结合watermark(水位线)使用
create table InputTable3 (
`userid` varchar,
`timestamp` bigint,
`money` double,
`category` varchar,
rt AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
watermark for rt as rt - interval '1' second
) with (
'connector' = 'filesystem',
'path' = 'file:///export/data/input/order.csv',
'format' = 'csv'
);

Flink 中的窗口操作

窗口概述

在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。

Flink 认为 Batch 是 Streaming 的一个特例,所以 Flink 底层引擎是一个流式引擎,在上面实现了流处理和批处理。而窗口(window)就是从 Streaming 到 Batch 的一个桥梁。

  • 一个Window代表有限对象的集合。一个窗口有一个最大的时间戳,该时间戳意味着在其代表的某时间点——所有应该进入这个窗口的元素都已经到达
  • Window就是用来对一个无限的流设置一个有限的集合,在有界的数据集上进行操作的一种机制。
  • 在Table API & SQL中,主要有两种窗口:Group Windows 和 Over Windows。
    • Group Windows 根据时间或行计数间隔将组行聚合成有限的组,并对每个组计算一次聚合函数
    • Over Windows 窗口内聚合为每个输入行在其相邻行范围内计算一个聚合

Group Windows

滚动窗口(TumblingTimeWindow)

滚动窗口定义:滚动窗口将每个元素指定给指定窗口大小的窗口。滚动窗口具有固定大小,且不重叠。例如,指定一个大小为 5 分钟的滚动窗口。在这种情况下,Flink 将每隔 5 分钟开启一个新的窗口,其中每一条数都会划分到唯一一个 5 分钟的窗口中:

应用场景:常见的按照一分钟对数据进行聚合,计算一分钟内 PV,UV 数据。

img

基于DataStream编程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
DataStream<T> input = ...

// 基于Event Time的滚动窗口
input.keyBy(…)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<window function>(...)

// 基于Processing Time的滚动窗口
input.keyBy(…)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<window function>(...)

// 基于EventTime Time,在小时级滚动窗口上设置15分钟的Offset偏移
input.keyBy(…)
.window(TumblingEventTimeWindows.of(Time.hours(1), Time.minutes(15)))
.<window function>(...)

注意:时间窗口使用的是timeWindow()也可以使用window(),比如,input.keyBy(…).timeWindow(Time.seconds(1))。timeWindow()是一种简写,传入一个参数则滚动窗口,为窗口大小,若传入两个参数为滑动窗口,第一个参数为窗口大小,第二个参数为滑动时间。该方法在新版本中已过期。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package io.github.windows;

import lombok.SneakyThrows;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class TumblingWindowExercise {
public static void main(String[] args) throws Exception {
// 1, 创建DataStream运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度为1, 因为时间窗口是默认是CPU的核数
env.setParallelism(1);

// 2, 创建Source算子获取数据
DataStreamSource<String> streamSource = env.socketTextStream("node1", 9999);

// 3, 对数据进行处理
SingleOutputStreamOperator<Tuple3<String, Integer, Long>> tuple3DS = streamSource.flatMap((FlatMapFunction<String, Tuple3<String, Integer, Long>>) (value, out) ->
out.collect(Tuple3.of(
value.split(",")[0],
Integer.valueOf(value.split(",")[1]),
Long.valueOf(value.split(",")[2]))))
.returns(Types.TUPLE(Types.STRING, Types.INT, Types.LONG));

// 提取时间戳设置事件时间
SingleOutputStreamOperator<Tuple3<String, Integer, Long>> watermarks = tuple3DS.assignTimestampsAndWatermarks(
WatermarkStrategy.<Tuple3<String, Integer, Long>>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, Long>>() {
@SneakyThrows
@Override
public long extractTimestamp(Tuple3<String, Integer, Long> element, long recordTimestamp) {
// 时间时间指定时的单位为毫秒
return element.f2 * 1000;
}
}));

// 4, 进行窗口计算
// 基于处理时间的分组窗口(是根据系统时间的推迟触发的计算, 如果没有keyby则会忽略key直接进行字段的计算sum)
// 有key的话, 则会一个key一个窗口, 到达指定时间就会触发计算
tuple3DS.keyBy(t -> t.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.sum(1)
.print();

// 基于处理时间的全局窗口
// tuple3DS
// .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(3L)))
// .sum(1)
// .print();

// 基于事件时间的分组窗口(必须要设置水印策略) TumblingEventTimeWindows 换成TumblingProcessingTimeWindows 其他一样
watermarks.keyBy(t -> t.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum(1)
.print();
// 5, 提交任务并执行
env.execute();
}
}

基于SQL编程

TUMBLE函数基于时间属性字段将每个元素分配到一个指定大小的窗口中。
在流模式下,时间属性字段必须是事件或处理时间属性。
在批处理模式下,窗口表函数的时间属性字段必须是TIMESTAMP或TIMESTAMP _LTZ类型的属性。TUMBLE的返回值是一个新的关系,它包括原始关系的所有列,以及另外3列“window_start”“window_end”“window_time”,以指示指定的窗口。原始时间属性“timecol”将是窗口TVF之后的常规时间戳列。

TUMBLE函数接受三个必需参数,一个可选参数:
TUMBLE(TABLE data, DESCRIPTOR(timecol), size [, offset ])

  • data: 是一个表参数,可以是与时间属性列的任何关系。
  • timecol: 是一个列描述符,指示数据的哪些时间属性列应映射到翻转窗口。
  • size: 是指定滚动窗口宽度的持续时间。
  • offset: 是一个可选参数,用于指定窗口起始位置的偏移量。

两种 Flink SQL 实现方式:

  • Group Window Aggregation(1.14之前只有此类方案,此方案在 1.14及之后版本已经标记为废弃,不推荐使用)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
-- 数据源表
CREATE TABLE source_table (
user_id STRING,
price BIGINT,
`timestamp` bigint,
row_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
watermark for row_time as row_time - interval '0' second
) WITH (
'connector' = 'socket',
'hostname' = 'node1',
'port' = '9999',
'format' = 'csv'
);

-- 数据处理逻辑
select
user_id,
sum(price) as sum_price,
UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '5' second) AS STRING)) * 1000 as window_start
from source_table
group by
user_id,
tumble(row_time, interval '5' second);

可以看到 Group Window Aggregation 滚动窗口的 SQL 语法就是把 tumble window 的声明写在了 group by 子句中,即 tumble(row_time, interval ‘1’ minute)

第一个参数为事件时间的时间戳

第二个参数为滚动窗口大小

  • Windowing TVF(1.14 只支持 Streaming 任务,1.15版本开始支持 Batch\Streaming 任务)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
SELECT 
user_id,
UNIX_TIMESTAMP(CAST(window_start AS STRING)) * 1000 as window_start,
UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 as window_end,
window_time,
sum(price) as sum_price
FROM TABLE(TUMBLE(
TABLE source_table
, DESCRIPTOR(row_time)
, INTERVAL '5' SECOND))
GROUP BY window_start,
window_end,
window_time,
user_id;

可以看到 Windowing TVF 滚动窗口的写法就是把 tumble window 的声明写在了数据源的 Table 子句中,即 TABLE(TUMBLE(TABLE source_table, DESCRIPTOR(row_time), INTERVAL ‘60’ SECOND)),包含三部分参数。

第一个参数 TABLE source_table 声明数据源表;

第二个参数 DESCRIPTOR(row_time) 声明数据源的时间戳;

第三个参数 INTERVAL ‘60’ SECOND 声明滚动窗口大小为 1 min。

滑动窗口(SlidingTimeWindow)

滑动窗口定义:滑动窗口也是将元素指定给固定长度的窗口。与滚动窗口功能一样,也有窗口大小的概念。不一样的地方在于,滑动窗口有另一个参数控制窗口计算的频率(滑动窗口滑动的步长)。因此,如果滑动的步长小于窗口大小,则滑动窗口之间每个窗口是可以重叠。在这种情况下,一条数据就会分配到多个窗口当中。举例,有 10 分钟大小的窗口,滑动步长为 5 分钟。这样,每 5 分钟会划分一次窗口,这个窗口包含的数据是过去 10 分钟内的数据,如下图所示:

在流模式下,时间属性字段必须是事件或处理时间属性。

应用场景:比如计算同时在线的数据,要求结果的输出频率是 1 分钟一次,每次计算的数据是过去 5 分钟的数据(有的场景下用户可能在线,但是可能会 2 分钟不活跃,但是这也要算在同时在线数据中,所以取最近 5 分钟的数据就能计算进去了)

image-20221224182559887

基于DataStream编程

我们使用Time类中的时间单位来定义Slide和Size,也可以设置offset。同样,timeWindow是一种缩写,根据执行环境中设置的时间语义来选择相应的方法初始化窗口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
val input: DataStream[T] = ...

// sliding event-time windows
input.keyBy(...)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<window function>(...)

// sliding processing-time windows
input.keyBy(<...>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<window function>(...)

// sliding processing-time windows offset by -8 hours
input.keyBy(<...>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<window function>(...)

Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package io.github.windows;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.util.Collector;
import org.apache.flink.api.java.tuple.Tuple3;

public class SlidingTimeWindowDemo {
public static void main(String[] args) throws Exception {
// 1, 构建流处理运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);

// 2, 构建Socket Source
DataStreamSource<String> streamSource = env.socketTextStream("node1", 9999);

// 3, 对数据进行处理并提取时间戳
SingleOutputStreamOperator<Tuple3<String, Integer, Long>> tuple3DS = streamSource.flatMap(new FlatMapFunction<String, Tuple3<String, Integer, Long>>() {
/*
out.collect(new Tuple3(value.split(",")[0],value.split(",")[0],value.split(",")[0]))
可以换成String[] strings = value.split(",");
Tuple3<String,Integer, Long> tuple3 = new Tuple3<泛型就不写嘞>(strings[0], strings[1], strings[2]);
out.collect(tuple3);
*/
@Override
public void flatMap(String value, Collector<Tuple3<String, Integer, Long>> out) throws Exception {

out.collect(new Tuple3<>(
value.split(",")[0],
Integer.parseInt(value.split(",")[1]),
Long.parseLong(value.split(",")[2])
));
} // 提取字段的时间戳 因为forMonotonousTimestamps是泛型方法, 需要
});

// 4, 提取 输入数据的时间戳 作为事件时间,
// 最终通过tuple3.assignTimeStampAndWatermarks(WaterStrategy.forMonotonousTimeStamps().withTimeStampsAssigner)返回一个水印
SingleOutputStreamOperator<Tuple3<String, Integer, Long>> watermarks = tuple3DS.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, Long>>forMonotonousTimestamps()
.withTimestampAssigner((Tuple3<String, Integer, Long> element, long recordTimestamp) -> {
return element.f2 * 1000; // 如果导入Scala的包, 很有可能会没有f0这个函数
}
));

// 5, 进行窗口计算 (如果代码报红, 先去顶部看看哪个包是灰的, 删除, 然后回来就看到提示导包了, 重新导入就行)
watermarks
.keyBy(t -> t.f0)
// 每隔5秒计算当前10s内的数据
// 如果每隔10秒计算当前5s内的数据, 则数据会漏掉
.window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5)))
.sum(1)
.print();

// 6, 提交计算任务
env.execute();

}
}

基于SQL编程

在批处理模式下,窗口表函数的时间属性字段必须是TIMESTAMPTIMESTAMP _LTZ类型的属性。HOP的返回值是一个新的关系,它包括原始关系的所有列,以及另外3列“window_start”“window_end”“window_time”,以指示指定的窗口。原始时间属性“timecol”将是窗口TVF之后的常规时间戳列。
HOP接受四个必需参数,一个可选参数:
HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])

  • data: 是一个表参数,可以是与时间属性列的任何关系
  • timecol:是一个列描述符,指示数据的哪些时间属性列应映射到跳跃窗口。
  • slide: 是一个持续时间,指定顺序跳跃窗口开始之间的持续
  • size: 是指定跳跃窗口宽度的持续时间。
  • offset: 是一个可选参数,用于指定窗口起始位置的偏移量。

两种Flink SQL实现方式:

  • Group Window Aggregation 方案(支持 Batch\Streaming 任务):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
-- 数据源表
Flink SQL> CREATE TABLE source_table (
user_id STRING,
price BIGINT,
`timestamp` bigint,
row_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
watermark for row_time as row_time - interval '0' second
) WITH (
'connector' = 'socket',
'hostname' = 'node1',
'port' = '9999',
'format' = 'csv'
);

-- 数据处理逻辑
Flink SQL> SELECT user_id,
UNIX_TIMESTAMP(CAST(hop_start(row_time, interval '5' SECOND, interval '10' SECOND) AS STRING)) * 1000 as window_start,
sum(price) as sum_price
FROM source_table
GROUP BY user_id
, hop(row_time, interval '5' SECOND, interval '10' SECOND);

可以看到 Group Window Aggregation 滚动窗口的写法就是把 hop window 的声明写在了 group by 子句中,即 hop(row_time, interval ‘1’ minute, interval ‘5’ minute)。其中:

第一个参数为事件时间的时间戳;

第二个参数为滑动窗口的滑动步长;

第三个参数为滑动窗口大小。

  • Windowing TVF 方案(1.14只支持 Streaming 任务,1.15版本开始支持 Batch\Streaming 任务):
1
2
3
4
5
6
7
8
9
10
11
12
13
-- 数据处理逻辑
Flink SQL> SELECT
user_id,
UNIX_TIMESTAMP(CAST(window_start AS STRING)) * 1000 as window_start,
UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 as window_end,
sum(price) as sum_price
FROM TABLE(HOP(
TABLE source_table
, DESCRIPTOR(row_time)
, interval '5' SECOND, interval '10' SECOND))
GROUP BY window_start,
window_end,
user_id;

image-20230909150424047

在该模式下,窗口大小必须是滑动距离的整数倍

可以看到 Windowing TVF 滚动窗口的写法就是把 hop window 的声明写在了数据源的 Table 子句中,即 TABLE(HOP(TABLE source_table, DESCRIPTOR(row_time), INTERVAL ‘1’ MINUTES, INTERVAL ‘5’ MINUTES)),包含四部分参数:

第一个参数 TABLE source_table 声明数据源表;

第二个参数 DESCRIPTOR(row_time) 声明数据源的时间戳;

第三个参数 INTERVAL ‘1’ MINUTES 声明滚动窗口滑动步长大小为 1 min。

第四个参数 INTERVAL ‘5’ MINUTES 声明滚动窗口大小为 5 min。

会话窗口

Session 窗口定义:Session 时间窗口和滚动、滑动窗口不一样,其没有固定的持续时间,如果在定义的间隔期(Session Gap)内没有新的数据出现,则 Session 就会窗口关闭。

image-20221224184500079

基于DataStream编程

Session window的窗口大小,则是由数据本身决定

1
2
3
4
5
DataStream input =
DataStream result = input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.seconds(<seconds>))
.apply(<window function>) // or reduce() or fold()

Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package io.github.windows;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.common.typeinfo.Types;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;


public class TimeSessionWindowDemo {
public static void main(String[] args) throws Exception {
// 1, 构建流处理运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 如果不设置并行度,默认为CPU的核数8,不设置并行度会有8个并行计算
env.setParallelism(1);

// 2, 从网络端口中获取数据
DataStreamSource<String> streamSource = env.socketTextStream("node1", 9999);

// 3, 对数据进行处理并提取时间戳(两步合为一步)
SingleOutputStreamOperator<Tuple3<String, Integer, Long>> watermarks = streamSource.flatMap((FlatMapFunction<String, Tuple3<String, Integer, Long>>) (value, out) -> out.collect(new Tuple3<>(
value.split(",")[0],
Integer.parseInt(value.split(",")[1]),
Long.parseLong(value.split(",")[2]))))
.returns(Types.TUPLE(Types.STRING, Types.INT, Types.LONG))
.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, Long>>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, Long>>() {
@Override
public long extractTimestamp(Tuple3 element, long recordTimestamp) {
return (long) element.f2 * 1000;
}
}));

// 4, 进行窗口计算
watermarks.keyBy(t -> t.f0)
.window(EventTimeSessionWindows.withGap(Time.seconds(3)))
.sum(1)
.print();
env.execute();
}
}

基于SQL编程

session(row_time, interval '5' minute)

其中:

第一个参数为事件时间的时间戳;

第二个参数为 Session gap 间隔。

使用标识函数选出窗口的起始时间或者结束时间,窗口的时间属性用于下级Window的聚合。


Flink SQL 不支持 Session 窗口的 Window TVF:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
-- 数据源表,用户购买行为记录表
Flink SQL> CREATE TABLE source_table (
user_id STRING,
price BIGINT,
`timestamp` bigint,
row_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
watermark for row_time as row_time - interval '0' second
) WITH (
'connector' = 'socket',
'hostname' = 'node1',
'port' = '9999',
'format' = 'csv'
)

-- 数据处理逻辑
Flink SQL> SELECT
user_id,
UNIX_TIMESTAMP(CAST(session_start(row_time, interval '5' SECOND) AS STRING)) * 1000 as window_start,
sum(price) as sum_price
FROM source_table
GROUP BY user_id
, session(row_time, interval '5' SECOND)

其中:

第一个参数为事件时间的时间戳;

第二个参数为 Session gap 间隔。

SQL 任务是在整个 Session 窗口结束之后才会把数据输出。Session 窗口即支持 处理时间 也支持 事件时间。但是处理时间只支持在 Streaming 任务中运行,Batch 任务不支持。

Session gap 间隔是5s,实际上是不包含5s,大于5s才会触发计算

渐进窗口

渐进式窗口定义:渐进式窗口在其实就是 固定窗口间隔内提前触发的的滚动窗口,其实就是 Tumble Window + early-fire 的一个事件时间的版本。

image-20221224190117981

在流模式下,时间属性字段必须是事件或处理时间属性。
在批处理模式下,窗口表函数的时间属性字段必须是TIMESTAMPTIMESTAMP _LTZ类型的属性。CUMULATE的返回值是一个新的关系,它包括原始关系的所有列,以及另外3列“window_start”“window_end”“window_time”,以指示指定的窗口。原始时间属性“timecol”将是窗口TVF之后的常规时间戳列。
CUMULATE接受四个必需参数,一个可选参数:
CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)

  • data: 是一个表参数,可以是与时间属性列的任何关系
  • timecol:是一个列描述符,指示数据的哪些时间属性列应映射到累积窗口。
  • step: 是指定连续累积窗口结束之间增加的窗口大小的持续时间
  • size: 是指定累积窗口的最大宽度的持续时间。size必须是 的整数倍step。
  • offset: 是一个可选参数,用于指定窗口起始位置的偏移量。

渐进式窗口目前只有 Windowing TVF 方案(1.14 只支持 Streaming 任务,1.15版本开始支持 Batch\Streaming 任务):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
-- 数据源表
Flink SQL> CREATE TABLE source_table (
-- 用户 id
user_id BIGINT,
-- 用户
money BIGINT,
-- 事件时间戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 设置
WATERMARK FOR row_time AS row_time - INTERVAL '0' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.money.min' = '1',
'fields.money.max' = '100000'
);

-- 数据汇表
Flink SQL> CREATE TABLE sink_table (
window_end bigint,
window_start bigint,
sum_money BIGINT,
count_distinct_id bigint
) WITH (
'connector' = 'print'
);

-- 数据处理逻辑
Flink SQL> insert into sink_table
SELECT
UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 as window_end,
window_start,
sum(money) as sum_money,
count(distinct user_id) as count_distinct_id
FROM TABLE(CUMULATE(
TABLE source_table
, DESCRIPTOR(row_time)
, INTERVAL '60' SECOND
, INTERVAL '1' DAY))
GROUP BY
window_start,
window_end

其中包含四部分参数:

第一个参数 TABLE source_table 声明数据源表;

第二个参数 DESCRIPTOR(row_time) 声明数据源的时间戳;

第三个参数 INTERVAL ‘60’ SECOND 声明渐进式窗口触发的渐进步长为 1 min。

第四个参数 INTERVAL ‘1’ DAY 声明整个渐进式窗口的大小为 1 天,到了第二天新开一个窗口重新累计。

image-20230909160216014

计数窗口

计数窗口支持滚动计数、滑动计数、全局计数窗口仅DataStreamAPI支持

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package com.itheima.flink.windows;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

public class CountWindow {

public static void main(String[] args) throws Exception {
// 1、构建流处理运行环境
Configuration conf = new Configuration();
conf.setInteger("rest.port", 8081);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
env.setParallelism(1);

// 2、从网络端口获取数据
DataStreamSource<String> streamSource = env.socketTextStream("node1", 9999);

// 3、对数据进行处理
SingleOutputStreamOperator<Tuple3<String, Integer, Long>> tuple3 = streamSource.flatMap(new FlatMapFunction<String, Tuple3<String, Integer, Long>>() {
@Override
public void flatMap(String value, Collector<Tuple3<String, Integer, Long>> out) throws Exception {
String[] strings = value.split(",");
Tuple3<String, Integer, Long> tuple3 = new Tuple3<>(strings[0], Integer.valueOf(strings[1]), Long.parseLong(strings[2]));
out.collect(tuple3);
}
});

// 4、进行窗口计算

// 滚动计数窗口
tuple3.keyBy(t -> t.f0)
// 每10条数据进行一次窗口计算
.countWindow(10)
.sum(1)
.print("滚动计数窗口");
// 滑动计数窗口
tuple3.keyBy(t -> t.f0)
// 每来5条数据,把当前10条数据作为一个窗口进行计算
.countWindow(10, 5)
.sum(1)
.print("滑动计数窗口");

// 全局计数窗口
tuple3.keyBy(t -> t.f0)
// 使用全局窗口,必须自行定义触发器才能实现窗口计算
.window(GlobalWindows.create())
.trigger(new Trigger<Tuple3<String, Integer, Long>, GlobalWindow>() {
@Override
public TriggerResult onElement(Tuple3<String, Integer, Long> element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
return null;
}

@Override
public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
return null;
}

@Override
public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
return null;
}

@Override
public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {

}
})
.sum(1)
.print();


env.execute();
}

}

增量窗口函数

为了提高实时性,会将到达窗口数据提前进行计算,将计算结果保存至状态中,当窗口达到结束条件时,直接将结果从状态中取出输出。

  • 归约函数:ReduceFunction
  • 聚合函数:AggregateFunction
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package com.itheima.flink.windows;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class ReduceWindowFunc {

public static void main(String[] args) throws Exception {
// 1、构建流处理运行环境
Configuration conf = new Configuration();
conf.setInteger("rest.port", 8081);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
env.setParallelism(1);

// 2、从网络端口获取数据
DataStreamSource<String> streamSource = env.socketTextStream("node1", 9999);

// 3、对数据进行处理
SingleOutputStreamOperator<Tuple3<String, Integer, Long>> tuple3 = streamSource.flatMap(new FlatMapFunction<String, Tuple3<String, Integer, Long>>() {
@Override
public void flatMap(String value, Collector<Tuple3<String, Integer, Long>> out) throws Exception {
String[] strings = value.split(",");
Tuple3<String, Integer, Long> tuple3 = new Tuple3<>(strings[0], Integer.valueOf(strings[1]), Long.parseLong(strings[2]));
out.collect(tuple3);
}
});

// 4、进行窗口计算
tuple3.keyBy(t -> t.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.reduce(new ReduceFunction<Tuple3<String, Integer, Long>>() {
/**
* @param value1 The first value to combine.
* @param value2 The second value to combine.
*/
@Override
public Tuple3<String, Integer, Long> reduce(Tuple3<String, Integer, Long> value1, Tuple3<String, Integer, Long> value2) throws Exception {
return Tuple3.of(value1.f0, value1.f1 - value2.f1, value2.f2);
}
}).print();

env.execute();
}

}

全量窗口函数

全量窗口函数会先将窗口中的数据缓存起来,等到窗口结束要输出结果时再取出全部进行计算。(不常用)

  • WindowFunction-使用apply算子实现
  • ProcessWindowFunction-使用process算子实现(推荐)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package com.itheima.flink.windows;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.table.planner.expressions.In;
import org.apache.flink.util.Collector;

public class FullWindowFunc {

public static void main(String[] args) throws Exception {
// 1、构建流处理运行环境
Configuration conf = new Configuration();
conf.setInteger("rest.port", 8081);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
env.setParallelism(1);

// 2、从网络端口获取数据
DataStreamSource<String> streamSource = env.socketTextStream("node1", 9999);

// 3、对数据进行处理
SingleOutputStreamOperator<Tuple3<String, Integer, Long>> tuple3 = streamSource.flatMap(new FlatMapFunction<String, Tuple3<String, Integer, Long>>() {
@Override
public void flatMap(String value, Collector<Tuple3<String, Integer, Long>> out) throws Exception {
String[] strings = value.split(",");
Tuple3<String, Integer, Long> tuple3 = new Tuple3<>(strings[0], Integer.valueOf(strings[1]), Long.parseLong(strings[2]));
out.collect(tuple3);
}
});

// 4、进行窗口计算
tuple3.keyBy(t -> t.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.apply(new WindowFunction<Tuple3<String, Integer, Long>, Tuple3<String, Integer, Long>, String, TimeWindow>() {
/**
* @param key 当前被计算窗口的key
* @param window 当前的窗口.
* @param input 当前窗口中的数据
* @param out 计算结果的收集器
* @throws Exception
*/
@Override
public void apply(String key, TimeWindow window, Iterable<Tuple3<String, Integer, Long>> input, Collector<Tuple3<String, Integer, Long>> out) throws Exception {
System.out.println("当前被计算窗口的key: " + key);
System.out.println("当前的窗口: " + window.toString());
// input.forEach(System.out::println);
input.forEach(out::collect);
}
}).print();

tuple3.keyBy(t -> t.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.process(new ProcessWindowFunction<Tuple3<String, Integer, Long>, Tuple3<String, Integer, Long>, String, TimeWindow>() {
/**
* @param key The key for which this window is evaluated.
* @param context 当前计算窗口的上下文环境
* @param elements The elements in the window being evaluated.
* @param out A collector for emitting elements.
*/
@Override
public void process(String key, ProcessWindowFunction<Tuple3<String, Integer, Long>, Tuple3<String, Integer, Long>, String, TimeWindow>.Context context, Iterable<Tuple3<String, Integer, Long>> elements, Collector<Tuple3<String, Integer, Long>> out) throws Exception {
System.out.println("当前被计算窗口的key: " + key);
System.out.println("当前窗口处理时间: " + context.currentProcessingTime());
elements.forEach(out::collect);
}
}).print();

env.execute();
}

}

Over Windows

Over 聚合定义(支持 Batch\Streaming):可以理解为是一种特殊的滑动窗口聚合函数。

  • 窗口聚合:不在 group by 中的字段,不能直接在 select 中拿到
  • Over 聚合:能够保留原始字段

Over 聚合的语法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
SELECT
agg_func(agg_col) OVER (
[PARTITION BY col1[, col2, ...]]
ORDER BY time_col
range_definition),
...
FROM ...
-- 示例
SELECT order_id, order_time, amount,
SUM(amount) OVER (
PARTITION BY product
ORDER BY order_time
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
) AS one_hour_prod_amount_sum
FROM Orders
  • ORDER BY:必须是时间戳列(事件时间、处理时间)
  • PARTITION BY:标识了聚合窗口的聚合粒度,如上述案例是按照 product 进行聚合
  • range_definition:这个标识聚合窗口的聚合数据范围,在 Flink 中有两种指定数据范围的方式。第一种为 按照行数聚合,第二种为 按照时间区间聚合。