Flink-1.16.0 集群部署(Standalone 和 Flink On Yarn)

一、Standalone 集群部署

1.集群节点规划

节点IP 节点名称 运行进程
192.168.220.30 hadoop101 JobManager,TaskManager
192.168.220.31 hadoop102 TaskManager
192.168.220.32 hadoop103 TaskManager

2.standalone 集群部署

1)上传安装包到hadoop101虚拟机 /opt/softwares 目录下

2)解压安装包到指定目录/opt/apps

[root@hadoop101 software]# tar zxvf flink-1.16.0-bin-scala_2.12.tgz -C /opt/apps/

3)配置Master节点

[root@hadoop101 software]# cd /opt/apps/flink-1.16.0/conf/
[root@hadoop101 conf]# vi masters
# 添加如下内容
hadoop101:8081

4)配置Worker节点

[root@hadoop101 conf]# vi workers
# 添加如下内容
hadoop101
hadoop102
hadoop103

5)配置flink-conf.yaml文件

[root@hadoop101 conf]# vi flink-conf.yaml
# 将原文件中对应的属性项修改为如下值:

# Jobmanager 地址
jobmanager.rpc.address: hadoop101

# Jobmanager 地址绑定设置
jobmanager.bind-host: 0.0.0.0

# TaskManager 地址绑定设置
taskmanager.bind-host: 0.0.0.0

# TaskManager 地址(不同 TaskManager 节点 host 配置不同的 host)
taskmanager.host: hadoop101

# 设置每个 TaskManager 的 slot 个数
taskmanager.numberOfTaskSlots: 3

# WEB UI 节点(只需要 JobManager 节点设置,其他 TaskManager 节点无需设置,设置了也没关系)
rest.address: hadoop101

# WEB UI 节点绑定设置(只需在 JobManager 节点设置)
rest.bind-address: 0.0.0.0

6)将hadoop101节点上的flink安装包分发到hadoop102hadoop103节点上

[root@hadoop101 conf]# cd /opt/apps/
# 将 flink 整个安装包分发到 hadoop102、hadoop103两台节点上
[root@hadoop101 apps]# scp -r flink-1.16.0 hadoop102:/opt/apps/
[root@hadoop101 apps]# scp -r flink-1.16.0 hadoop103:/opt/apps/

# 在 hadoop102、hadoop103两台节点上修改 flink-conf.yaml 文件中的 TaskManager
# hadoop102节点
taskmanager.host: hadoop102
# hadoop103节点
taskmanager.host: hadoop103

7)启动Flink集群

# 在 hadoop101 节点中启动 Flink 集群
[root@hadoop101 apps]# cd /opt/apps/flink-1.16.0/bin/
[root@hadoop101 bin]# ./start-cluster.sh
# 输出如下信息:
Starting cluster.
Starting standalonesession daemon on host hadoop101.
Starting taskexecutor daemon on host hadoop101.
Starting taskexecutor daemon on host hadoop102.

8)访问 Flink Web UI 监测页面

使用浏览器访问:http://192.168.220.30:8081,访问页面如下:

访问 Flink WEB UI 监测页面

3.提交任务测试

Standalone集群搭建完成后,可以将 Flink 任务提交到 Flink Standalone 集群中运行。

有两种方式提交 Flink 任务,一种是在 Web UI 页面提交 Flink 任务,另一种是通过命令行方式提交。

这里用 Flink 安装包/opt/apps/flink-1.16.0/examples/batch目录下的 WordCount.jar任务提交到 Flink 集群运行。

用模拟生成的数据进行测试。模拟数据如下,将如下数据保存在wordcount.txt中:

[root@hadoop101 flink-1.16.0]# mkdir data
[root@hadoop101 flink-1.16.0]# cd data
[root@hadoop101 data]# vi wordcount.txt
# 将如下信息添加到 wordcount.txt文件中
hadoop flink java
hive spark flink
scala java hadoop
hive hbase mysql
python scala java

