【Flink】Flink技术栈(Theory及集群部署)
流式计算
什么是数据流?什么是数据集
批处理对应的是数据集
流处理对应的是数据流
无界数据流和有界数据流
有界数据流:明确定义开始和结束的数据流,计算之前获取的所有数据-mysql、日志文件
无界数据流:只有开始没有结束的数据流,获取数据立即处理,无法等待所有数据到达-socket、kafka
硬核Flink介绍
基本介绍
这里先贴上一个官方链接🔗保命: https://flink.apache.org/
- 第1代—Hadoop
MapReduce
首先第一代的计算引擎,无疑就是 Hadoop 承载的 MapReduce。它将计算分为两个阶段,分别为 Map 和 Reduce。对于上层应用来说,就不得不想方设法去拆分算法,甚至于不得不在上层应用实现多个 Job 的串联,以完成一个完整的算法,例如迭代计算。
- 第2代—
DAG框架(Tez)
+ MapReduce由于这样的弊端,催生了支持 DAG 框架的产生。因此,支持 DAG 的框架被划分为第二代计算引擎。如 Tez 以及更上层的 Oozie。这里我们不去细究各种 DAG 实现之间的区别,不过对于当时的 Tez 和 Oozie 来说,大多还是批处理的任务。
- 第3代—
Spark
接下来就是以 Spark 为代表的第三代的计算引擎。第三代计算引擎的特点主要是 Job 内部的 DAG 支持(不跨越 Job),以及强调的实时计算。在这里,很多人也会认为第三代计算引擎也能够很好的运行批处理的 Job。
- 离线计算、实时计算、SQL高层API
- 自带DAG
- 内存迭代计算,性能大幅提升
- 第4代—
Flink
随着第三代计算引擎的出现,促进了上层应用快速发展,例如各种迭代计算的性能以及对流计算和 SQL 等的支持。Flink 的诞生就被归在了第四代。这应该主要表现在 Flink 对流计算的支持,以及更一步的实时性上面。当然 Flink 也可以支持 Batch 的任务,以及 DAG 的运算。
- 离线计算、实时计算、SQL高层API
- 自带DAG
- 流式计算性能更好,可靠性更高
Flink流处理特性
- 支持高吞吐、低延迟(亚秒)、高性能的流处理
- 支持带有事件时间的窗口(Window)操作
- 支持有状态计算端到端的Exactly-once语义
- 支持高度灵活的窗口(Window)操作,支持基于time、count、session,以及data-driven的窗口操作
- 支持具有Backpressure功能的持续流模型
- 支持基于轻量级分布式快照(Snapshot)实现的容错
- 一个运行时同时支持Batch on Streaming处理和Streaming处理
- Flink在JVM内部实现了自己的内存管理
- 支持迭代计算
- 支持程序自动优化:避免特定情况下Shuffle、排序等昂贵操作,中间结果有必要进行缓存
Flink的应用场景
- Event-driven Applications(事件驱动)
- 信用卡交易、刷单、监控
- Data Analytics Applications(数据分析)
- 双11大屏、库存分析
- Data Pipeline Applications(数据管道)
- 用于数据提取-转换-加载在存储系统间进行数据转换和迁移的管道
- 对于批处理ETL是一个周期性的任务
- 对于流处理ETL是一个持续提取-转换-加载的管道
Flink架构体系
Flink是典型的主从架构(浅浅总结一下吧!)
- 主从架构:hadoop(hdfs mapreduce yarn)、spark、hbase、flink
- 去中心化架构:zookeeper、Kafka
所有的 Flink 程序都可以归纳为由三部分构成:
- Source:表示“源算子”,负责读取数据源。
- Transformation:表示“转换算子”,利用各种算子进行处理加工。
- Sink:表示“下沉算子”,负责数据的输出。
系统整体构成
- Client:Flink 客户端是 F1ink 提供的 CLI 命令行工具,用来提交 Flink 作业到 Flink 集群,在客户端中负责 StreamGraph (流图)和 Job Graph (作业图)的构建。
- JobManager:JobManager处理器也称之为Master,JobManager根据并行度将Flink客户端提交的Flink应用分解为子任务,从资源管理器 ResourceManager 申请所需的计算资源,资源具备之后,开始分发任务到TaskManager 执行 Task,并负责应用容错,跟踪作业的执行状态,发现异常则恢复作业等。
- TaskManager:TaskManager处理器也称之为Worker,TaskManager 接收 JobManage 分发的子任务,根据自身的资源情况管理子任务的启动、 停止、销毁、异常恢复等生命周期阶段。Flink运行时至少会存在一个worker处理器。
运行时架构
作业管理器-JobManager(包含Dispatcher, JobMaster, ResourceManager)
- 分发器(Dispatcher)
- Dispatcher 主要负责提供一个 REST 接口,用来提交应用,并且负责为每一个新提交的作业启动一个新的 JobMaster 组件。Dispatcher 也会启动一个 Web UI,用来方便地展示和监控作业执行的信息。
- 作业处理器(JobMaster)
- JobMaster 是 JobManager 中最核心的组件,负责处理单独的作业(Job),JobMaster和每个作业是一一对应的。
- 资源管理器(ResourceManager)
- ResourceManager 主要负责资源的分配和管理,在 Flink 集群中只有一个。所谓“资源”,主要是指 TaskManager 的任务槽(task slots)。任务槽就是 Flink 集群中的资源调配单元,包含了机器用来执行计算的一组 CPU 和内存资源。每一个任务(Task)都需要分配到一个 slot 上执行。
任务管理器-TaskManager
- Slot 任务执行槽
- 物理概念,一个TM(TaskManager)内会划分出多个Slot,1个Slot内最多可以运行1个Task(Subtask)或一组由Task(Subtask)组成的任务链。
- 多个Slot之间会共享平分当前TM的内存空间。Slot是对一个TM的资源进行固定分配的工具,每个Slot在TM启动后,可以获得固定的资源。
- flink集群只要启动,TaskManager设置的资源就是固定,就意味着每个slot资源也是固定。
通过集群的配置文件来设定 TaskManager 的 slot 数量taskmanager.numberOfTaskSlots:8
注意:slot 目前仅仅用来隔离内存,不会涉及 CPU 的隔离。在具体应用时,可以将 slot 数量配置为机器的 CPU 核心数,尽量避免不同任务之间对 CPU 的竞争。这也是开发环境默认并行度设为机器 CPU 数量的原因。
Flink计算优化
Flink并行计算
如何设置:
- 代码中设置
1 | 我们在代码中,可以很简单地在算子后跟着调用 setParallelism()方法,来设置当前算子的并行度,这种方式设置的并行度,只针对当前算子有效: |
- 提交应用时设置
1 | 在使用 flink run 命令提交应用时,可以增加-p 参数来指定当前应用程序执行的并行度,它的作用类似于执行环境的全局设置: |
- 配置文件中设置
1 | 直接在集群的配置文件 flink-conf.yaml 中直接更改默认并行度: |
优先级:算子 > 代码全局 > 命令行参数 > 配置文件
Flink合并算子链(Operator Chain)
什么是算子链: (算子间的数据传输)
- 一对一(One-to-one,forwarding)
这种模式下,数据流维护着分区以及元素的顺序。比如图中的 source 和 map 算子,source算子读取数据之后,可以直接发送给 map 算子做处理,它们之间不需要重新分区,也不需要调整数据的顺序。这就意味着 map 算子的子任务,看到的元素个数和顺序跟 source 算子的子任务产生的完全一样,保证着“一对一”的关系。map、filter、flatMap 等算子都是这种 one-to-one的对应关系。这种关系类似于 Spark 中的窄依赖。
- 重分区(Redistributing)
在这种模式下,数据流的分区会发生改变。比图中的 map 和后面的 keyBy/window 算子之间(这里的 keyBy 是数据传输算子,后面的 window、apply 方法共同构成了 window 算子),以及 keyBy/window 算子和 Sink 算子之间,都是这样的关系。
每一个算子的子任务,会根据数据传输的策略,把数据发送到不同的下游目标任务。例如,keyBy()是分组操作,本质上基于键(key)的哈希值(hashCode)进行了重分区;而当并行度改变时,比如从并行度为 2 的 window 算子,要传递到并行度为 1 的 Sink 算子,这时的数据传输方式是再平衡(rebalance),会把数据均匀地向下游子任务分发出去。这些传输方式都会引起重分区(redistribute)的过程,这一过程类似于 Spark 中的 shuffle。这种算子间的关系类似于 Spark 中的宽依赖。
合并算子链
- 合并算子链是flink作业
自动进行的优化
,当然可以手动关闭在 Flink 中,并行度相同的一对一(one to one)算子操作,可以直接链接在一起形成一个“大”的任务(task),这样原来的算子就成为了真正任务里的一部分,每个 task会被一个线程执行。这样的技术被称为“算子链”(Operator Chain)。
将算子链接成 task 是非常有效的优化:可以减少线程之间的切换和基于缓存区的数据交换,在减少时延的同时提升吞吐量。
- 关闭合并算子链, 一般不这么做
1
2
3
4
5 在代码中对算子做一些特定的设置:
// 禁用算子链
.map(word -> Tuple2.of(word, 1L)).disableChaining();
// 从当前算子开始新链
.map(word -> Tuple2.of(word, 1L)).startNewChain()
Flink任务槽的共享
- 引入, 因为一个算子就要占用一个slot, 且多个资源相互隔离有什么问题?
- 存在比较多的网络io
- Flink 默认是允许 slot 共享的
- 如果希望某个算子对应的任务完全独占一个 slot,或者只有某一部分算子共享 slot,我们也可以通过设置“slot 共享组”(SlotSharingGroup)手动指定:
.map(word -> Tuple2.of(word, 1L)).slotSharingGroup(“1”);
- 如果希望某个算子对应的任务完全独占一个 slot,或者只有某一部分算子共享 slot,我们也可以通过设置“slot 共享组”(SlotSharingGroup)手动指定:
什么是任务槽的共享
每个任务节点的并行子任务一字排开,占据不同的 slot;而不同的任务节点的子任务可以共享 slot。一个 slot 中,可以将程序处理的所有任务都放在这里执行,我们把它叫作保存了整个作业的运行管道(pipeline)。
只要属于同一个作业,那么对于不同任务节点的并行子任务,就可以放到同一个 slot 上执行。
任务槽和并行度的关系※
- 概念上
- slot是静态的概念, 是指TaskManager具有的并发执行能力, 可以通过参数
taskmanager.numberOfTaskSlots
进行配置;而并行度(parallelism)是动态概念,也就是TaskManager 运行程序时实际使用的并发能力,可以通过参数 parallelism.default 进行配置。- 同一个任务节点的并行子任务是不能共享 slot 的,所以允许 slot 共享之后,运行作业所需的 slot 数量正好就是作业中所有算子并行度的最大值。
- 同一个slot中不能有同一个任务的两个及以上的并行子任务
流图转换
由 Flink 程序直接映射成的数据流图(dataflow graph),也被称为逻辑流图(logicalStreamGraph),因为它们表示的是计算逻辑的高级视图。到具体执行环节时,我们还要考虑并行子任务的分配、数据在任务间的传输,以及合并算子链的优化。为了说明最终应该怎样执行一个流处理程序,Flink 需要将逻辑流图进行解析,转换为物理数据流图。
在这个转换过程中,有几个不同的阶段,会生成不同层级的图,其中最重要的就是作业图(JobGraph)和执行图(ExecutionGraph)。Flink 中任务调度执行的图,按照生成顺序可以分成四层:
逻辑流图(StreamGraph)→ 作业图(JobGraph)→ 执行图(ExecutionGraph)→ 物理图(Physical Graph)
1.逻辑流图(一般在客户端完成)dataflow
这个是根据用户自己写的DataStream API代码生成的DAG图,用来表示程序的拓扑结构。
2.作业图(一般在客户端完成)job graph
逻辑流图经过优化之后生成的就是作业图,会随着作业一起提交给JobManager。
主要优化的是将多个符合条件的节点进行合并,形成算子链。
3.执行图(在JobMaster中完成)execute graph
JobMaster收到作业图后,会生成执行图,执行图是作业图的并行化版本,按照真正的并行度进行拆分。
4.物理图
Jobmaster生成执行图后,会将执行图分发给TaskManager,由TM根据执行图生成“物理图”。
Flink集群搭建
Flink支持多种安装模式。
- local(本地)——本地模式
- standalone——独立模式,Flink自带集群,开发测试环境使用
- standaloneHA—独立集群高可用模式,Flink自带集群,开发测试环境使用
- yarn——计算资源统一由Hadoop YARN管理,生产环境测试
Standalone
Flink程序需要提交给JobClient
JobClient将作业提交给obManager
JobManager负责协调资源分配和作业执行。 资源分配完成后,任务将提交给相应的TaskManager
TaskManager启动一个线程以开始执行。TaskManager会向JobManager报告状态更改。例如开始执行,正在进行或已完成。
作业执行完成后,结果将发送回客户端(JobClient)
环境准备
- jdk11及以上【配置JAVA_HOME环境变量】
- ssh免密码登录【集群内节点之间免密登录】
下载安装包
https://dlcdn.apache.org/flink/flink-1.15.2/flink-1.15.2-bin-scala_2.12.tgz
服务器规划
服务器: node1(Master +Worker)
安装步骤
- 上传Flink压缩包到指定目录
- 解压缩flink到 /export/server 目录
1 | tar -zxvf flink-1.15.2-bin-scala_2.12.tgz -C /export/server/ |
- 改名或创建软链接:方便后期升级
1 | cd /export/server |
- 修改配置文件flink-conf.yaml
1 | cd /export/server/flink |
将下面两个参数注释掉或者值改为node1(这里我们采取第一种注释掉)
第二种方式
- 启动Flink
1 | bin/start-cluster.sh |
- 通过jps查看进程信息
- 访问web界面http://node1:8081
- 运行测试任务
1 | bin/flink run /export/server/flink/examples/batch/WordCount.jar |
- 日志的查看JobManager 和 TaskManager 的启动日志可以在 Flink binary 目录下的 log 子目录中找到
log 目录中以“flink-${user}-standalonesession-${id}-${hostname}”为前缀的文件对应的即是 JobManager 的输出,其中有三个文件:
1 | - flink-${user}-standalonesession-${id}-${hostname}.log:代码中的日志输出 |
日志的配置文件在 Flink binary 目录的 conf 子目录下:
1 | - log4j-cli.properties:用 Flink 命令行时用的 log 配置,比如执行“flink run”命令 |
yarn集群环境
准备工作
jdk1.8及以上(推荐jdk11)【配置JAVA_HOME环境变量】
ssh免密码登录【集群内节点之间免密登录】
至少hadoop2.8.5
hdfs & yarn均启动
集群规划
服务器: node1(Master +Worker)
服务器: node2(Worker)
服务器: node3(Worker)
修改hadoop的配置参数
- 打开yarn配置页面(每台hadoop节点都需要修改)
1 | vim /export/server/hadoop/etc/hadoop/yarn-site.xml |
- 分发yarn-site.xml到其它服务器节点
1 | scp yarn-site.xml node2:$PWD |
- 启动ZK、HDFS、YARN集群
Flink On Yarn
在企业实际开发中,使用Flink时,更多的使用方式是Flink On Yarn模式,原因如下:
- Yarn的资源可以按需使用,提高集群的资源利用率
- Yarn的任务有优先级,根据优先级运行作业
- 基于Yarn调度系统,能够自动化地处理各个角色的 Failover(容错)
- JobManager 进程和 TaskManager 进程都由 Yarn NodeManager 监控
- 如果 JobManager 进程异常退出,则 Yarn ResourceManager 会重新调度 JobManager 到其他机器
- 如果 TaskManager 进程异常退出,JobManager 会收到消息并重新向 Yarn ResourceManager 申请资源,重新启动 TaskManager
运行机制
Yarn的客户端需要获取hadoop的配置信息,连接Yarn的ResourceManager。所以要有设置有 YARN_CONF_DIR或者HADOOP_CONF_DIR或者HADOOP_CONF_PATH,只要设置了其中一个环境变量,就会被读取。如果读取上述的变量失败了,那么将会选择hadoop_home的环境变量,读取成功将会尝试加载$HADOOP_HOME/etc/hadoop的配置文件。
Session模式
这种模式会预先在yarn或者k8s上启动一个flink集群,然后将作业提交到这个集群上,这种模式,集群中的作业使用相同的资源,如果某一个作业出现了问题导致整个集群挂掉,那就得重启集群中的所有任务,这样就会给集群造成很大的负面影响。
特点:需要事先启动一个flink集群申请资源,提前启动JobManager和TaskManager
优点:不需要每次提交作业都申请资源,而是使用已经申请好的资源,从而执行效率高
缺点:作业每次执行完不会释放资源,因此会一起占用资源
应用场景:比较适合于提交比较频繁的场景,小作业比较多
Per-Job模式(1.15废弃)
考虑到集群的资源隔离情况,一般生产上的任务都会选择per job模式,也就是每个作业启动一个flink集群,各个集群之间独立运行,互不影响,且每个集群可以设置独立的配置。
特点:每次提交作业都需要申请一次资源(每个作业都会创建一个JobManager)
优点:作业运行完成后,资源会被释放
缺点:每次提交都重新申请资源,会影响执行效率
应用场景:适合作业比较少的场景,大作业的情况下
Application模式
flink-1.11引入了一种新的部署模式,即 Application 模式。目前,flink-1.11 已经可以支持基于 Yarn 和 Kubernetes 的 Application 模式。
Session模式:所有作业共享集群资源,隔离性差,JM 负载瓶颈,main 方法在客户端执行。
Per-Job模式:每个作业单独启动集群,隔离性好,JM 负载均衡,main 方法在客户端执行。
通过以上两种模式的特点描述,可以看出,main方法都是在客户端执行,社区考虑到在客户端执行 main() 方法来获取 flink 运行时所需的依赖项,并生成 JobGraph,提交到集群的操作都会在实时平台所在的机器上执行,那么将会给服务器造成很大的压力。尤其在大量用户共享客户端时,问题更加突出。
Application 模式下,用户程序的 main 方法将在集群中而不是客户端运行,用户将程序逻辑和依赖打包进一个可执行的 jar 包里,集群的入口程序 (ApplicationClusterEntryPoint) 负责调用其中的 main 方法来生成 JobGraph。Application 模式为每个提交的应用程序创建一个集群,该集群可以看作是在特定应用程序的作业之间共享的会话集群,并在应用程序完成时终止。
Fink On Yarn三种模式比较
yarn session:需要事先在yarn集群上启动一个flink集群,所有的flink作业共享这个集群,资源利用率高,程序执行效率高,但是程序运行结束不会释放资源,程序之间的隔离性较差,同时main方法运行客户端上,作业提交多
yarn per-job:不需要事先启动flink集群,每提交一个flink作业会启动一个flink集群,作业与作业之间资源互不影响,隔离性较好,作业运行结束会释放资源,同时main方法运行客户端上,作业提交少
yarn application:main方法运行在集群上,其它与per-job相同