流式计算

什么是数据流?什么是数据集

批处理对应的是数据集

流处理对应的是数据流

无界数据流和有界数据流

有界数据流:明确定义开始和结束的数据流,计算之前获取的所有数据-mysql、日志文件

无界数据流:只有开始没有结束的数据流,获取数据立即处理,无法等待所有数据到达-socket、kafka

硬核Flink介绍

基本介绍

这里先贴上一个官方链接🔗保命: https://flink.apache.org/

  • 第1代—Hadoop MapReduce

首先第一代的计算引擎,无疑就是 Hadoop 承载的 MapReduce。它将计算分为两个阶段,分别为 Map 和 Reduce。对于上层应用来说,就不得不想方设法去拆分算法,甚至于不得不在上层应用实现多个 Job 的串联,以完成一个完整的算法,例如迭代计算。

  • 第2代—DAG框架(Tez) + MapReduce

由于这样的弊端,催生了支持 DAG 框架的产生。因此,支持 DAG 的框架被划分为第二代计算引擎。如 Tez 以及更上层的 Oozie。这里我们不去细究各种 DAG 实现之间的区别,不过对于当时的 Tez 和 Oozie 来说,大多还是批处理的任务。

  • 第3代—Spark

接下来就是以 Spark 为代表的第三代的计算引擎。第三代计算引擎的特点主要是 Job 内部的 DAG 支持(不跨越 Job),以及强调的实时计算。在这里,很多人也会认为第三代计算引擎也能够很好的运行批处理的 Job。

  1. 离线计算、实时计算、SQL高层API
  2. 自带DAG
  3. 内存迭代计算,性能大幅提升
  • 第4代—Flink

随着第三代计算引擎的出现,促进了上层应用快速发展,例如各种迭代计算的性能以及对流计算和 SQL 等的支持。Flink 的诞生就被归在了第四代。这应该主要表现在 Flink 对流计算的支持,以及更一步的实时性上面。当然 Flink 也可以支持 Batch 的任务,以及 DAG 的运算。

  1. 离线计算、实时计算、SQL高层API
  2. 自带DAG
  3. 流式计算性能更好,可靠性更高

Flink流处理特性

  • 支持高吞吐、低延迟(亚秒)、高性能的流处理
  • 支持带有事件时间的窗口(Window)操作
  • 支持有状态计算端到端的Exactly-once语义
  • 支持高度灵活的窗口(Window)操作,支持基于time、count、session,以及data-driven的窗口操作
  • 支持具有Backpressure功能的持续流模型
  • 支持基于轻量级分布式快照(Snapshot)实现的容错
  • 一个运行时同时支持Batch on Streaming处理和Streaming处理
  • Flink在JVM内部实现了自己的内存管理
  • 支持迭代计算
  • 支持程序自动优化:避免特定情况下Shuffle、排序等昂贵操作,中间结果有必要进行缓存

Flink的应用场景

  • Event-driven Applications(事件驱动)
    • 信用卡交易、刷单、监控
  • Data Analytics Applications(数据分析)
    • 双11大屏、库存分析
  • Data Pipeline Applications(数据管道)
    • 用于数据提取-转换-加载在存储系统间进行数据转换和迁移的管道
    • 对于批处理ETL是一个周期性的任务
    • 对于流处理ETL是一个持续提取-转换-加载的管道

Flink架构体系

Flink是典型的主从架构(浅浅总结一下吧!)

  • 主从架构:hadoop(hdfs mapreduce yarn)、spark、hbase、flink
  • 去中心化架构:zookeeper、Kafka

所有的 Flink 程序都可以归纳为由三部分构成:

  • Source:表示“源算子”,负责读取数据源。
  • Transformation:表示“转换算子”,利用各种算子进行处理加工。
  • Sink:表示“下沉算子”,负责数据的输出。

系统整体构成

  • Client:Flink 客户端是 F1ink 提供的 CLI 命令行工具,用来提交 Flink 作业到 Flink 集群,在客户端中负责 StreamGraph (流图)和 Job Graph (作业图)的构建。
  • JobManager:JobManager处理器也称之为Master,JobManager根据并行度将Flink客户端提交的Flink应用分解为子任务,从资源管理器 ResourceManager 申请所需的计算资源,资源具备之后,开始分发任务到TaskManager 执行 Task,并负责应用容错,跟踪作业的执行状态,发现异常则恢复作业等。
  • TaskManager:TaskManager处理器也称之为Worker,TaskManager 接收 JobManage 分发的子任务,根据自身的资源情况管理子任务的启动、 停止、销毁、异常恢复等生命周期阶段。Flink运行时至少会存在一个worker处理器。