# 将创建的data目录分发到其他两台 taskmanager 节点上,否则会报错.
[root@hadoop101 flink-1.16.0]# scp -r data hadoop102:/opt/apps/flink-1.16.0/
[root@hadoop101 flink-1.16.0]# scp -r data hadoop103:/opt/apps/flink-1.16.0/
实际任务会被分发到 taskmanage 的机器中,计算结果根据会保存到 taskmanage 的机器下,不会在 jobmanage 的机器下。 因此需要把所读取的本地文件发送到每台taskmanage机器和jobmanage机器中,否则会报错FileNotFoundException和IOException,比如 does not exist or the user running Flink ('root') has insufficient permissions to access it.

命令行提交 Flink 任务

[root@hadoop101 flink-1.16.0]# bin/flink run ./examples/streaming/WordCount.jar --input /opt/apps/flink-1.16.0/data --output /opt/apps/flink-1.16.0/result

# 任务执行结果会随机在hadoop102、hadoop103两台节点上其中一台保存
# 查看结果,结果保存在hadoo103上,也可能在hadoop102上
[root@hadoop103 flink-1.16.0]# cat result
flink 2
hadoop 2
hbase 1
hive 2
java 3
mysql 1
python 1
scala 2
spark 1

Web界面提交 Flink 任务

向 Flink 集群提交任务还可以通过 Web UI 方式提交。点击上传 jar 包,进行参数配置并提交任务。

Flink Web UI页面提交作业

提交任务后,可以通过 Web UI 页面查看提交任务,可以在对应的 TaskManager 节点上看到相应结果。

通过 Flink Web UI页面查看结果

二、Flink On Yarn(※※重要※※)

Flink 可以基于 Yarn 来执行任务,Yarn 作为资源提供方,可以根据 Flink 任务资源需求动态地启动 TaskManager 来提供计算资源。Flink 基于 Yarn 提交任务通过叫做 Flink On Yarn,Yarn 资源调度框架运行需要有 Hadoop 集群,最低版本是2.8.5。

1.Flink 不同版本与 Hadoop 整合

Flink 基于 Yarn 提交任务时,需要 Flink 与 Hadoop 进行整合。

  • Flink 1.8 版本之前,Flink 与 Hadoop 整合是通过 Flink 官方提供的基于对应 Hadoop 版本编译的安装包来实现,比如:flink-1.7.2-bin-hadoop24-scala_2.11.tgz,在 Flink 1.8 版本之后不再支持基于不同 Hadoop 版本的编译安装包。

  • Flink 与 Hadoop 进行整合时,需要在官网上下载对应的 Hadoop 版本的 flink-shaded-hadoop-2-uber-x.x.x-x.x.jarjar包,然后上传到提交 Flink 任务的客户端对应的 $FLINK_HOME/lib 中完成 Flink 与 Hadoop 的整合。

  • 在 Flink 1.11 版本之后不再提供任何更新的 flink-shaded-hadoop-x jars ,Flink 与 Hadoop 整合统一基于 Hadoop 2.8.5 编译的 Flink 安装包,支持与 Hadoop-2.8.5及以上 Hadoop 版本(包括Hadoop-3.x.x)整合。

  • 在 Flink 1.11 版本后与 Hadoop 整合时还需要配置 HADOOP_CLASSPATH 环境变量来完成对 Hadoop 的支持。通过设置HADOOP_CLASSPATH环境变量来与Hadoop集群进行集成。

2.Flink On Yarn 配置及环境准备

Flink 基于 Yarn 提交任务,向 Yarn 集群提交 Flink 任务的客户端需要满足下面两点:

  • 需要安装 Hadoop-2.8.5+版本的Hadoop;

  • 配置HADOOP_CLASSPATH环境变量

需要在安装 Hadoop-2.8.5 版本主机 /etc/profile 配置文件中加入以下环境变量:

# vim /etc/profile,加入以下配置
export HADOOP_CLASSPATH=`hadoop classpath`

#source /etc/profile 使环境变量生效
[root@hadoop101 ~]# source /etc/profile

