news 2026/5/8 15:47:53

flink完整教程

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
flink完整教程

3.1、广播变量

就是一个公共变量,作用范围比全局变量要大!

引入这个的目的,节省内存资源。

实现的步骤

1、创建数据集,并创建为广播变量

2、重写map方法,获取广播变量,然后数据转换一下得到想要的数据

package day04; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.operators.MapOperator; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import java.util.ArrayList; public class test { /** * date 2023-03-28 * * arm: 广播变量的用法 */ public static void main(String[] args) { ArrayList<Tuple2<Integer,String>> list = new ArrayList<>(); list.add(new Tuple2<>(1,"张三")); list.add(new Tuple2<>(2,"李四")); list.add(new Tuple2<>(3,"王五")); ArrayList<Tuple3<Integer,String,Integer>> list1 = new ArrayList<>(); list1.add(new Tuple3<>(1, "语文", 50)); list1.add(new Tuple3<>(2, "数学", 70)); list1.add(new Tuple3<>(3, "英文", 86)); ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSource<Tuple2<Integer, String>> student = env.fromCollection(list); DataSource<Tuple3<Integer,String,Integer>> score = env.fromCollection(list1); MapOperator<Tuple3<Integer, String, Integer>, Tuple3<String, String, Integer>> result = score.map(new mapFunction()).withBroadcastSet(student, "student"); try { result.print(); } catch (Exception e) { e.printStackTrace(); } } }
package day04; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import java.util.HashMap; import java.util.List; public class mapFunction extends RichMapFunction<Tuple3<Integer,String,Integer>,Tuple3<String,String,Integer> > { static HashMap<Integer, String> map =new HashMap<Integer, String>(); @Override public void open(Configuration parameters) throws Exception { //获取广播变量 List<Tuple2<Integer, String>> lists = getRuntimeContext().getBroadcastVariable("student"); for(int i=0;i<lists.size();i++){ System.out.println(lists.get(i).f0+"--->"+lists.get(i).f1); map.put(lists.get(i).f0,lists.get(i).f1); } } @Override public void close() throws Exception { super.close(); } @Override public Tuple3<String, String, Integer> map(Tuple3<Integer, String, Integer> tups) throws Exception { return new Tuple3(map.get(tups.f0),tups.f1,tups.f2); } }

6.1、flink集群使用yarn per-job提交jar任务

需求

把flink的资源调度管理交给专业的框架yarn框架去做!

怎么做?

  1. 搭建好hadoop集群,启动hdfs集群和yarn集群

  1. 安装好flink集群

  1. 进入flink集群下面,用per-job的方式启动

bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 /home/bigdata/server/jar/productDownLine-1.0-SNAPSHOT.jar

出了一个错误,java.lang.IllegalStateException: No Executor found. Please make sure to export the HADOOP_CLASSPATH environment variable or have hadoop in your classpath. For more information refer to the "Deployment" section of the official Apache Flink documentation.

百度以后的解决方案是

link 1.11开始官方不再维护flink-shaded-hadoop-2-uber jar,可以使用之前版本的shade,社区建议导入hadoop classpath,执行这行代码,不需要改任何东西

执行这个命令 export HADOOP_CLASSPATH=`hadoop classpath`

解决这个问题以后,继续启动刚才的命令

还是报错,但是报的是另一种错误

这个错误的原因是没有指定入口类的地址,如org.example.job.T4301MesDownline

bin/flink run -t yarn-per-job -c org.example.job.T4301MesDownline /home/bigdata/server/jar/productDownLine-1.0-SNAPSHOT.jar

启动成功不报错!我们看看页面的情况吧

查看yarn集群正在运行的任务有哪些?

yarn application -list

杀掉正在进行的任务

yarn application -kill a_id

有一个疑问,当我把flink任务提交给yarn集群进行资源调度管理的时候,

  1. 为什么flink集群里面没有任何任务显示?

因为此时用的是hadoop的yarn的集群,而不是自带的集群!!

  1. Tracking UI根本点不开?为什么?

无法访问含有host主机名的任何链接,但是把主机名改成ip就可以了!!

浏览器的主机没有设置host与ip识别

设置hosts文件可以修改权限

然后添加集群的ip与hostname进去,保存退出

现在可以正常访问了


8.1、zookeeper集群的搭建

一、下载安装zookeeper

http://archive.apache.org/dist/zookeeper/

下载最新版本2.8.1

http://archive.apache.org/dist/zookeeper/zookeeper-3.8.1/

二、上传安装包到服务器上并且解压,重命名

tar -zxvf apache-zookeeper-3.8.1-bin.tar.gz

mv apache-zookeeper-3.8.1-bin zookeeper

三、配置环境变量

vi /etc/profile

保存退出,使得环境变量生效

source /etc/profile

四、修改zoo.cfg文件

cd conf/

cp zoo_sample.cfg zoo.cfg

vi zoo.cfg

dataDir 是存放数据的位置,这个zkData文件夹需要后面创建

server.1=flinka:2888:3888

1序号是代表服务器的编号,后面用到,对应zkData/myid文件内的数字

flinka是hostname

port1: follower与leader交互的port

port2: 选举期间使用的port

保存退出

创建保存数据的文件夹zkData 与conf同一个父级目录

记住,这里的1与flinka 是一一对应的

配置一个好了以后,记得分发给另外两台服务器

并且把/etc/profile也分发给其他服务器,

在flinkb flinkc 服务器中输入 source /etc/profile

在flinkb服务器的zkData中写入2 echo 2 > myid

在flinkc服务器的zkData中写入 echo 3 > myid

去三台服务器上启动这个服务

[root@flinka server]# cd zookeeper/bin/

[root@flinka bin]# ./zkServer.sh start

[root@flinkb server]# cd zookeeper/bin/

[root@flinkb bin]# ./zkServer.sh start

[root@flinkc server]# cd zookeeper/bin/

[root@flinkc bin]# ./zkServer.sh start

查看各个服务器的状态

./zkServer status 你会发现有一个leader,两个follower

到此 zookeeper集群搭建成功!!!


8.2、hadoop3.*集群搭建,小白必看

hadoop广义上讲是一个大数据生态圈,接受大量处理、处理大量数据的一个全套的框架!

hadoop3.x版本以后,主要有三大模块,HDFS、YARN、mapReduce这三大核心组成!

什么是HDFS?

分布式文件系统,hadoop集群的功能类似于三个臭皮匠抵一个诸葛亮,把很多配置低、廉价的服务器组织到一起,协调好发挥出最大的作用。

分布式文件系统就是把存储文件到可用的服务器上,你不用查看计划应该存储到哪个服务器上,HDFS管家帮你规划实现!!

什么是YARN?

简称资源调度框架,流水线的组长,活多的时候分配多一点的人,活少的时候分配少一点的人,不会浪费人力也不会让活积压干不完。 YARN就是流水线的组长,业务流程数据就是活,电脑内存就是工人!

什么是MapReduce?

计算组件,计算处理数据的封装底层的代码,你写代码的时候,调用可以省事情,但是发展很多年弊端很多,逐渐被弃用!

三个服务器的hostname分别是flinka flinkb flinkc flinka是我们集群的主机

flinka--->flinkb

flinka-->flinkc

flinka-->flinka

都做了免密登录,至于怎么实现,自行百度。很简单!!!

创建目录

在/dev/bigdata下创建三个同级别的文件夹

server 安装位置

data 存储数据位置

export 其他

一、下载hadoop、解压hadoop

1.1、配置hdfs-env.sh文件,这里是配置jdk环境变量以及指定各个进程用户名的地方

目录/dev/bigdata/server/hadoop-3.3.0/etc/hadoop

export JAVA_HOME=你服务器jdk安装的目录位置 export HDFS_NAMENODE_USER=root export HDFS_DATANODE_USER=root export HDFS_JOURNALNODE_USER=root export HDFS_ZKFC_USER=root export YARN_RESOURCEMANAGER_USER=root export YARN_NODEMANAGER_USER=root

输入G,然后就到了文件的末尾,复制以上的代码,然后保存退出

1.2、配置core-site.xml文件

初始是这样的

复制以下的代码

<configuration> <!-- 配置分布式文件系统的类型 --> <property> <name>fs.defaultFS</name> <value>hdfs://flinka:8020</value> </property> <!-- 配置hadoop本地保存数据的位置 --> <property> <name>hadoop.tmp.dir</name> <value>/dev/bigdata/data/hadoop</value> </property> <!-- 设置HDFS WEB UI用户身份 --> <property> <name>hadoop.http.staticuser.user</name> <value>root</value> </property> <!-- 整合hive用户代理设置 --> <property> <name>hadoop.proxyuser.root.hosts</name> <value>*</value> </property> <property> <name>hadoop.proxyuser.root.groups</name> <value>*</value> </property> <!-- 删除文件后先放到.Trash目录 --> <property> <name>fs.trash.interval</name> <value>1440</value> <description>单位是分钟,1440/60 = 24 小时,保留一天时间</description> </property> </configuration>

1.3、配置hdfs-site.xml文件,这个是配置hdfs的备份服务器的

<property> <name>dfs.namenode.secondary.http-address</name> <value>flinkb:9868</value> </property>

1.4、配置mapred-site.xml

配置计算程序运行的模式,是yarn还是local

<property> <name>mapreduce.framework.name</name> <value>yarn</value> </property> <property> <name>mapreduce.jobhistory.address</name> <value>flinka:10020</value> </property> <property> <name>mapreduce.jobhistory.webapp.address</name> <value>flinka:19888</value> </property> <property> <name>yarn.app.mapreduce.am.env</name> <value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value> </property> <property> <name>mapreduce.map.env</name> <value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value> </property> <property> <name>mapreduce.reduce.env</name> <value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value> </property>

1.5、yarn-site.xml

<property> <name>yarn.resourcemanager.hostname</name> <value>flinka</value> </property> <!-- Reducer获取数据的方式 --> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> <property> <name>yarn.nodemanager.pmem-check-enabled</name> <value>true</value> </property> <property> <name>yarn.nodemanager.vmem-check-enabled</name> <value>true</value> </property> <property> <name>yarn.log.server.url</name> <value>http://flinka:19888/jobhistory/logs</value> </property> <!--多长时间聚合删除一次日志 此处--> <property> <name>yarn.log-aggregation.retain-seconds</name> <value>2592000</value><!--30 day--> </property>

1.6、works

1.7、配置hadoop的环境变量

vi /etc/profile

保存文件:wq

source /etc/profile

  1. 分发hadoop到其他机器上

scp -r /dev/bigdata root@flinkc:/dev/

scp -r /dev/bigdata root@flinkb:/dev/

  1. 分发环境变量给其他的机器

在其他机器上生效环境变量配置

source /etc/profile

检查是否配置成功

hadoop version

  1. 启动hadoop集群

最重要的命令 初始化NameNode 这个只能在初次启动hadoop集群的时候初始化,后期如果初始化可能会出现数据清除以及集群的节点之间相互不认识,需要重新搭建集群!!

在主机上运行hdfs namenode -format

如果报错的话,说明配置的文件存在问题,根据相应的错误检查一下配置

出现sucessfully formatted 说明初始化完成,只需要在一台master机器上初始化一次就够了!

启动集群 start-dfs.sh

报错

百度了很多,这个答案是对的

删除了/dev/bigdata/data/hadoop hadoop本地数据存储位置下的所有数据,然后把其他机器上的hadoop安装以及数据全部删除!!!

重新分发数据到其他机器上,然后在初始化数据!

再启动start-dfs.sh

通过jps查看进程

flinka 下面有NameNode

这里有一个问题,flinka是主节点,但是里面只有NameNode没有DataNode?

原因,在生成免密登录的密钥的时候,没有对自身分发

ssh-copy-id flinka

回车

flinkb 下面有

flinkc

启动yarn集群

start-yarn.sh

HFDS的网页是 flinka:9870

YARN的网页是flinka:8088

到这里hadoop集群的搭建就结束了!!!!

让我们开始体验大数据的快乐把!


版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/8 15:47:44

保姆级教程:用EMQX 5.0在Windows上为Tasmota设备搭建本地MQTT服务器

零基础Windows搭建EMQX 5.0&#xff1a;本地MQTT服务器全指南 在智能家居DIY领域&#xff0c;摆脱云服务依赖、实现设备完全本地化控制正成为越来越多技术爱好者的首选方案。想象一下&#xff1a;当你用手机控制客厅灯光时&#xff0c;所有指令仅在你的家庭网络内流转&#xff…

作者头像 李华
网站建设 2026/5/8 15:47:41

半导体设备通信的 .NET 解决方案:5分钟掌握 SECS4Net 核心功能

半导体设备通信的 .NET 解决方案&#xff1a;5分钟掌握 SECS4Net 核心功能 【免费下载链接】secs4net SECS-II/HSMS-SS/GEM implementation on .NET 项目地址: https://gitcode.com/gh_mirrors/se/secs4net 在半导体制造领域&#xff0c;设备间的可靠通信是自动化生产线…

作者头像 李华
网站建设 2026/5/8 15:47:38

实用指南:3步让OBS直播画面从普通到专业级特效

实用指南&#xff1a;3步让OBS直播画面从普通到专业级特效 【免费下载链接】obs-StreamFX StreamFX is a plugin for OBS Studio which adds many new effects, filters, sources, transitions and encoders! Be it 3D Transform, Blur, complex Masking, or even custom shade…

作者头像 李华
网站建设 2026/5/8 15:46:44

初创公司如何利用 Taotoken 低成本试验多种大模型

&#x1f680; 告别海外账号与网络限制&#xff01;稳定直连全球优质大模型&#xff0c;限时半价接入中。 &#x1f449; 点击领取海量免费额度 初创公司如何利用 Taotoken 低成本试验多种大模型 对于初创公司而言&#xff0c;在有限的资源下找到最适合自身业务需求的大模型&a…

作者头像 李华