运行时架构

作业管理器-JobManager(包含Dispatcher, JobMaster, ResourceManager)

  • 分发器(Dispatcher)
    • Dispatcher 主要负责提供一个 REST 接口,用来提交应用,并且负责为每一个新提交的作业启动一个新的 JobMaster 组件。Dispatcher 也会启动一个 Web UI,用来方便地展示和监控作业执行的信息。
  • 作业处理器(JobMaster)
    • JobMaster 是 JobManager 中最核心的组件,负责处理单独的作业(Job),JobMaster和每个作业是一一对应的。
  • 资源管理器(ResourceManager)
    • ResourceManager 主要负责资源的分配和管理,在 Flink 集群中只有一个。所谓“资源”,主要是指 TaskManager 的任务槽(task slots)。任务槽就是 Flink 集群中的资源调配单元,包含了机器用来执行计算的一组 CPU 和内存资源。每一个任务(Task)都需要分配到一个 slot 上执行。

任务管理器-TaskManager

  • Slot 任务执行槽
    • 物理概念,一个TM(TaskManager)内会划分出多个Slot,1个Slot内最多可以运行1个Task(Subtask)或一组由Task(Subtask)组成的任务链。
    • 多个Slot之间会共享平分当前TM的内存空间。Slot是对一个TM的资源进行固定分配的工具,每个Slot在TM启动后,可以获得固定的资源。
    • flink集群只要启动,TaskManager设置的资源就是固定,就意味着每个slot资源也是固定。
    • 通过集群的配置文件来设定 TaskManager 的 slot 数量taskmanager.numberOfTaskSlots:8

注意:slot 目前仅仅用来隔离内存,不会涉及 CPU 的隔离。在具体应用时,可以将 slot 数量配置为机器的 CPU 核心数,尽量避免不同任务之间对 CPU 的竞争。这也是开发环境默认并行度设为机器 CPU 数量的原因。

Flink计算优化

Flink并行计算

如何设置:

  • 代码中设置
1
2
3
4
5
6
# 我们在代码中,可以很简单地在算子后跟着调用 setParallelism()方法,来设置当前算子的并行度,这种方式设置的并行度,只针对当前算子有效:
stream.map(word -> Tuple2.of(word, 1L)).setParallelism(2);

# 我们也可以直接调用执行环境的 setParallelism()方法,全局设定并行度:
env.setParallelism(2);
# 这样代码中所有算子,默认的并行度就都为 2 了。我们一般不会在程序中设置全局并行度,因为如果在程序中对全局并行度进行硬编码,会导致无法动态扩容。
  • 提交应用时设置
1
2
# 在使用 flink run 命令提交应用时,可以增加-p 参数来指定当前应用程序执行的并行度,它的作用类似于执行环境的全局设置:
bin/flink run –p 2 –c org.apache.flink.examples.java.wordcount.WordCount /export/server/flink/examples/batch/WordCount.jar
  • 配置文件中设置
1
2
# 直接在集群的配置文件 flink-conf.yaml 中直接更改默认并行度:
parallelism.default: 2

优先级:算子 > 代码全局 > 命令行参数 > 配置文件

Flink合并算子链(Operator Chain)

什么是算子链: (算子间的数据传输)

  • 一对一(One-to-one,forwarding)

这种模式下,数据流维护着分区以及元素的顺序。比如图中的 source 和 map 算子,source算子读取数据之后,可以直接发送给 map 算子做处理,它们之间不需要重新分区,也不需要调整数据的顺序。这就意味着 map 算子的子任务,看到的元素个数和顺序跟 source 算子的子任务产生的完全一样,保证着“一对一”的关系。map、filter、flatMap 等算子都是这种 one-to-one的对应关系。这种关系类似于 Spark 中的窄依赖。

  • 重分区(Redistributing)

在这种模式下,数据流的分区会发生改变。比图中的 map 和后面的 keyBy/window 算子之间(这里的 keyBy 是数据传输算子,后面的 window、apply 方法共同构成了 window 算子),以及 keyBy/window 算子和 Sink 算子之间,都是这样的关系。
每一个算子的子任务,会根据数据传输的策略,把数据发送到不同的下游目标任务。例如,keyBy()是分组操作,本质上基于键(key)的哈希值(hashCode)进行了重分区;而当并行度改变时,比如从并行度为 2 的 window 算子,要传递到并行度为 1 的 Sink 算子,这时的数据传输方式是再平衡(rebalance),会把数据均匀地向下游子任务分发出去。这些传输方式都会引起重分区(redistribute)的过程,这一过程类似于 Spark 中的 shuffle。这种算子间的关系类似于 Spark 中的宽依赖。

合并算子链

  • 合并算子链是flink作业自动进行的优化,当然可以手动关闭

在 Flink 中,并行度相同的一对一(one to one)算子操作,可以直接链接在一起形成一个“大”的任务(task),这样原来的算子就成为了真正任务里的一部分,每个 task会被一个线程执行。这样的技术被称为“算子链”(Operator Chain)。

将算子链接成 task 是非常有效的优化:可以减少线程之间的切换和基于缓存区的数据交换,在减少时延的同时提升吞吐量。

  • 关闭合并算子链, 一般不这么做
1
2
3
4
5
在代码中对算子做一些特定的设置:
// 禁用算子链
.map(word -> Tuple2.of(word, 1L)).disableChaining();
// 从当前算子开始新链
.map(word -> Tuple2.of(word, 1L)).startNewChain()

Flink任务槽的共享

  • 引入, 因为一个算子就要占用一个slot, 且多个资源相互隔离有什么问题?
    • 存在比较多的网络io
  • Flink 默认是允许 slot 共享的
    • 如果希望某个算子对应的任务完全独占一个 slot,或者只有某一部分算子共享 slot,我们也可以通过设置“slot 共享组”(SlotSharingGroup)手动指定:.map(word -> Tuple2.of(word, 1L)).slotSharingGroup(“1”);

什么是任务槽的共享

每个任务节点的并行子任务一字排开,占据不同的 slot;而不同的任务节点的子任务可以共享 slot。一个 slot 中,可以将程序处理的所有任务都放在这里执行,我们把它叫作保存了整个作业的运行管道(pipeline)。

只要属于同一个作业,那么对于不同任务节点的并行子任务,就可以放到同一个 slot 上执行。

任务槽和并行度的关系※

  • 概念上
    • slot是静态的概念, 是指TaskManager具有的并发执行能力, 可以通过参数taskmanager.numberOfTaskSlots 进行配置;而并行度(parallelism)是动态概念,也就是TaskManager 运行程序时实际使用的并发能力,可以通过参数 parallelism.default 进行配置。
  • 同一个任务节点的并行子任务是不能共享 slot 的,所以允许 slot 共享之后,运行作业所需的 slot 数量正好就是作业中所有算子并行度的最大值。
  • 同一个slot中不能有同一个任务的两个及以上的并行子任务

流图转换

由 Flink 程序直接映射成的数据流图(dataflow graph),也被称为逻辑流图(logicalStreamGraph),因为它们表示的是计算逻辑的高级视图。到具体执行环节时,我们还要考虑并行子任务的分配、数据在任务间的传输,以及合并算子链的优化。为了说明最终应该怎样执行一个流处理程序,Flink 需要将逻辑流图进行解析,转换为物理数据流图。

在这个转换过程中,有几个不同的阶段,会生成不同层级的图,其中最重要的就是作业图(JobGraph)和执行图(ExecutionGraph)。Flink 中任务调度执行的图,按照生成顺序可以分成四层:

逻辑流图(StreamGraph)→ 作业图(JobGraph)→ 执行图(ExecutionGraph)→ 物理图(Physical Graph)

1.逻辑流图(一般在客户端完成)dataflow

这个是根据用户自己写的DataStream API代码生成的DAG图,用来表示程序的拓扑结构。

2.作业图(一般在客户端完成)job graph

逻辑流图经过优化之后生成的就是作业图,会随着作业一起提交给JobManager。

主要优化的是将多个符合条件的节点进行合并,形成算子链。

3.执行图(在JobMaster中完成)execute graph

JobMaster收到作业图后,会生成执行图,执行图是作业图的并行化版本,按照真正的并行度进行拆分。

4.物理图

Jobmaster生成执行图后,会将执行图分发给TaskManager,由TM根据执行图生成“物理图”。

Flink集群搭建