由于基于 Flink On Yarn 模式部署 Flink 时,Flink 集群将作为 YARN 的应用程序运行,所有无需再单独部署 Flink,这里只需要在任意一台虚拟机上安装 Flink 即可,以便后续通过 Flink 提供的脚本文件将 Flink 集群提交到 YARN。

这里选择在 hadoop101 虚拟机上部署 Flink。将Flink的安装包上传到 hadoop101 节点 /opt/software 下并解压:

# 新建一个目录存放 flink 解压包
[root@hadoop101 software]# mkdir /opt/apps/yarn
[root@hadoop101 software]# tar -zxvf ./flink-1.16.0-bin-scala_2.12.tgz -C /opt/apps/yarn

3.提交任务测试

基于 Yarn 运行 Flink 任务只能通过命令行方式进行任务提交,Flink 任务基于 Yarn 运行时有3种任务提交部署模式,这里以 Application 模式来提交任务。步骤如下:

  • 启动ZooKeeper集群
#在 hadoop101、hadoop102、hadoop103 节点启动zookeeper
[root@hadoop101 apps]# zkServer.sh start
[root@hadoop102 apps]# zkServer.sh start
[root@hadoop103 apps]# zkServer.sh start
  • 启动HDFS集群:在 hadoop101 虚拟机上启动 HDFS
[root@hadoop101 apps]# start-dfs.sh
[root@hadoop101 apps]# start-yarn.sh
  • 将 Flink 任务对应的 jar 包上传到 hadoop101 节点

    这里的 Flink 任务还是以读取 Socket 数据做实时 WordCount 任务为例,将打好的 FlinkDataStreamDemo-1.0-SNAPSHOT-jar-with-dependencies.jar jar包上传到 hadoop101 节点的 /opt/apps/yarn/ 目录下。

  • Java 代码如下:

public class SocketWordCount {
    public static void main(String[] args) throws Exception {
        // 1.准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2.读取 socket 数据
        DataStreamSource<String> ds = env.socketTextStream("192.168.220.30", 9999);
        // 3.准备k,v格式数据
        SingleOutputStreamOperator<Tuple2<String, Integer>> tuplesDS = ds.flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
            String[] words = line.split(",");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.INT));
        // 4.聚合打印结果
        tuplesDS.keyBy(tp -> tp.f0).sum(1).print();
        // 5.execute 触发执行
        env.execute();
    }
}
  • pom.xml 依赖如下:
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.16.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>1.16.0</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>2.6</version>
            <configuration>
               <!-- 设置 false 是去掉 xxx-1.0-SNAPSHOT-jar-dependencies.jar 后的“-jar-dependencies.jar” -->
               <!--<appendAssemblyId>false</appendAssemblyId>-->
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass>cn.datastream.demo.SocketWordCount</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>assembly</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
  • 在 hadoop101 节点执行如下命令运行 Flink 作业
[root@hadoop101 ~]# cd /opt/apps/yarn/flink-1.16.0/bin/

# 提交Flink任务
[root@hadoop101 bin]# ./flink run-application -t yarn-application -c cn.datastream.demo.SocketWordCount /opt/apps/yarn/FlinkDataStreamDemo-1.0-SNAPSHOT-jar-with-dependencies.jar
  • 查看 Web UI 及运行结果

    Flink 任务 Application 模式提交后,浏览器输入https://192.168.220.30:8088 登录 Yarn Web UI,找到提交的任务,点击对应的 Tracking UI"ApplicationMaster" 进入到 Flink Web UI 任务页面。

Flink Application 提交作业到Yarn

Flink 作业监测页面

  • 向 hadoop101 scoket 9999 端口输入以下数据并在对应的 Web UI 中查看结果:
#向 hadoop101 socket 9999 端口写入以下数据,如报错:-bash: nc: command not found,则需要安装nc:yum -y install nc
[root@hadoop101 ~]# nc -lk 9999
java,python 
java,flink
java,hadoop
flink,python
  • 在 Web UI 中找到对应的 Flink TaskManager 节点 Stdout 输出,结果如下:

taskmanager运行状态