使用CDH6.3.2安装了hadoop集群,但是CDH不支持flink的安装,网上有CDH集成flink的文章,大都比较麻烦;但其实我们只需要把flink的作业提交到yarn集群即可,接下来以CDH yarn为基础,flink on yarn模式的配置步骤。

一、部署flink
1、下载解压

官方下载地址:Downloads | Apache Flink

注意:CDH6.3.2是使用的scala版本是2.11(可以去CHD中spark目录lib下,看一下scala版本),所以下载的flink也要scala_2.11版本的。

2、解压

cd /data/softs tar -zxvf flink-1.14.5-bin-scala_2.11.tgz

#修改名称

mv softs/flink-1.14.5 /data/flink-yarn

3、修改flink配置

vim conf/flink-conf.yaml

#配置java环境变量

env.java.home: /usr/local/jdk1.8.0_281/

#以下为高可用配置

yarn.application-attempts: 3
high-availability: zookeeper
high-availability.storageDir: hdfs://master1:8020/flink/yarn/ha
high-availability.zookeeper.quorum: master1:2181,node1:2181,node2:2181
high-availability.zookeeper.path.root: /flink-yarn
high-availability.cluster-id: /cluster_flink_yarn

4、修改操作用户(针对以session模式启动flink)

vim bin/yarn-session.sh

#操作hdfs的用户

export HADOOP_USER_NAME=hdfs

5、分发到其它节点

将配置好的flink分发到其它两个节点(我的集群是三个节点)

scp -r flink-yarn node1:/data/

scp -r flink-yarn node2:/data/

6、配置全局环境变量

想要让 Flink 服务运行与 YARN 之上,首先需要让 Flink 能够发现 YARN 和 HDFS 的相关配置,因此,需要通过HADOOP_CLASSPATH、HADOOP_CONF_DIR 属性来指定 Hadoop 配置文件所在目录;

因此需要在各个节点配置这两个属性的去全局变量。

vim /etc/profile

#添加如下两行

export HADOOP_CLASSPATH=`hadoop classpath`

export HADOOP_CONF_DIR=/etc/hadoop/conf.cloudera.yarn/

#刷新

source /etc/profile

7、设置归属用户

因为flink需要将作业提交到yarn集群上,即需要访问或者操作hadoop集群,所以需要有hdfs用户的权限(CDH集群默认hdfs用户有操作hadoop的权限),所以要将flink的归属用户设置为hdfs,且后续都必须用hdfs用户提交flink的作业。在各个节点执行如下操作:

chown -R hdfs:hdfs flink-yarn

二、提交flink作业
1、上传作业jar包

这里使用的是一个单词统计的jar包,使用时需要传入一个服务器IP作为监听的对象

rz flink-on-k8s-demo-1.0-SNAPSHOT.jar

2、在被监听服务器上发送消息

#在172.16.12.103 这台服务器上执行,并输入单词

nc -lk 7777

3、使用application模式启动flink作业

./bin/flink run-application -t yarn-application \#指定flink作业的启动方式
-c com.yale.StreamWordCount \ #指定程序的入口类
../softs/flink-on-k8s-demo-1.0-SNAPSHOT.jar \ #程序jar包
172.16.12.103#入参(被监听的服务器IP)

4、查看作业执行情况

打开yarn的webUI

可以看到一个正在运行的任务,点击 applicationId 进去,可以看到有两个容器,

点击logs进去

再点击taskmanager.out,可以看到单词统计的结果,说明成功了!!

三、遇到的问题
1、org.apache.flink.client.deployment.ClusterDeploymentException

答:flink的scala版本和CDH的scala版本不一致,将flink换成scala_2.11版本。

2、Please specify JAVA_HOME. Either in Flink config ./conf/flink-conf.yaml or as system-wide JAVA_HOME

答:在flink-conf.yaml文件中添加env.java.home属性指定java home。