Flink支持多种安装模式。

  • local(本地)——本地模式
  • standalone——独立模式,Flink自带集群,开发测试环境使用
  • standaloneHA—独立集群高可用模式,Flink自带集群,开发测试环境使用
  • yarn——计算资源统一由Hadoop YARN管理,生产环境测试

Standalone

Flink程序需要提交给JobClient

JobClient将作业提交给obManager

JobManager负责协调资源分配和作业执行。 资源分配完成后,任务将提交给相应的TaskManager

TaskManager启动一个线程以开始执行。TaskManager会向JobManager报告状态更改。例如开始执行,正在进行或已完成。

作业执行完成后,结果将发送回客户端(JobClient)

环境准备

  • jdk11及以上【配置JAVA_HOME环境变量】
  • ssh免密码登录【集群内节点之间免密登录】

下载安装包

https://dlcdn.apache.org/flink/flink-1.15.2/flink-1.15.2-bin-scala_2.12.tgz

服务器规划

服务器: node1(Master +Worker)

安装步骤

  • 上传Flink压缩包到指定目录
  • 解压缩flink到 /export/server 目录
1
tar -zxvf flink-1.15.2-bin-scala_2.12.tgz -C /export/server/
  • 改名或创建软链接:方便后期升级
1
2
cd /export/server
ln -s flink-1.15.2/ /export/server/flink
  • 修改配置文件flink-conf.yaml
1
2
cd /export/server/flink
vim conf/flink-conf.yaml

将下面两个参数注释掉或者值改为node1(这里我们采取第一种注释掉)

第二种方式

  • 启动Flink
1
bin/start-cluster.sh
1
bin/flink run /export/server/flink/examples/batch/WordCount.jar
  • 日志的查看JobManager 和 TaskManager 的启动日志可以在 Flink binary 目录下的 log 子目录中找到

log 目录中以“flink-${user}-standalonesession-${id}-${hostname}”为前缀的文件对应的即是 JobManager 的输出,其中有三个文件:

1
2
3
4
- flink-${user}-standalonesession-${id}-${hostname}.log:代码中的日志输出 
- flink-${user}-standalonesession-${id}-${hostname}.out:进程执行时的 stdout 输出
- flink-${user}-standalonesession-${id}-${hostname}-gc.log:JVM 的 GC 的日志
log 目录中以“flink-${user}-taskexecutor-${id}-${hostname}”为前缀的文件对应的是 TaskManager 的输出,也包括三个文件,和 JobManager 的输出一致。

日志的配置文件在 Flink binary 目录的 conf 子目录下:

1
2
3
4
5
6
7
8
9
10
11
12
13
- log4j-cli.properties:用 Flink 命令行时用的 log 配置,比如执行“flink run”命令
- log4j-console.properties:JobManagers/TaskManagers 在前台模式运行时使用(例如 Kubernetes);
- log4j-session.properties:是用 yarn-session.sh或Kubernetes session时启动时命令行执行时用的 log 配置
log4j.properties:无论是 standalone 还是 yarn 模式,JobManager 和 TaskManager 上用 的 log 配置都是 log4j.properties
这三个“log4j.*properties”文件分别有三个“logback.*xml”文件与之对应:
- log4j-console.properties -> logback-console.xml
- log4j-session.properties -> logback-session.xml
- log4j.properties -> logback.xml
如果想使用 logback 的同学, 需要
lib 目录中移除 log4j-slf4j-impl jars;
lib 目录中添加 logback-core 和 logback-classic jars。
如果启用了 logback,则会自动使用”logback.*xml”这些文件
需要注意的是,“flink-${user}-standalonesession-${id}-${hostname}”和“flink-${user}- taskexecutor-${id}-${hostname}”都带有“${id}”,“${id}”表示本进程在本机上该角色(JobManager 或 TaskManager)的所有进程中的启动顺序,默认从 0 开始。

yarn集群环境

准备工作

jdk1.8及以上(推荐jdk11)【配置JAVA_HOME环境变量】

ssh免密码登录【集群内节点之间免密登录】

至少hadoop2.8.5

hdfs & yarn均启动

集群规划

服务器: node1(Master +Worker)

服务器: node2(Worker)

服务器: node3(Worker)

修改hadoop的配置参数

  • 打开yarn配置页面(每台hadoop节点都需要修改)
