Flink-1.16.0 集群部署(Standalone 和 Flink On Yarn)
- 实操笔记
- 2025-04-09
- 27热度
- 0评论
文章目录[隐藏]
一、Standalone 集群部署
1.集群节点规划
节点IP | 节点名称 | 运行进程 |
192.168.220.30 | hadoop101 | JobManager,TaskManager |
192.168.220.31 | hadoop102 | TaskManager |
192.168.220.32 | hadoop103 | TaskManager |
2.standalone 集群部署
1)hadoop101
虚拟机 /opt/softwares
[root@hadoop101 software]# tar zxvf flink-1.16.0-bin-scala_2.12.tgz -C /opt/apps/
3)配置Master节点
[root@hadoop101 software]# cd /opt/apps/flink-1.16.0/conf/
[root@hadoop101 conf]# vi masters
# 添加如下内容
hadoop101:8081
4)配置Worker节点
[root@hadoop101 conf]# vi workers
# 添加如下内容
hadoop101
hadoop102
hadoop103
flink-conf.yaml
[root@hadoop101 conf]# vi flink-conf.yaml
# 将原文件中对应的属性项修改为如下值:
# Jobmanager 地址
jobmanager.rpc.address: hadoop101
# Jobmanager 地址绑定设置
jobmanager.bind-host: 0.0.0.0
# TaskManager 地址绑定设置
taskmanager.bind-host: 0.0.0.0
# TaskManager 地址(不同 TaskManager 节点 host 配置不同的 host)
taskmanager.host: hadoop101
# 设置每个 TaskManager 的 slot 个数
taskmanager.numberOfTaskSlots: 3
# WEB UI 节点(只需要 JobManager 节点设置,其他 TaskManager 节点无需设置,设置了也没关系)
rest.address: hadoop101
# WEB UI 节点绑定设置(只需在 JobManager 节点设置)
rest.bind-address: 0.0.0.0
hadoop101
节点上的flink安装包分发到hadoop102
、hadoop103
[root@hadoop101 conf]# cd /opt/apps/
# 将 flink 整个安装包分发到 hadoop102、hadoop103两台节点上
[root@hadoop101 apps]# scp -r flink-1.16.0 hadoop102:/opt/apps/
[root@hadoop101 apps]# scp -r flink-1.16.0 hadoop103:/opt/apps/
# 在 hadoop102、hadoop103两台节点上修改 flink-conf.yaml 文件中的 TaskManager
# hadoop102节点
taskmanager.host: hadoop102
# hadoop103节点
taskmanager.host: hadoop103
7)启动Flink集群
# 在 hadoop101 节点中启动 Flink 集群
[root@hadoop101 apps]# cd /opt/apps/flink-1.16.0/bin/
[root@hadoop101 bin]# ./start-cluster.sh
# 输出如下信息:
Starting cluster.
Starting standalonesession daemon on host hadoop101.
Starting taskexecutor daemon on host hadoop101.
Starting taskexecutor daemon on host hadoop102.
8)访问 Flink Web UI 监测页面
http://192.168.220.30:8081
3.提交任务测试
Standalone集群搭建完成后,可以将 Flink 任务提交到 Flink Standalone 集群中运行。
有两种方式提交 Flink 任务,一种是在 Web UI 页面提交 Flink 任务,另一种是通过命令行方式提交。
/opt/apps/flink-1.16.0/examples/batch
目录下的 WordCount.jar
wordcount.txt
[root@hadoop101 flink-1.16.0]# mkdir data
[root@hadoop101 flink-1.16.0]# cd data
[root@hadoop101 data]# vi wordcount.txt
# 将如下信息添加到 wordcount.txt文件中
hadoop flink java
hive spark flink
scala java hadoop
hive hbase mysql
python scala java
# 将创建的data目录分发到其他两台 taskmanager 节点上,否则会报错.
[root@hadoop101 flink-1.16.0]# scp -r data hadoop102:/opt/apps/flink-1.16.0/
[root@hadoop101 flink-1.16.0]# scp -r data hadoop103:/opt/apps/flink-1.16.0/
命令行提交 Flink 任务
[root@hadoop101 flink-1.16.0]# bin/flink run ./examples/streaming/WordCount.jar --input /opt/apps/flink-1.16.0/data --output /opt/apps/flink-1.16.0/result
# 任务执行结果会随机在hadoop102、hadoop103两台节点上其中一台保存
# 查看结果,结果保存在hadoo103上,也可能在hadoop102上
[root@hadoop103 flink-1.16.0]# cat result
flink 2
hadoop 2
hbase 1
hive 2
java 3
mysql 1
python 1
scala 2
spark 1
Web界面提交 Flink 任务
向 Flink 集群提交任务还可以通过 Web UI 方式提交。点击上传 jar 包,进行参数配置并提交任务。
提交任务后,可以通过 Web UI 页面查看提交任务,可以在对应的 TaskManager 节点上看到相应结果。
二、Flink On Yarn(※※重要※※)
Flink On Yarn
1.Flink 不同版本与 Hadoop 整合
Flink 基于 Yarn 提交任务时,需要 Flink 与 Hadoop 进行整合。
-
-
Flink 与 Hadoop 进行整合时,需要在官网上下载对应的 Hadoop 版本的
flink-shaded-hadoop-2-uber-x.x.x-x.x.jar
jar包,然后上传到提交 Flink 任务的客户端对应的$FLINK_HOME/lib
中完成 Flink 与 Hadoop 的整合。 -
在 Flink 1.11 版本之后不再提供任何更新的
flink-shaded-hadoop-x jars
,Flink 与 Hadoop 整合统一基于 Hadoop 2.8.5 编译的 Flink 安装包,支持与 Hadoop-2.8.5及以上 Hadoop 版本(包括Hadoop-3.x.x)整合。 -
在 Flink 1.11 版本后与 Hadoop 整合时还需要配置
HADOOP_CLASSPATH
2.Flink On Yarn 配置及环境准备
Flink 基于 Yarn 提交任务,向 Yarn 集群提交 Flink 任务的客户端需要满足下面两点:
-
-
配置
HADOOP_CLASSPATH
/etc/profile
配置文件中加入以下环境变量:
# vim /etc/profile,加入以下配置
export HADOOP_CLASSPATH=`hadoop classpath`
#source /etc/profile 使环境变量生效
[root@hadoop101 ~]# source /etc/profile
由于基于 Flink On Yarn 模式部署 Flink 时,Flink 集群将作为 YARN 的应用程序运行,所有无需再单独部署 Flink,这里只需要在任意一台虚拟机上安装 Flink 即可,以便后续通过 Flink 提供的脚本文件将 Flink 集群提交到 YARN。
hadoop101
虚拟机上部署 Flink。将Flink的安装包上传到 hadoop101
节点 /opt/software
下并解压:
# 新建一个目录存放 flink 解压包
[root@hadoop101 software]# mkdir /opt/apps/yarn
[root@hadoop101 software]# tar -zxvf ./flink-1.16.0-bin-scala_2.12.tgz -C /opt/apps/yarn
3.提交任务测试
基于 Yarn 运行 Flink 任务只能通过命令行方式进行任务提交,Flink 任务基于 Yarn 运行时有3种任务提交部署模式,这里以 Application 模式来提交任务。步骤如下:
- 启动ZooKeeper集群
#在 hadoop101、hadoop102、hadoop103 节点启动zookeeper
[root@hadoop101 apps]# zkServer.sh start
[root@hadoop102 apps]# zkServer.sh start
[root@hadoop103 apps]# zkServer.sh start
hadoop101
[root@hadoop101 apps]# start-dfs.sh
[root@hadoop101 apps]# start-yarn.sh
-
这里的 Flink 任务还是以读取 Socket 数据做实时 WordCount 任务为例,将打好的
FlinkDataStreamDemo-1.0-SNAPSHOT-jar-with-dependencies.jar
jar包上传到 hadoop101 节点的/opt/apps/yarn/
-
Java 代码如下:
public class SocketWordCount {
public static void main(String[] args) throws Exception {
// 1.准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2.读取 socket 数据
DataStreamSource<String> ds = env.socketTextStream("192.168.220.30", 9999);
// 3.准备k,v格式数据
SingleOutputStreamOperator<Tuple2<String, Integer>> tuplesDS = ds.flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
String[] words = line.split(",");
for (String word : words) {
out.collect(Tuple2.of(word, 1));
}
}).returns(Types.TUPLE(Types.STRING, Types.INT));
// 4.聚合打印结果
tuplesDS.keyBy(tp -> tp.f0).sum(1).print();
// 5.execute 触发执行
env.execute();
}
}
- pom.xml 依赖如下:
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.16.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.6</version>
<configuration>
<!-- 设置 false 是去掉 xxx-1.0-SNAPSHOT-jar-dependencies.jar 后的“-jar-dependencies.jar” -->
<!--<appendAssemblyId>false</appendAssemblyId>-->
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>cn.datastream.demo.SocketWordCount</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>assembly</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
- 在 hadoop101 节点执行如下命令运行 Flink 作业
[root@hadoop101 ~]# cd /opt/apps/yarn/flink-1.16.0/bin/
# 提交Flink任务
[root@hadoop101 bin]# ./flink run-application -t yarn-application -c cn.datastream.demo.SocketWordCount /opt/apps/yarn/FlinkDataStreamDemo-1.0-SNAPSHOT-jar-with-dependencies.jar
-
Flink 任务 Application 模式提交后,浏览器输入https://192.168.220.30:8088
- 向 hadoop101 scoket 9999 端口输入以下数据并在对应的 Web UI 中查看结果:
#向 hadoop101 socket 9999 端口写入以下数据,如报错:-bash: nc: command not found,则需要安装nc:yum -y install nc
[root@hadoop101 ~]# nc -lk 9999
java,python
java,flink
java,hadoop
flink,python
- 在 Web UI 中找到对应的 Flink TaskManager 节点 Stdout 输出,结果如下: