Flink的安装部署的几种模式详解

Flink的安装部署:Local本地模式|Standalone独立集群模式|Standalone-HA高可用集群模式|Flink On Yarn模式**
**我这里电脑上有三台安装好的虚拟机分别是node1、node2、node3

Local本地模式:
** 原理:主节点JobManager(Master)和从节点TaskManager(Slave)在一台机器上模拟**

1
2
3
4
5
6
7
8
9
10
11
1、Flink程序由JobClient进行提交

2、JobClient将任务提交给JobManager

3、JobManager只负责协调分配资源和分发任务,资源分配完成后将任务提交给相应的TaskManager

4、TaskManager启动一个线程开始执行任务,TaskManager会向JobManager报告状态的变更,

例如:开始执行、正在执行、执行完成

5、作业执行完成后,结果将发送回客户端(JobClient)

具体部署:

  1. 下载安装包https://archive.apache.org/dist/flink/,下面以flink-1.12.0-bin-scala_2.12.tgz安装包为例说明

  2. 上传flink-1.12.0-bin-scala_2.12.tgz安装包到node1指定路径/export/software/下面

  3. 解压安装包到 /export/server/下面:tar -zxvf flink-1.12.0-bin-scala_2.12.tgz -C /export/server/

  4. 如果出现权限的问题需要修改权限:chown -R root:root /export/server/flink-1.12.0/

  5. 可以对其修改名称或者添加软连接方便后面操作:

    1
    mv flink-1.12.0 flink    |     ln -s /export/server/flink-1.12.0 /export/server/flink

测试使用:

  1. 准备测试文件
  2. 启动Fink本地集群(这里说的集群,不是真正意义上的集群,而是在一台虚拟机上模拟的集群)
  3. jps会看到启动了一个主节点和一个从节点
  4. 访问Flink的WEB UI:http://node1:8081/#/overview
  5. 执行官方示例
1
2
3
4
5
6
 /export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar
(可以只执行上面这个,也可以加上下面的参数)

--input /root/words.txt

--output /root/out
  1. 停止Flink:/export/server/flink/bin/stop-cluster.sh

Standalone独立集群模式:

** 原理:node1作为主节点JobManager(master),node1、node2、node3作为从节点TaskManager(Slave)**

1
2
3
4
5
6
7
1、client客户端提交任务给JobManager

2、JobManager负责申请分配任务所需资源并管理任务任务和资源

3、JobManager分发任务给TaskManager执行

4、TaskManager定期向JobManager汇报状态

具体部署:node1作为主节点JobManager(master),node1、node2、node3作为从节点TaskManager(Slave)

1、修改node1节点上/export/server/flink/conf/flink-conf.yaml

** **vim flink-conf.yaml

** **jobmanager.rpc.address: node1 #指定主节点的服务器名称

** **taskmanager.numberOfTaskSlots: 2 #指定每个从节点上有几个slot

** **web.submit.enable: true #是否可以web提交

** **#历史服务器

** **jobmanager.archive.fs.dir: hdfs://node1:8020/flink/completed-jobs/ #指定任务归档的路径

** **historyserver.web.address: node1 #指定历史服务器的名称

** **historyserver.web.port: 8082 #指定历史服务器的web端口

** **historyserver.archive.fs.dir: hdfs://node1:8020/flink/completed-jobs/ #指定历史服务器获取历史任务的路径

** **注意:冒号后面的空格不能删除

2、修改主节点配置:vim /export/server/flink/conf/masters node1:8081 #指定主节点服务器及web端口

3、修改从节点配置: vim /export/server/flink/conf/workers

** **node1

** node2**

** node3**

4、将node1上的Flink分发给node2和node3,并添加软连接(参照上面的命令)

1
2
3
cd /export/server
scp -r flink-1.12.0 node2:$PWD
scp -r flink-1.12.0 node3:$PWD

测试使用:

** 1、启动集群**

** 在node1上执行:/export/server/flink/bin/start-cluster.sh**

** **

** ** jps查看node1

** ** jps查看node2

** ** jps查看node3

或者单独启动:

1
2
3
/export/server/flink/bin/jobmanager.sh ((start|start-foreground) cluster)|stop|stop-all

/export/server/flink/bin/taskmanager.sh start|start-foreground|stop|stop-all

2、启动历史服务器

/export/server/flink/bin/historyserver.sh start

启动历史任务可能会失败:

**查看log **cd /export/server/flink/log:cat flink-root-historyserver-0-node1.log

img

  1. 配置的任务任务归档路径以及历史任务查看路径没有创建,需要手动创建 hdfs dfs -mkdir -p /flink/completed-jobs/
  2. 找不到hdfs:
    vim /etc/profile

** 添加:export HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop**

** source /etc/profile(分发给node2和node3后都要执行这个命令)**

** node1执行jps**

** **img

** 3、访问Flink UI界面**

** **http://node1:8081/#/overview

** **http://node1:8082/#/overview

TaskManager界面:可以查看到当前Flink集群中有多少个TaskManager,每个TaskManager的slots、内存、CPU Core是多少

img

4、测试执行官方示例

1
2
3
4
5

/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar
--input hdfs://node1:8020/wordcount/input/words.txt
--output hdfs://node1:8020/wordcount/output/result.txt  
--parallelism 2

5、查看历史日志

1
2
3
4

http://node1:50070/explorer.html#/flink/completed-jobs

http://node1:8082/#/overview
  1. 停止Flink集群:
1
2
3
/export/server/flink/bin/stop-cluster.sh

/export/server/flink/bin/historyserver.sh stop

Standalone-HA高可用集群模式

** 原理:**

Standalone只有一个主节点,由于主节点负责调度和分配资源,如果主节点出现意外情况(例如宕机),集群就死掉了

但是Standalone-HA高可用集群会有多个主节点,他们都会想zookeeper注册并创建一个临时节点

(一个处于active活跃(工作状态)状态,其他处于standby状态),

如果活跃状态的主机出现意外和zookeeper心跳检测超时,临时节点就会被删除,

zookeeper就会从处于standby状态的节点选一个新的主节点,删除standby的临时节点添加active的临时节点

img

**具体部署: **

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
node1和node2都是主节点JobManager(master),但是有一个是active状态一个是standby状态

node1、node2、node3又都是从节点TaskManager(Slave)

1、首先要启动zookeeper:zkServer.sh start

2、启动hdfs:/export/serves/hadoop/sbin/start-dfs.sh

3、修改node1下的flink-conf.yaml

vim /export/server/flink/conf/flink-conf.yaml
增加下面内容:
  #开启HA,使用文件系统作为快照存储
  state.backend: filesystem
  #启用检查点,可以将快照保存到HDFS
  state.backend.fs.checkpointdir: hdfs://node1:8020/flink-checkpoints
  #使用zookeeper搭建高可用
  high-availability: zookeeper
  #存储JobManager的元数据到HDFS
  high-availability.storageDir: hdfs://node1:8020/flink/ha/
  #配置ZK集群地址
  high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181
分发到node2和node3
scp -r /export/server/flink/conf/flink-conf.yaml node2:/export/server/flink/conf/
scp -r /export/server/flink/conf/flink-conf.yaml node3:/export/server/flink/conf/

4、修改masters:vim /export/server/flink/conf/masters(现在node1和node2都是主节点)
  node1:8081
  node2:8081
  分发给node1和node2
  scp -r /export/server/flink/conf/masters node2:/export/server/flink/conf/
  scp -r /export/server/flink/conf/masters node3:/export/server/flink/conf/
5、修改node2上的flink-conf.yaml
vim /export/server/flink/conf/flink-conf.yaml
jobmanager.rpc.address: node2     #因为现在node2也是主节点
6、启动集群
  /export/server/flink/bin/stop-cluster.sh   #停掉集群
  /export/server/flink/bin/start-cluster.sh   #启动集群

** **img

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
测试使用:

1、访问WEB UI

http://node1:8081/#/job-manager/config

http://node2:8081/#/job-manager/config

2、测试官方示例

/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar

3、测试kill掉一个master(因为我们不知道哪个master是active状态,所有随便停掉一次测试)

/export/server/flink/bin/jobmanager.sh start|stop

/export/server/flink/bin/taskmanager.sh start||stop

4、重新执行步骤2测试能不能正常执行

5、停掉集群:/export/server/flink/bin/stop-cluster.sh

** 原理:**

** **1、Client上传Flink的jar包和配置文件到HDFS集群上

** **2、Client向Yarn的ResourceManager提交任务和申请资源

** **3、ResourceManager分配Container资源并启动ApplicationMaster

** **4、ApplicationMaster加载Flink的jar包和配置文件构建环境启动Flink-JobManager

** **5、ApplicationMaster向ResourceManager申请任务资源,

** **6、NodeManager加载Flink的jar包和配置文件构建环境并启动TaskManager

** **7、TaskManager启动后会向JobManager发送心跳,并等待JobManager向其分配任务

** **img

Session模式:(适合小任务使用)

1
2
3
4
5
需要先申请资源,启动JobManager和TaskManager

不需要每次提交任务再去申请资源,而是使用已经申请好的资源,从而提高执行效率

任务提交完资源不会被释放,因此一直会占用资源

img

Per-Job模式:(适合使用大任务,且资源充足)

1
2
3
每次提交任务都需要去申请资源,申请资源需要时间,所有影响执行效率(但是在大数据面前都是小事)

每次执行完任务资源就会立刻被释放,不会占用资源

img

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
具体部署:

1、关闭yarn的内存检查:vim /export/server/hadoop/etc/hadoop/yarn-site.xml

添加:

<!-- 关闭yarn内存检查 -->

<property>

<name>yarn.nodemanager.pmem-check-enabled</name>

<value>false</value>

</property>

<property>

<name>yarn.nodemanager.vmem-check-enabled</name>

<value>false</value>

</property>

2、分发

scp -r /export/server/hadoop/etc/hadoop/yarn-site.xml node2:/export/server/hadoop/etc/hadoop/yarn-site.xml

scp -r /export/server/hadoop/etc/hadoop/yarn-site.xml node3:/export/server/hadoop/etc/hadoop/yarn-site.xml

3.重启yarn

/export/server/hadoop/sbin/stop-yarn.sh

/export/server/hadoop/sbin/start-yarn.sh

测试示例:

yarn-session.sh(开辟资源)+ flink run(提交任务)
1、在yarn上启动一个flink会话

/export/server/flink/bin/yarn-session.sh -n 2 -tm 800 -s 1 -d

2、说明

-n 表示申请两个容器,这里指的是TaskManager

-tm 表示每个TaskManager的内存发小

-s   表示每个TaskManager中slot的数量

-d   表示后台程序方式运行(守护进程)

注意:

该警告不用管

WARN org.apache.hadoop.hdfs.DFSClient - Caught exception

java.lang.InterruptedException

3、查看UI界面:http://node1:8088/cluster

img

1
2
3
4
5
6
7
8
9
10
11
4、使用flink run提交任务

/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar

运行完之后可以继续运行其他的小任务

/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar

5、关闭yarn-session:(http://node1:8088/cluster里面查看id并杀死)

yarn application -kill application_1618886145038_0001

Per-Job分离模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

1、直接提交Job:

/export/server/flink/bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 /export/server/flink/examples/batch/WordCount.jar

2、说明:

-m   JobManager的地址

-yjm 1024 指定JobManager的内存大小

-yjm 1024 指定TaskManager的内存大小

注意:

在之前版本中如果使用的是flink on yarn方式,想切换回standalone模式的话,

如果报错需要删除:【/tmp/.yarn-properties-root】

rm -rf /tmp/.yarn-properties-root

为什么大数据任务执行都是提交给Yarn,MR、Spark、Flink都是提交给Yarn???

** **1、yarn在大数据领域中是一个非常成熟稳定并且功能强大的资源管理和任务执行调度框架

大多数公司之前大数据任务都是使用yarn来做调度执行管理的,所以后续的大数据计算框架想要占领市场份额就要去支持On Yarn

** **2、yarn支持多种调度策略(比如FIFO,Fair,Capactity…),比较灵活

** **3、yarn做统一的资源管理

** **4、yarn的资源可以按需使用,提高集群的资源利用率

** **5、yarn的任务有优先级,根据优先级执行任务

** **6、基于yarn调度系统,能够自动化的处理各个角色的Fairover(容错)

JobManager进程和TaskManager进程由Yarn的NodeManager监控

如果JobManager进程出现异常,Yarn的ResourceManager会重新调度JobManager到其他的机器

如果TaskManager进程出现异常,JobManager会收到消息并重新向Yarn的ResourceManager申请资源,重新启动TaskManager

注意:

未来云环境会越来越流行,向Spark On K8S 和 Flink On K8S可能也会流行

Spark|Flink On Yarn不需要单独启动Spark|Flink的Standalone集群,因为Spark|Flink On Yarn的本质是将Spark|Flink任务的jar包放到Yarn的JVM中去执行,是在Yarn集群中启动的一些JVM进程(Spark|Flink的一些角色)