As is well known, a Flink program consists of four parts: the execution environment + Source + Transformation + Sink.

Let's go through them one by one.

Execution Environment

Batch Processing

  • Obtain the batch execution environment (for testing/production)

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

Stream Processing

  • Obtain the streaming execution environment (for testing/production)

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Unified Stream and Batch

  • Obtain a unified stream/batch execution environment (for testing/production)

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Switch to batch execution mode
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);

Local Environment

  • Create a local execution environment (for testing)

    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

Local Environment with Web UI

  • Create a local execution environment with the web UI (for testing)

    Configuration conf = new Configuration();
    conf.setInteger("rest.port",8081);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

Execution Mode

Starting with Flink 1.12.0, the stream and batch APIs were unified. The DataStream API gained an important feature: it supports different execution modes, so with a simple setting the same Flink program can switch between stream processing and batch processing. As a result, the DataSet API is no longer really needed.
The execution modes are:

  • STREAMING mode.
    STREAMING is the classic DataStream API mode, generally used for unbounded streams that require continuous, real-time processing. It is the default mode.
  • BATCH mode.
    BATCH is an execution mode dedicated to batch processing; in this mode Flink processes the job in a way similar to the MapReduce framework. It is the more convenient choice for bounded data that does not require continuous computation.
  • AUTOMATIC mode.
    In AUTOMATIC mode, the program chooses the execution mode automatically based on whether the input sources are bounded.

How to Configure the Execution Mode

(using BATCH as the example; the default is STREAMING):

# Configure via the command line (pass the execution.runtime-mode parameter when submitting the job)
bin/flink run -Dexecution.runtime-mode=BATCH ...
// Configure in code (call setRuntimeMode)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

Recommendation: configure the mode on the command line rather than in code. As with parallelism, passing the parameter at submission time is more flexible: once the application is written, the same program can be used for batch processing as well as stream processing, whereas hard-coding the mode in the application is less reusable and is generally discouraged.

Source (Input Operators)

Summary of Source Operators

A single-parallelism source operator throws an exception if you explicitly set a parallelism greater than 1.

  • env.fromElements() also supports composite types such as Tuples and custom objects
    • DataStreamSource<String> words = env.fromElements("flink", "hadoop", "flink");
  • env.fromCollection() supports the concrete Collection types such as List, Set and Queue
    • A non-parallel source (a single-parallelism source operator): pass a Collection to the method and it returns a DataStreamSource.
      • List<String> dataList = Arrays.asList("a", "b", "a", "c");
  • env.fromParallelCollection() returns a source operator with multiple parallelism
DataStreamSource<LongValue> parallelCollection = env.fromParallelCollection(new LongValueSequenceIterator(1, 100), TypeInformation.of(LongValue.class)).setParallelism(2);
  • env.generateSequence() creates a DataStream based on a sequence

    • A parallel source (the parallelism can also be set by calling setParallelism afterwards) that generates a stream of numbers between the given start and end values;
      • DataStreamSource<Long> sequence = env.generateSequence(1, 100);
  • env.fromSequence() creates a DataStream from a start and an end value

    • final DataStreamSource<Long> sequence2 = env.fromSequence(1L, 5L);

Example

// env.fromElements() also supports composite types such as Tuples and custom objects
DataStreamSource<String> words = env.fromElements("flink", "hadoop", "flink");

// env.fromCollection() supports concrete Collection types such as List, Set and Queue
// A non-parallel source: pass a Collection to the method and it returns a DataStreamSource.
List<String> dataList = Arrays.asList("a", "b", "a", "c");
// The source operator returned by fromCollection has a parallelism of 1
DataStreamSource<String> fromCollection = env.fromCollection(dataList)/*.setParallelism(5)*/; // a single-parallelism operator throws if you explicitly set parallelism > 1

// The source operator returned by fromParallelCollection supports multiple parallelism
DataStreamSource<LongValue> parallelCollection = env.fromParallelCollection(new LongValueSequenceIterator(1, 100), TypeInformation.of(LongValue.class)).setParallelism(2);

// env.generateSequence() creates a DataStream based on a sequence
// A parallel source (parallelism can also be set via setParallelism) that generates numbers between the given start and end values;
DataStreamSource<Long> sequence = env.generateSequence(1, 100);
sequence.map(x -> x - 1)/*.print()*/;

// env.fromSequence() creates a DataStream from a start and an end value
final DataStreamSource<Long> sequence2 = env.fromSequence(1L, 5L);
sequence2.map(x -> x - 1)/*.print()*/;

Collection-based Source

package io.github.sourceOperator;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;
import java.util.List;

public class CollectionSource {
    public static void main(String[] args) throws Exception {
        // 1. Create a local environment with the web UI
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        // 2. Build a source operator that reads from a collection
        List<String> list = Arrays.asList("a", "b", "c", "d");
        // Create a single-parallelism collection source
        DataStreamSource<String> listDS = env.fromCollection(list);

        // 3. Process the data - convert every word to upper case
        listDS.map((MapFunction<String, String>) value -> value.toUpperCase()).print();

        // 4. Submit and execute
        env.execute();
    }
}

Socket-based Source

A non-parallel source. The DataStream API supports reading data from a socket; you only need to specify the host and the port to read from. A short sketch follows this list. The socket source functions are:

  • socketTextStream(hostName, port)
  • socketTextStream(hostName, port, delimiter): a delimiter can be specified; the default line delimiter is "\n"
  • socketTextStream(hostName, port, delimiter, maxRetry): the maximum number of reconnect attempts can also be specified; the default is 0.
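
A minimal usage sketch (the host name and port are placeholders; for local testing a socket can be opened with `nc -lk 9999`):

```java
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SocketSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read text lines from a socket; "localhost" and 9999 are placeholder values
        DataStreamSource<String> socketDS = env.socketTextStream("localhost", 9999);
        socketDS.print();
        env.execute();
    }
}
```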

File-based Source

A file-based source essentially reads data with the specified FileInputFormat component; formats such as TextInputFormat, CsvInputFormat and BinaryInputFormat can be used. Under the hood they all rely on ContinuousFileMonitoringFunction, which extends RichSourceFunction; these are non-parallel sources. A short sketch follows this list.

  • readFile(FileInputFormat inputFormat, String filePath) lets you specify the FileInputFormat used to read the file. The FileProcessingMode parameter can be:
    • PROCESS_ONCE: read the data in the file only once; the program exits when reading completes
    • PROCESS_CONTINUOUSLY: keep monitoring the specified file; when its content changes, both the old and the new content are read again, which leads to duplicate reads.
  • readTextFile(String filePath) reads data from the given directory or file, using TextInputFormat by default. An overloaded readTextFile(String filePath, String charsetName) accepts the character set to use (UTF-8 by default). This is a bounded source: once the data has been read the program exits rather than running continuously. Internally it calls readFile with FileProcessingMode.PROCESS_ONCE.
  • readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - the method that the two above call internally. It reads the files under path with the given fileInputFormat. Depending on watchType, the source either periodically (every interval milliseconds) monitors the path for new data (FileProcessingMode.PROCESS_CONTINUOUSLY) or processes the data currently in the path once and exits (FileProcessingMode.PROCESS_ONCE). With pathFilter the user can further exclude files from processing.
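
A minimal readTextFile sketch (the file path is a placeholder):

```java
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FileSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // readTextFile uses TextInputFormat with PROCESS_ONCE under the hood:
        // the job finishes once the (placeholder) file has been read completely
        DataStreamSource<String> lines = env.readTextFile("data/words.txt");
        lines.print();
        env.execute();
    }
}
```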

Third-party Sources (Kafka as an Example)

In production, to let Flink read from data sources efficiently, it is usually combined with distributed message middleware such as Apache Kafka. Kafka is distributed, replicated, highly available, offers high throughput and records consumer offsets. Integrating Flink with Kafka allows data to be read efficiently while guaranteeing exactly-once semantics.

  • Add the dependency
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
  • Kafka Source provides a builder class to create KafkaSource instances. The following builds a KafkaSource that consumes the topic "input-topic" from the earliest offset, uses the consumer group "my-group", and deserializes the Kafka message value as a String:
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("input-topic")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
  • The following properties are required when building a KafkaSource:
  • Bootstrap servers, configured via setBootstrapServers(String)
  • Consumer group ID, configured via setGroupId(String)
  • The Topics / Partitions to subscribe to
  • A deserializer (Deserializer) to parse Kafka messages
  • Topic / Partition subscription
// Topic list: subscribe to all partitions of the listed topics:
KafkaSource.builder().setTopics("topic-a", "topic-b");
// Regex: subscribe to all partitions of every topic matching the regular expression:
KafkaSource.builder().setTopicPattern("topic.*");
// Partition list: subscribe to the specified partitions:
final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(
new TopicPartition("topic-a", 0), // Partition 0 of topic "topic-a"
new TopicPartition("topic-b", 5))); // Partition 5 of topic "topic-b"
KafkaSource.builder().setPartitions(partitionSet);
  • Deserialization
// A deserializer is required to parse Kafka messages. It is specified via setDeserializer(KafkaRecordDeserializationSchema), where KafkaRecordDeserializationSchema defines how a Kafka ConsumerRecord is parsed.
// If only the value part of the Kafka message is needed, use setValueOnlyDeserializer(DeserializationSchema) on the builder, where DeserializationSchema defines how the binary message value is parsed.
// Kafka's own deserializers can also be used, e.g. StringDeserializer to parse the message value into a String:
KafkaSource.<String>builder() .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
  • Starting offsets
KafkaSource.builder()
// Start from the offsets committed by the consumer group, with no reset strategy
.setStartingOffsets(OffsetsInitializer.committedOffsets())
// Start from the committed offsets; fall back to the earliest offset if none exist
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
// Start from the first record whose timestamp is greater than or equal to the given timestamp
.setStartingOffsets(OffsetsInitializer.timestamp(1592323200L))
// Start from the earliest offset
.setStartingOffsets(OffsetsInitializer.earliest())
// Start from the latest offset
.setStartingOffsets(OffsetsInitializer.latest());
  • Dynamic partition discovery
// To handle topic scale-out or newly created topics without restarting the Flink job, the Kafka source can be configured to periodically check for new partitions under the given Topic / Partition subscription. To enable dynamic partition discovery, set partition.discovery.interval.ms to a non-negative value:
// Partition discovery is disabled by default; the interval must be set explicitly to enable it.
KafkaSource.builder()
.setProperty("partition.discovery.interval.ms", "10000"); // check for new partitions every 10 seconds

CodeDemo

package io.github.sourceOperator;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSourceDemo {
    public static void main(String[] args) throws Exception {
        // 1. Build a local environment with the web UI
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        // 2. Build the Kafka source
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("node1:9092")
                .setTopics("test")
                .setGroupId("test1")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setProperty("auto.offset.commit", "true")
                .build();
        DataStreamSource<String> kafkaDS = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkaSource");

        // 3. Process the data
        kafkaDS.print();

        // 4. Submit and execute
        env.execute();
    }
}

Custom Source (MySQL as an Example)

// You can implement SourceFunction or RichSourceFunction; both are non-parallel source operators.
// You can also implement ParallelSourceFunction or RichParallelSourceFunction; both are parallel source operators.
// The Rich variants provide open(), close() and getRuntimeContext().
// The Parallel variants can run as multiple parallel instances.

// All of them must implement run(): as a data source, all records are produced inside run().
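
Before moving on to the MySQL example below, here is a minimal sketch of the SourceFunction contract (the class name and the one-element-per-second rate are made up for illustration): run() produces the data, cancel() stops the loop:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class SimpleCounterSource implements SourceFunction<Long> {
    // volatile flag toggled by cancel() and checked by run()
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long counter = 0L;
        while (running) {
            ctx.collect(counter++); // emit one element
            Thread.sleep(1000L);    // throttle to roughly one element per second
        }
    }

    @Override
    public void cancel() {
        running = false; // called by Flink when the job is cancelled
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(new SimpleCounterSource()).print();
        env.execute();
    }
}
```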

We have already used Flink's built-in sources and the bundled Kafka source above; now let's follow the same pattern and write a Source that reads data from MySQL.

  • Table DDL
create database if not exists flinkdemo;
use flinkdemo;
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
`id` int(11) NOT NULL,
`username` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`password` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;

-- ----------------------------
-- Records of user
-- ----------------------------
INSERT INTO `user` VALUES (10, 'dazhuang', '123456', '大壮');
INSERT INTO `user` VALUES (11, 'erya', '123456', '二丫');
INSERT INTO `user` VALUES (12, 'sanpang', '123456', '三胖');

SET FOREIGN_KEY_CHECKS = 1;
  • Add the dependency
<!-- mysql-connector dependency -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.15</version>
</dependency>

CodeDemo

package io.github.sourceOperator;

import com.alibaba.fastjson2.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class CustomSource {
public static void main(String[] args) throws Exception {
/**
 * Custom source that reads data from MySQL
 */
// 1. Create the Flink execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Set the parallelism
env.setParallelism(1);
// 2. Create the source operator
DataStreamSource<UserInfo> dataStreamSource = env.addSource(new RichParallelSourceFunction<UserInfo>() {
/**
 * Override the open and close methods of RichParallelSourceFunction
 * @param parameters
 * @throws Exception
 */
private Connection connection = null; // the database connection
private PreparedStatement stat = null; // the PreparedStatement

@Override
public void open(Configuration parameters) throws Exception {
/**
 * open() runs once when the instance is initialized, so it is a good place to set up the database access:
 * 1. load the JDBC driver  2. create the connection  3. prepare the PreparedStatement
 */
super.open(parameters);
// 1. Load the JDBC driver (com.mysql.cj.jdbc.Driver is the driver class shipped with mysql-connector-java 8.x)
Class.forName("com.mysql.cj.jdbc.Driver");

// 2. Create the database connection
String url = "jdbc:mysql://node1:3306/flinkdemo?serverTimezone=UTC&useSSL=false";
this.connection = DriverManager.getConnection(url, "root", "123456");

// 3. Prepare the PreparedStatement
String sql = "select id,username,password,name from user;";
this.stat = connection.prepareStatement(sql);
}

@Override
public void close() throws Exception {
/**
 * close() runs once when the instance is destroyed, so it is the place to release the connection
 */
super.close();
if (this.stat != null) this.stat.close();
if (this.connection != null) this.connection.close();
}

@Override
public void run(SourceContext<UserInfo> ctx) throws Exception {
// Execute the query and obtain the result set
ResultSet resultSet = stat.executeQuery();
while (resultSet.next()){
ctx.collect(new UserInfo(
resultSet.getString("id"),
resultSet.getString("userName"),
resultSet.getString("passWord"),
resultSet.getString("name")
));
}
}

@Override
public void cancel() {

}
}).setParallelism(2);

// 3. Process the data - convert each object to a JSON string and print it
dataStreamSource.map(JSON::toJSONString).print();

// 4. Submit and execute
env.execute();
}

@Data
@AllArgsConstructor
@NoArgsConstructor
private static class UserInfo{
private String id;
private String userName;
private String passWord;
private String name;
}
}

Transformation (Conversion Operators)

Transformation Operators

In Flink, an operator is something that applies an operation to a DataStream and returns a new DataStream. A Transformation turns one or more DataStreams into a new DataStream, and multiple transformations can be combined into complex dataflow topologies.

Transformation refers to the various data-conversion operations: map / flatMap / filter / keyBy / reduce / fold / aggregations / window / windowAll / union / window join / split / select / project and so on, which transform the data into whatever you need.

map: DataStream → DataStream

DataStream.map(MapFunction<T, R> mapper)

/**
 * @param <T> Type of the input elements.
 * @param <O> Type of the output elements.
 */
@Public
@FunctionalInterface
public interface MapFunction<T, O> extends Function, Serializable {

    /**
     * The mapping method. Takes an element from the input data set and transforms it into exactly
     * one element.
     *
     * @param value The input value.
     * @return The transformed value.
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the
     *     operation to fail and may trigger recovery.
     */
    O map(T value) throws Exception;
}
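
A minimal usage sketch of map (the input values are made up), converting every element to upper case:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MapDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // map produces exactly one output element per input element
        env.fromElements("flink", "hadoop", "spark")
           .map((MapFunction<String, String>) value -> value.toUpperCase())
           .print();
        env.execute();
    }
}
```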

flatMap: DataStream → DataStream

DataStream.flatMap(FlatMapFunction<T, R> flatMapper)

/**
 * @param <T> Type of the input elements.
 * @param <O> Type of the returned elements.
 */
@Public
@FunctionalInterface
public interface FlatMapFunction<T, O> extends Function, Serializable {

    /**
     * The core method of the FlatMapFunction. Takes an element from the input data set and
     * transforms it into zero, one, or more elements.
     *
     * @param value The input value.
     * @param out The collector for returning result values.
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the
     *     operation to fail and may trigger recovery.
     */
    void flatMap(T value, Collector<O> out) throws Exception;
}
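
A minimal usage sketch of flatMap (the input lines are made up), splitting each line into words:

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class FlatMapDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // flatMap produces zero, one or more output elements per input element
        env.fromElements("hello flink", "hello hadoop")
           .flatMap(new FlatMapFunction<String, String>() {
               @Override
               public void flatMap(String line, Collector<String> out) {
                   for (String word : line.split(" ")) {
                       out.collect(word);
                   }
               }
           })
           .print();
        env.execute();
    }
}
```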

filter: DataStream → DataStream

DataStream.filter(FilterFunction filter)

/**
 * @param <T> Type of the elements to filter.
 */
@Public
@FunctionalInterface
public interface FilterFunction<T> extends Function, Serializable {

    /**
     * The filter function that evaluates the predicate.
     *
     * <p><strong>IMPORTANT:</strong> The system assumes that the function does not modify the
     * elements on which the predicate is applied. Violating this assumption can lead to incorrect
     * results.
     *
     * @param value The value to be filtered.
     * @return True for values that should be retained, false for values to be filtered out.
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the
     *     operation to fail and may trigger recovery.
     */
    boolean filter(T value) throws Exception;
}
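
A minimal usage sketch of filter (the input values are made up), keeping only the even numbers:

```java
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FilterDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // filter keeps only the elements for which the predicate returns true
        env.fromElements(1, 2, 3, 4, 5, 6)
           .filter((FilterFunction<Integer>) value -> value % 2 == 0)
           .print();
        env.execute();
    }
}
```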

keyBy: DataStream → KeyedStream

DataStream.keyBy(KeySelector<T, K> key)

/**
* @param <IN> Type of objects to extract the key from.
* @param <KEY> Type of key.
*/
@Public
@FunctionalInterface
public interface KeySelector<IN, KEY> extends Function, Serializable {

/**
* @param value The object to get the key from.
* @return The extracted key.
* @throws Exception Throwing an exception will cause the execution of the respective task to
* fail, and trigger recovery or cancellation of the program.
*/
KEY getKey(IN value) throws Exception;
}

/***************** keyBy on a single field ********************/
// by field position (deprecated)
wordAndOne.keyBy(0)

// by field expression
wordAndOne.keyBy(v -> v.f0)

/***************** keyBy on multiple fields ********************/
// by field positions (deprecated)
wordAndOne.keyBy(0, 1);

// with a KeySelector
wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> getKey(Tuple2<String, Integer> value) throws Exception {
return Tuple2.of(value.f0, value.f1);
}
});
// simplified with a lambda
wordAndOne.keyBy(
(KeySelector<Tuple2<String, Integer>, Tuple2<String, Integer>>) value ->
Tuple2.of(value.f0, value.f1)
);


/***************** POJO style ********************/
public class PeopleCount {
private String province;
private String city;
private Integer counts;
public PeopleCount() {
}
// other code omitted...
}
// single field
source.keyBy(a -> a.getProvince());
source.keyBy(PeopleCount::getProvince);
// multiple fields
source.keyBy(new KeySelector<PeopleCount, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> getKey(PeopleCount value) throws Exception {
return Tuple2.of(value.getProvince(), value.getCity());
}
});

map.keyBy(
(KeySelector<PeopleCount, Tuple2<String, String>>) value ->
Tuple2.of(value.getProvince(), value.getCity())
);

Simple Aggregations: KeyedStream → DataStream

  • sum(): rolling sum of the specified field over the input stream.
  • min(): rolling minimum of the specified field over the input stream.
  • max(): rolling maximum of the specified field over the input stream.
  • minBy(): like min(), it computes the minimum of the specified field over the input stream. The difference is that min() only updates that field and keeps the other fields from the first record, while minBy() returns the whole record that contains the minimum value. See the sketch below.
  • maxBy(): like max(), it computes the maximum of the specified field; the difference from max() is exactly the same as between min() and minBy().
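
A minimal sketch of the min()/minBy() difference (the tuples are made-up (user, page, duration) records). Because these are rolling aggregates, one record is emitted per input; the comments describe the last record emitted:

```java
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MinVsMinByDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // single parallelism so the printed order is deterministic

        env.fromElements(
                Tuple3.of("u1", "home", 30),
                Tuple3.of("u1", "cart", 10),
                Tuple3.of("u1", "pay", 20))
           .keyBy(t -> t.f0)
           // min(2)   -> minimum of field 2, other fields kept from the first record: (u1, home, 10)
           // minBy(2) -> the whole record that holds the minimum field 2:             (u1, cart, 10)
           .minBy(2)
           .print();
        env.execute();
    }
}
```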

reduce: KeyedStream → DataStream

KeyedStream.reduce(ReduceFunction reducer)

/**
* @param <T> Type of the elements that this function processes.
*/
@Public
@FunctionalInterface
public interface ReduceFunction<T> extends Function, Serializable {

/**
* The core method of ReduceFunction, combining two values into one value of the same type. The
* reduce function is consecutively applied to all values of a group until only a single value
* remains.
*
* @param value1 The first value to combine.
* @param value2 The second value to combine.
* @return The combined value of both input values.
* @throws Exception This method may throw exceptions. Throwing an exception will cause the
* operation to fail and may trigger recovery.
*/
T reduce(T value1, T value2) throws Exception;
}
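
A minimal rolling word-count sketch using reduce on a keyed stream (the input tuples are made up); reduce combines the previous aggregate with each new element of the same key:

```java
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReduceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.fromElements(
                Tuple2.of("flink", 1), Tuple2.of("hadoop", 1), Tuple2.of("flink", 1))
           .keyBy(t -> t.f0)
           // keep the key, add up the counts
           .reduce((ReduceFunction<Tuple2<String, Integer>>) (v1, v2) ->
                   Tuple2.of(v1.f0, v1.f1 + v2.f1))
           .print();
        env.execute();
    }
}
```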

Transformation Operator Demo

package io.github.transformation;

import io.github.pojo.Order;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.alibaba.fastjson2.JSON;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import scala.math.BigInt;


import java.util.Arrays;

public class KeyByOperator {
public static void main(String[] args) throws Exception {
// 1. Build the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 2. Build the source operator to read the data
DataStreamSource<String> streamSource = env.readTextFile("/Users/Liguibin/Desktop/opt/Java/FlinkCode/src/main/resources/order.csv");

// 3-1. Wrap the input into a POJO: the input is a String, the output is an Order object
SingleOutputStreamOperator<Order> orderDS = streamSource.flatMap(
(FlatMapFunction<String, Order>) (value, out) -> out.collect(new Order(
value.split(",")[0],
BigInt.apply(value.split(",")[1]),
Double.parseDouble(value.split(",")[2]),
value.split(",")[3])))
.returns(Order.class);
// 3-2. Wrap the input into a 4-tuple
/**
 * When splitting the data into a tuple, override flatMap and call out.collect(new Tuple4<>(...the four fields...))
 * When flatMap is written as a lambda, it must be followed by returns(Types.TUPLE(Types.<field type>, ...))
 */
SingleOutputStreamOperator<Tuple4<String, BigInt, Double, String>> tuple4DS = streamSource.flatMap(new FlatMapFunction<String, Tuple4<String, BigInt, Double, String>>() {
@Override
public void flatMap(String value, Collector<Tuple4<String, BigInt, Double, String>> out) throws Exception {
out.collect(new Tuple4<>(
value.split(",")[0],
BigInt.apply(value.split(",")[1]),
Double.parseDouble(value.split(",")[2]),
value.split(",")[3]));
}
});

// 4. Use keyBy to partition the data (by user id)
// KeyedStream<Tuple4<String, BigInt, Double, String>, Tuple> keyedStream1 = tuple4DS.keyBy(0);
// KeyedStream<Tuple4<String, BigInt, Double, String>, String> keyedStream1 = tuple4DS.keyBy(t -> t.f0);
// KeyedStream<Order, String> keyedStream1 = orderDS.keyBy(o -> o.getUserId());
KeyedStream<Order, String> keyedStream = orderDS.keyBy(Order::getUserId);

// 5-1. sum: total amount per user
// keyedStream.sum("money").print("sum of money per user");
// 5-2. minBy: smallest purchase amount per user
// keyedStream.minBy("money").print("smallest purchase per user");
// 5-3. the same minimum-amount computation expressed with reduce
// keyedStream.reduce(new ReduceFunction<Order>() {
// @Override
// public Order reduce(Order value1, Order value2) throws Exception {
// if (value1.getMoney() > value2.getMoney())return value2;
// else return value1;
// }
// }).print("reduce");
// 5-4. the ProcessFunction API
keyedStream.process(new ProcessFunction<Order, String>() {
/**
 * Order is the input type, String is the output type
 * @param value
 * @param ctx the processing context of this element
 * @param out
 * @throws Exception
 */
@Override
public void processElement(Order value, Context ctx, Collector<String> out) throws Exception {
System.out.println(ctx.timestamp()); // null here, since no event-time timestamps have been assigned yet
System.out.println(JSON.toJSONString(value)); // print each incoming object as a JSON string
}
});

// 6. Submit and execute
env.execute();
}
}

Physical Partitioning

GLOBAL Partitioning

GlobalPartitioner sends every upstream element to the first instance of the downstream operator (subtask id = 0):

@PublicEvolving
public DataStream<T> global() {
return setConnectionType(new GlobalPartitioner<T>());
}
/**
* Partitioner that sends all elements to the downstream operator with subtask ID=0.
* i.e. all upstream elements are sent to the downstream operator instance with ID = 0 (the first one)
* @param <T> Type of the elements in the Stream being partitioned
*/
@Internal
public class GlobalPartitioner<T> extends StreamPartitioner<T> {
private static final long serialVersionUID = 1L;

@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
return 0;
}

@Override
public StreamPartitioner<T> copy() {
return this;
}

@Override
public SubtaskStateMapper getDownstreamSubtaskStateMapper() {
return SubtaskStateMapper.FIRST;
}

@Override
public boolean isPointwise() {
return false;
}

@Override
public String toString() {
return "GLOBAL";
}
}

FORWARD Partitioning

The implementation is the same as GlobalPartitioner, but it only sends data to the first instance of the locally running downstream operator rather than globally. When no partitioner is specified between two operators, ForwardPartitioner is used if their parallelism is equal and RebalancePartitioner otherwise. ForwardPartitioner requires the upstream and downstream parallelism to match; otherwise an exception is thrown.

/**
* Sets the partitioning of the {@link DataStream} so that the output elements are forwarded to
* the local subtask of the next operation.
*
* @return The DataStream with forward partitioning set.
*/
public DataStream<T> forward() {
return setConnectionType(new ForwardPartitioner<T>());
}
/**
* Partitioner that forwards elements only to the locally running downstream operation.
* i.e. elements are forwarded to the locally running downstream operator; upstream and downstream parallelism must match
* @param <T> Type of the elements in the Stream
*/
@Internal
public class ForwardPartitioner<T> extends StreamPartitioner<T> {
private static final long serialVersionUID = 1L;

@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
return 0;
}

public StreamPartitioner<T> copy() {
return this;
}

@Override
public boolean isPointwise() {
return true;
}

@Override
public String toString() {
return "FORWARD";
}

@Override
public SubtaskStateMapper getDownstreamSubtaskStateMapper() {
return SubtaskStateMapper.UNSUPPORTED;
}

@Override
public SubtaskStateMapper getUpstreamSubtaskStateMapper() {
return SubtaskStateMapper.UNSUPPORTED;
}
}

BROADCAST Partitioning

Broadcast partitioning sends the upstream data set to every instance of the downstream operator. It suits scenarios where a large data set is joined with a small one.

/**
* Sets the partitioning of the {@link DataStream} so that the output elements are broadcasted
* to every parallel instance of the next operation.
* i.e. every upstream element is replicated and sent to all downstream subtasks
* @return The DataStream with broadcast partitioning set.
*/
public DataStream<T> broadcast() {
return setConnectionType(new BroadcastPartitioner<T>());
}
/**
* Partitioner that selects all the output channels.
*
* @param <T> Type of the elements in the Stream being broadcast
*/
@Internal
public class BroadcastPartitioner<T> extends StreamPartitioner<T> {
private static final long serialVersionUID = 1L;

/**
* Note: Broadcast mode could be handled directly for all the output channels in record writer,
* so it is no need to select channels via this method.
*/
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
throw new UnsupportedOperationException(
"Broadcast partitioner does not support select channels.");
}

@Override
public SubtaskStateMapper getUpstreamSubtaskStateMapper() {
return SubtaskStateMapper.UNSUPPORTED;
}

@Override
public SubtaskStateMapper getDownstreamSubtaskStateMapper() {
return SubtaskStateMapper.UNSUPPORTED;
}

@Override
public boolean isBroadcast() {
return true;
}

@Override
public StreamPartitioner<T> copy() {
return this;
}

@Override
public boolean isPointwise() {
return false;
}

@Override
public String toString() {
return "BROADCAST";
}
}

SHUFFLE Partitioning

Random partitioning follows a uniform distribution: records in the stream are shuffled randomly and spread evenly across the downstream subtasks. Because it is completely random, the same input may be distributed differently on every run.

/**
* Sets the partitioning of the {@link DataStream} so that the output elements are shuffled
* uniformly randomly to the next operation.
* i.e. each upstream element is sent to a randomly chosen downstream subtask
* @return The DataStream with shuffle partitioning set.
*/
@PublicEvolving
public DataStream<T> shuffle() {
return setConnectionType(new ShufflePartitioner<T>());
}
/**
* Partitioner that distributes the data equally by selecting one output channel randomly.
*
* @param <T> Type of the Tuple
*/
@Internal
public class ShufflePartitioner<T> extends StreamPartitioner<T> {
private static final long serialVersionUID = 1L;

private Random random = new Random();

@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
return random.nextInt(numberOfChannels);
}

@Override
public SubtaskStateMapper getDownstreamSubtaskStateMapper() {
return SubtaskStateMapper.ROUND_ROBIN;
}

@Override
public StreamPartitioner<T> copy() {
return new ShufflePartitioner<T>();
}

@Override
public boolean isPointwise() {
return false;
}

@Override
public String toString() {
return "SHUFFLE";
}
}

REBALANCE / RESCALE Partitioning

Elements are distributed to downstream subtasks in a round-robin fashion, based on the parallelism of the upstream and downstream operators. Take an upstream parallelism of 2 and a downstream parallelism of 4: with RebalancePartitioner, each upstream instance round-robins over all 4 downstream instances, whereas with RescalePartitioner each upstream instance only round-robins over 2 of them. Because fewer channels are involved, the subpartition buffers fill up faster, which improves network efficiency. When the upstream data is fairly even and the upstream/downstream parallelisms are proportional, RescalePartitioner can be used instead of RebalancePartitioner. A usage sketch follows.
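
A minimal sketch contrasting rebalance() and rescale(); the parallelism values (2 upstream, 4 downstream) mirror the example above:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RebalanceRescaleDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Long> source = env.fromSequence(1L, 100L).setParallelism(2);

        // rebalance(): round-robin across ALL downstream subtasks
        source.rebalance().print("rebalance").setParallelism(4);

        // rescale(): round-robin only across the local subset of downstream subtasks
        // (each of the 2 upstream subtasks feeds 2 of the 4 downstream subtasks)
        source.rescale().print("rescale").setParallelism(4);

        env.execute();
    }
}
```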

/**
* Sets the partitioning of the {@link DataStream} so that the output elements are distributed
* evenly to a subset of instances of the next operation in a round-robin fashion.
*
* <p>The subset of downstream operations to which the upstream operation sends elements depends
* on the degree of parallelism of both the upstream and downstream operation. For example, if
* the upstream operation has parallelism 2 and the downstream operation has parallelism 4, then
* one upstream operation would distribute elements to two downstream operations while the other
* upstream operation would distribute to the other two downstream operations. If, on the other
* hand, the downstream operation has parallelism 2 while the upstream operation has parallelism
* 4 then two upstream operations will distribute to one downstream operation while the other
* two upstream operations will distribute to the other downstream operations.
*
* <p>In cases where the different parallelisms are not multiples of each other one or several
* downstream operations will have a differing number of inputs from upstream operations.
*
* @return The DataStream with rescale partitioning set.
*/
@PublicEvolving
public DataStream<T> rescale() {
return setConnectionType(new RescalePartitioner<T>());
}
/**
* Partitioner that distributes the data equally by cycling through the output channels. This
* distributes only to a subset of downstream nodes because {@link
* org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator} instantiates a {@link
* DistributionPattern#POINTWISE} distribution pattern when encountering {@code RescalePartitioner}.
*
* <p>The subset of downstream operations to which the upstream operation sends elements depends on
* the degree of parallelism of both the upstream and downstream operation. For example, if the
* upstream operation has parallelism 2 and the downstream operation has parallelism 4, then one
* upstream operation would distribute elements to two downstream operations while the other
* upstream operation would distribute to the other two downstream operations. If, on the other
* hand, the downstream operation has parallelism 2 while the upstream operation has parallelism 4
* then two upstream operations will distribute to one downstream operation while the other two
* upstream operations will distribute to the other downstream operations.
*
* <p>In cases where the different parallelisms are not multiples of each other one or several
* downstream operations will have a differing number of inputs from upstream operations.
*
* @param <T> Type of the elements in the Stream being rescaled
*/
@Internal
public class RescalePartitioner<T> extends StreamPartitioner<T> {
private static final long serialVersionUID = 1L;

private int nextChannelToSendTo = -1;

@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
if (++nextChannelToSendTo >= numberOfChannels) {
nextChannelToSendTo = 0;
}
return nextChannelToSendTo;
}

@Override
public SubtaskStateMapper getDownstreamSubtaskStateMapper() {
return SubtaskStateMapper.UNSUPPORTED;
}

@Override
public SubtaskStateMapper getUpstreamSubtaskStateMapper() {
return SubtaskStateMapper.UNSUPPORTED;
}

public StreamPartitioner<T> copy() {
return this;
}

@Override
public String toString() {
return "RESCALE";
}

@Override
public boolean isPointwise() {
return true;
}
}

CUSTOM Partitioning

With a custom partitioner you decide yourself which downstream operator instance each element is sent to.

/**
* Partitions a DataStream on the key returned by the selector, using a custom partitioner. This
* method takes the key selector to get the key to partition on, and a partitioner that accepts
* the key type.
*
* <p>Note: This method works only on single field keys, i.e. the selector cannot return tuples
* of fields.
*
* @param partitioner The partitioner to assign partitions to keys.
* @param keySelector The KeySelector with which the DataStream is partitioned.
* @return The partitioned DataStream.
* @see KeySelector
*/
public <K> DataStream<T> partitionCustom(
Partitioner<K> partitioner, KeySelector<T, K> keySelector) {
return setConnectionType(
new CustomPartitionerWrapper<>(clean(partitioner), clean(keySelector)));
}

Demo

package io.github.pratitioner;

import com.alibaba.fastjson2.JSON;
import io.github.pojo.Order;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import scala.math.BigInt;

public class CustomPartitioner {
public static void main(String[] args) throws Exception {
// 1. Build the Flink execution environment (with the web UI, since rest.port is configured)
Configuration conf = new Configuration();
conf.setInteger("rest.port", 8081);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

// 2. Build the source operator to read the data
DataStreamSource<String> streamSource = env.socketTextStream("node1", 9999);
// 3. Wrap the input into the Order POJO
SingleOutputStreamOperator<Order> orderDS = streamSource.flatMap((FlatMapFunction<String, Order>) (value, out) -> out.collect(new Order(
value.split(",")[0],
BigInt.apply(value.split(",")[1]),
Double.valueOf(value.split(",")[2]),
value.split(",")[3]
))).returns(Order.class).setParallelism(2);

// 4. Apply the custom repartitioning
// partitionCustom takes two arguments:
//   1st argument Partitioner: the partitioning logic, e.g. send odd keys to partition 0 and even keys to partition 1
//   2nd argument KeySelector: the field to partition on
orderDS.partitionCustom(new MyCustomPartitioner(), Order::getUserId)
.map(JSON::toJSONString)
.setParallelism(3)
.print().setParallelism(3);

// 5. Submit and execute (without this call the job never runs)
env.execute();
}
private static class MyCustomPartitioner implements Partitioner<String> {
// send userIds containing "001" to the first downstream subtask
@Override
public int partition(String key, int numPartitions) {
if (key.contains("001")) {
System.out.println(key + ": sent to partition 0");
return 0; // route the record to the first downstream subtask
} else return 1; // everything else goes to the second subtask
}
}
}

Sink (Output Operators)

FileSink

Scenario:

  • A common big-data pattern: external data is pushed into Kafka, Flink consumes the Kafka data as the middle layer and applies the business logic, and the processed data then has to be written to a database or a file system such as HDFS.
  • FileSink writes partitioned files to any file system supported by the Flink FileSystem interface and supports exactly-once semantics.
  • The exactly-once guarantee of this sink is based on a two-phase commit built on Flink checkpoints. Typical use cases are real-time data warehouses, topic splitting and hourly analytics.

How it works:


  • Bucket: FileSink writes partitioned files to a file system backed by the Flink FileSystem abstraction (since the input is a stream, the data is treated as unbounded). The partitioning behaviour is configurable; by default it is time based, concretely one bucket per hour. A bucket contains a number of files holding all the records received from the stream during that hour.
  • PartFile: each bucket is internally split into several part files that store the output data; every sink subtask that received data for the bucket during its lifetime has at least one part file.
  • FileSink supports row-encoded and bulk-encoded (e.g. Parquet) formats.

Configuration details:

  • 1 - PartFile: each bucket is internally split into several part files; every sink subtask that received data for the bucket has at least one. Additional file rolling is controlled by a configurable rolling policy.

    • Lifecycle: within each active bucket, every writer subtask has exactly one in-progress part file at any point in time, but there can be several pending and finished files.
      • In-progress: the file currently being written to
      • Pending: once an in-progress file is closed, it becomes pending
      • Finished: after a successful checkpoint, pending files become finished; a finished file is never modified again and can be read safely by downstream systems.
    • Naming convention:
      • In-progress / Pending: part-<uid>-<partFileIndex>.inprogress.uid
      • Finished: part-<uid>-<partFileIndex>. When a sink subtask is instantiated, its uid is a random ID assigned to the subtask. The uid is not fault tolerant, so it is regenerated when the subtask recovers from a failure.
      • Flink allows adding a prefix and/or a suffix to the part file names via OutputFileConfig.
    • Note: checkpointing must be enabled when using FileSink; files are completed on each checkpoint. If checkpointing is disabled, part files stay in the 'in-progress' or 'pending' state forever and cannot be read safely by downstream systems (see the end-to-end sketch after this configuration list).
  • 2 - Encoding formats: FileSink supports row-encoded and bulk-encoded formats (e.g. Apache Parquet).

    • FileSink.forRowFormat(basePath, rowEncoder)

      • Required settings:

        • the base path for the output data
        • the Encoder that serializes each row into the part file
      • Optional settings on the RowFormatBuilder:

        • a custom RollingPolicy: DefaultRollingPolicy is used by default to roll files; bucketCheckInterval can be customised
        • it defaults to 1 minute; the value is in milliseconds and controls how often the time-based rolling condition is checked
      •   FileSink<String> sink = FileSink
              .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
              .withRollingPolicy(
                      DefaultRollingPolicy.builder()
                              .withRolloverInterval(Duration.ofMinutes(15))
                              .withInactivityInterval(Duration.ofMinutes(5))
                              .withMaxPartSize(MemorySize.ofMebiBytes(1))
                              .build())
              .build();

    • FileSink.forBulkFormat(basePath, bulkWriterFactory)

      • A bulk-encoded sink is created much like a row-encoded one, but instead of an Encoder you specify a BulkWriter.Factory. BulkWriter defines how new records are added and flushed, and how a batch of records is finalized and encoded.
      • Required dependencies:

    <!-- Dependencies required for the FileSink bulk (Parquet/Avro) format -->
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-parquet</artifactId>
    <version>${flink.version}</version>
    </dependency>
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro</artifactId>
    <version>${flink.version}</version>
    </dependency>
    <dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>${parquet-avro}</version>
    </dependency>
- Three ways to obtain the Parquet writer factory

- Option 1: write the data as Parquet based on an explicitly defined schema (GenericRecord)
// 1. First define the schema for the GenericRecord data
Schema schema = SchemaBuilder.builder()
.record("DataRecord")
.namespace("cn.itcast.flink.avro.schema")
.fields()
.requiredInt("gid")
.requiredLong("ts")
.requiredString("eventId")
.requiredString("sessionId")
.name("eventInfo")
.type()
.map()
.values()
.type("string")
.noDefault()
.endRecord();

// Build a data stream
DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction());

// 2. Create a parquet writer factory from the schema defined above
ParquetWriterFactory<GenericRecord> writerFactory = AvroParquetWriters.forGenericRecord(schema);

// 3. Use the writer factory to build a sink operator that writes columnar parquet files
FileSink<GenericRecord> sink1 = FileSink.forBulkFormat(new Path("d:/filesink/bulkformat"), writerFactory)
.withBucketAssigner(new DateTimeBucketAssigner<GenericRecord>("yyyy-MM-dd--HH"))
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("itcast").withPartSuffix(".parquet").build())
.build();

// 4. Convert the stream of custom JavaBeans into the GenericRecord stream expected by the parquet writer
SingleOutputStreamOperator<GenericRecord> recordStream = streamSource
.map((MapFunction<EventLog, GenericRecord>) eventLog -> {
// build a Record object
GenericData.Record record = new GenericData.Record(schema);

// fill the record with data
record.put("gid", (int) eventLog.getGuid());
record.put("eventId", eventLog.getEventId());
record.put("ts", eventLog.getTimeStamp());
record.put("sessionId", eventLog.getSessionId());
record.put("eventInfo", eventLog.getEventInfo());

return record;
}).returns(new GenericRecordAvroTypeInfo(schema)); // Avro classes/objects need the Avro serializer, so the AvroTypeInfo must be given explicitly to provide the AvroSerializer

// 5. Write out the data
recordStream.sinkTo(sink1).uid("fileSink");

env.execute();
- Option 2: pass a specific (Avro-generated) JavaBean class; the factory obtains the Schema by calling the dedicated method on that class
- Option 3: pass an ordinary JavaBean; the tool uses reflection to read the package name, class name, field names and field types from it and translates them into an Avro-compliant Schema.
  • 3 - Bucket assignment
    • FileSink uses a BucketAssigner to decide which bucket each incoming record is put into. By default, DateTimeBucketAssigner creates one bucket per hour based on the system default time zone.
    • Flink has two built-in BucketAssigners:
      • DateTimeBucketAssigner: the default time-based assigner
      • BasePathBucketAssigner: stores all part files directly under the base path (a single global bucket)
  • 4 - Rolling policy
    • The RollingPolicy defines when a given file is closed and moves to the pending state, and later to the finished state. Pending files become finished on the next checkpoint, so the checkpoint interval controls how quickly, how large and how many part files become available to downstream readers.
    • Flink has two built-in rolling policies:
      • DefaultRollingPolicy
      • OnCheckpointRollingPolicy
    • Note: with bulk encoding, only OnCheckpointRollingPolicy can be used; it rolls the part file on every checkpoint.
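
The configuration points above come together in the following minimal end-to-end row-encoded FileSink sketch (Flink 1.15+ style, matching the rolling-policy snippet above; the output path and intervals are placeholder values). It also shows the checkpointing that the note in the PartFile section refers to:

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;

public class FileSinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // FileSink relies on checkpoints to move part files from pending to finished
        env.enableCheckpointing(10_000L);

        FileSink<String> fileSink = FileSink
                .forRowFormat(new Path("d:/filesink/rowformat"), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        .withRolloverInterval(Duration.ofMinutes(15))
                        .withInactivityInterval(Duration.ofMinutes(5))
                        .withMaxPartSize(MemorySize.ofMebiBytes(1))
                        .build())
                .build();

        // bounded demo input; part files are finalized on checkpoints / when the job finishes
        env.fromElements("flink", "hadoop", "spark")
           .sinkTo(fileSink);

        env.execute();
    }
}
```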

File compaction:

Since Flink 1.15, to allow fast rolling while avoiding lots of small files, a compact feature was added that can merge part files at checkpoint time.

FileSink<Integer> fileSink=
FileSink.forRowFormat(new Path(path),new SimpleStringEncoder<Integer>())
.enableCompact(
FileCompactStrategy.Builder.newBuilder()
.setNumCompactThreads(1024)
.enableCompactionOnCheckpoint(5)
.build(),
new RecordWiseFileCompactor<>(
new DecoderBasedReader.Factory<>(SimpleStringDecoder::new)))
.build();
  • Parameters
    • setNumCompactThreads: the number of compaction threads
    • setSizeThreshold: the size threshold (files smaller than this are compacted)
    • enableCompactionOnCheckpoint: compact once every N checkpoints
    • If compaction is enabled, a uid must be added when wiring the sink: source.sinkTo(fileSink).uid("fileSink");

KafkaSink

Flink's Kafka connector provides end-to-end exactly-once semantics, the strongest consistency guarantee available in practice.

// 1. Build a Kafka sink operator
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
.setBootstrapServers("node1.itcast.cn:9092,node2.itcast.cn:9092")
.setRecordSerializer(KafkaRecordSerializationSchema.<String>builder()
.setTopic("event-log")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();

// 2. Write the data stream to the sink
streamSource
.map(JSON::toJSONString).disableChaining()
.sinkTo(kafkaSink);

env.execute();

// To use DeliveryGuarantee.EXACTLY_ONCE, a transactional id prefix must be set with setTransactionalIdPrefix(String), e.g.:
// .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
// .setTransactionalIdPrefix("itcast-" + RandomUtils.nextInt(1, 100))
// .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "36000")

JDBCSink

  • Without an exactly-once guarantee
SinkFunction<EventLog> jdbcSink = JdbcSink.sink(
"insert into t_eventlog values (?,?,?,?,?) on duplicate key update sessionId=?,eventId=?,ts=?,eventInfo=? ",
new JdbcStatementBuilder<EventLog>() {
@Override
public void accept(PreparedStatement preparedStatement, EventLog eventLog) throws SQLException {
preparedStatement.setLong(1, eventLog.getGuid());
preparedStatement.setString(2, eventLog.getSessionId());
preparedStatement.setString(3, eventLog.getEventId());
preparedStatement.setLong(4, eventLog.getTimeStamp());
preparedStatement.setString(5, JSON.toJSONString(eventLog.getEventInfo()));

preparedStatement.setString(6, eventLog.getSessionId());
preparedStatement.setString(7, eventLog.getEventId());
preparedStatement.setLong(8, eventLog.getTimeStamp());
preparedStatement.setString(9, JSON.toJSONString(eventLog.getEventInfo()));
}
},
JdbcExecutionOptions.builder()
.withMaxRetries(3)
.withBatchSize(1)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://node1:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
.withUsername("root")
.withPassword("123456")
.build()
);

// write out the data
streamSource.addSink(jdbcSink);

env.execute();
  • With an exactly-once guarantee
SinkFunction<EventLog> exactlyOnceSink = JdbcSink.exactlyOnceSink(
"insert into t_eventlog values (?,?,?,?,?) on duplicate key update sessionId=?,eventId=?,ts=?,eventInfo=? ",
new JdbcStatementBuilder<EventLog>() {
@Override
public void accept(PreparedStatement preparedStatement, EventLog eventLog) throws SQLException {
preparedStatement.setLong(1, eventLog.getGuid());
preparedStatement.setString(2, eventLog.getSessionId());
preparedStatement.setString(3, eventLog.getEventId());
preparedStatement.setLong(4, eventLog.getTimeStamp());
preparedStatement.setString(5, JSON.toJSONString(eventLog.getEventInfo()));

preparedStatement.setString(6, eventLog.getSessionId());
preparedStatement.setString(7, eventLog.getEventId());
preparedStatement.setLong(8, eventLog.getTimeStamp());
preparedStatement.setString(9, JSON.toJSONString(eventLog.getEventInfo()));
}
},
JdbcExecutionOptions.builder()
.withMaxRetries(3)
.withBatchSize(1)
.build(),
JdbcExactlyOnceOptions.builder()
// MySQL does not support multiple parallel transactions on one connection, so this parameter must be set to true
.withTransactionPerConnection(true)
.build(),
new SerializableSupplier<XADataSource>() {
@Override
public XADataSource get() {
// XADataSource is a JDBC data source that additionally supports distributed (XA) transactions
// note that its construction differs from database to database
MysqlXADataSource xaDataSource = new MysqlXADataSource();
xaDataSource.setUrl("jdbc:mysql://node1:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8");
xaDataSource.setUser("root");
xaDataSource.setPassword("123456");
return xaDataSource;
}
}
);

// write out the data
streamSource.addSink(exactlyOnceSink);

env.execute();