1
2
3
4
5
6
7
8
vim /export/server/hadoop/etc/hadoop/yarn-site.xml
# 添加
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
# 是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true
# 在这里面我们需要关闭,因为对于flink使用yarn模式下,很容易内存超标,这个时候yarn会自动杀掉job
  • 分发yarn-site.xml到其它服务器节点
1
2
scp yarn-site.xml node2:$PWD
scp yarn-site.xml node3:$PWD
  • 启动ZK、HDFS、YARN集群

在企业实际开发中,使用Flink时,更多的使用方式是Flink On Yarn模式,原因如下:

  • Yarn的资源可以按需使用,提高集群的资源利用率
  • Yarn的任务有优先级,根据优先级运行作业
  • 基于Yarn调度系统,能够自动化地处理各个角色的 Failover(容错)
    • JobManager 进程和 TaskManager 进程都由 Yarn NodeManager 监控
    • 如果 JobManager 进程异常退出,则 Yarn ResourceManager 会重新调度 JobManager 到其他机器
    • 如果 TaskManager 进程异常退出,JobManager 会收到消息并重新向 Yarn ResourceManager 申请资源,重新启动 TaskManager

运行机制

Yarn的客户端需要获取hadoop的配置信息,连接Yarn的ResourceManager。所以要有设置有 YARN_CONF_DIR或者HADOOP_CONF_DIR或者HADOOP_CONF_PATH,只要设置了其中一个环境变量,就会被读取。如果读取上述的变量失败了,那么将会选择hadoop_home的环境变量,读取成功将会尝试加载$HADOOP_HOME/etc/hadoop的配置文件。

Session模式

这种模式会预先在yarn或者k8s上启动一个flink集群,然后将作业提交到这个集群上,这种模式,集群中的作业使用相同的资源,如果某一个作业出现了问题导致整个集群挂掉,那就得重启集群中的所有任务,这样就会给集群造成很大的负面影响。

特点:需要事先启动一个flink集群申请资源,提前启动JobManager和TaskManager

优点:不需要每次提交作业都申请资源,而是使用已经申请好的资源,从而执行效率高

缺点:作业每次执行完不会释放资源,因此会一起占用资源

应用场景:比较适合于提交比较频繁的场景,小作业比较多

Per-Job模式(1.15废弃)

考虑到集群的资源隔离情况,一般生产上的任务都会选择per job模式,也就是每个作业启动一个flink集群,各个集群之间独立运行,互不影响,且每个集群可以设置独立的配置。

特点:每次提交作业都需要申请一次资源(每个作业都会创建一个JobManager)

优点:作业运行完成后,资源会被释放

缺点:每次提交都重新申请资源,会影响执行效率

应用场景:适合作业比较少的场景,大作业的情况下

Application模式

flink-1.11引入了一种新的部署模式,即 Application 模式。目前,flink-1.11 已经可以支持基于 Yarn 和 Kubernetes 的 Application 模式。

Session模式:所有作业共享集群资源,隔离性差,JM 负载瓶颈,main 方法在客户端执行。

Per-Job模式:每个作业单独启动集群,隔离性好,JM 负载均衡,main 方法在客户端执行。

通过以上两种模式的特点描述,可以看出,main方法都是在客户端执行,社区考虑到在客户端执行 main() 方法来获取 flink 运行时所需的依赖项,并生成 JobGraph,提交到集群的操作都会在实时平台所在的机器上执行,那么将会给服务器造成很大的压力。尤其在大量用户共享客户端时,问题更加突出。

Application 模式下,用户程序的 main 方法将在集群中而不是客户端运行,用户将程序逻辑和依赖打包进一个可执行的 jar 包里,集群的入口程序 (ApplicationClusterEntryPoint) 负责调用其中的 main 方法来生成 JobGraph。Application 模式为每个提交的应用程序创建一个集群,该集群可以看作是在特定应用程序的作业之间共享的会话集群,并在应用程序完成时终止。

Fink On Yarn三种模式比较

yarn session:需要事先在yarn集群上启动一个flink集群,所有的flink作业共享这个集群,资源利用率高,程序执行效率高,但是程序运行结束不会释放资源,程序之间的隔离性较差,同时main方法运行客户端上,作业提交多

yarn per-job:不需要事先启动flink集群,每提交一个flink作业会启动一个flink集群,作业与作业之间资源互不影响,隔离性较好,作业运行结束会释放资源,同时main方法运行客户端上,作业提交少

yarn application:main方法运行在集群上,其它与per-job相同