3.1、广播变量
就是一个公共变量,作用范围比全局变量要大!
引入这个的目的,节省内存资源。
实现的步骤
1、创建数据集,并创建为广播变量
2、重写map方法,获取广播变量,然后数据转换一下得到想要的数据
package day04; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.operators.MapOperator; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import java.util.ArrayList; public class test { /** * date 2023-03-28 * * arm: 广播变量的用法 */ public static void main(String[] args) { ArrayList<Tuple2<Integer,String>> list = new ArrayList<>(); list.add(new Tuple2<>(1,"张三")); list.add(new Tuple2<>(2,"李四")); list.add(new Tuple2<>(3,"王五")); ArrayList<Tuple3<Integer,String,Integer>> list1 = new ArrayList<>(); list1.add(new Tuple3<>(1, "语文", 50)); list1.add(new Tuple3<>(2, "数学", 70)); list1.add(new Tuple3<>(3, "英文", 86)); ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSource<Tuple2<Integer, String>> student = env.fromCollection(list); DataSource<Tuple3<Integer,String,Integer>> score = env.fromCollection(list1); MapOperator<Tuple3<Integer, String, Integer>, Tuple3<String, String, Integer>> result = score.map(new mapFunction()).withBroadcastSet(student, "student"); try { result.print(); } catch (Exception e) { e.printStackTrace(); } } }package day04; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import java.util.HashMap; import java.util.List; public class mapFunction extends RichMapFunction<Tuple3<Integer,String,Integer>,Tuple3<String,String,Integer> > { static HashMap<Integer, String> map =new HashMap<Integer, String>(); @Override public void open(Configuration parameters) throws Exception { //获取广播变量 List<Tuple2<Integer, String>> lists = getRuntimeContext().getBroadcastVariable("student"); for(int i=0;i<lists.size();i++){ System.out.println(lists.get(i).f0+"--->"+lists.get(i).f1); map.put(lists.get(i).f0,lists.get(i).f1); } } @Override public void close() throws Exception { super.close(); } @Override public Tuple3<String, String, Integer> map(Tuple3<Integer, String, Integer> tups) throws Exception { return new Tuple3(map.get(tups.f0),tups.f1,tups.f2); } }6.1、flink集群使用yarn per-job提交jar任务
需求
把flink的资源调度管理交给专业的框架yarn框架去做!
怎么做?
搭建好hadoop集群,启动hdfs集群和yarn集群
安装好flink集群
进入flink集群下面,用per-job的方式启动
bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 /home/bigdata/server/jar/productDownLine-1.0-SNAPSHOT.jar出了一个错误,java.lang.IllegalStateException: No Executor found. Please make sure to export the HADOOP_CLASSPATH environment variable or have hadoop in your classpath. For more information refer to the "Deployment" section of the official Apache Flink documentation.
百度以后的解决方案是
link 1.11开始官方不再维护flink-shaded-hadoop-2-uber jar,可以使用之前版本的shade,社区建议导入hadoop classpath,执行这行代码,不需要改任何东西
执行这个命令 export HADOOP_CLASSPATH=`hadoop classpath`
解决这个问题以后,继续启动刚才的命令
还是报错,但是报的是另一种错误
这个错误的原因是没有指定入口类的地址,如org.example.job.T4301MesDownline
bin/flink run -t yarn-per-job -c org.example.job.T4301MesDownline /home/bigdata/server/jar/productDownLine-1.0-SNAPSHOT.jar
启动成功不报错!我们看看页面的情况吧
查看yarn集群正在运行的任务有哪些?
yarn application -list杀掉正在进行的任务
yarn application -kill a_id
有一个疑问,当我把flink任务提交给yarn集群进行资源调度管理的时候,
为什么flink集群里面没有任何任务显示?
因为此时用的是hadoop的yarn的集群,而不是自带的集群!!
Tracking UI根本点不开?为什么?
无法访问含有host主机名的任何链接,但是把主机名改成ip就可以了!!
浏览器的主机没有设置host与ip识别
设置hosts文件可以修改权限
然后添加集群的ip与hostname进去,保存退出
现在可以正常访问了
8.1、zookeeper集群的搭建
一、下载安装zookeeper
http://archive.apache.org/dist/zookeeper/
下载最新版本2.8.1
http://archive.apache.org/dist/zookeeper/zookeeper-3.8.1/
二、上传安装包到服务器上并且解压,重命名
tar -zxvf apache-zookeeper-3.8.1-bin.tar.gz
mv apache-zookeeper-3.8.1-bin zookeeper
三、配置环境变量
vi /etc/profile
保存退出,使得环境变量生效
source /etc/profile
四、修改zoo.cfg文件
cd conf/
cp zoo_sample.cfg zoo.cfg
vi zoo.cfg
dataDir 是存放数据的位置,这个zkData文件夹需要后面创建
server.1=flinka:2888:3888
1序号是代表服务器的编号,后面用到,对应zkData/myid文件内的数字
flinka是hostname
port1: follower与leader交互的port
port2: 选举期间使用的port
保存退出
创建保存数据的文件夹zkData 与conf同一个父级目录
记住,这里的1与flinka 是一一对应的
配置一个好了以后,记得分发给另外两台服务器
并且把/etc/profile也分发给其他服务器,
在flinkb flinkc 服务器中输入 source /etc/profile
在flinkb服务器的zkData中写入2 echo 2 > myid
在flinkc服务器的zkData中写入 echo 3 > myid
去三台服务器上启动这个服务
[root@flinka server]# cd zookeeper/bin/
[root@flinka bin]# ./zkServer.sh start
[root@flinkb server]# cd zookeeper/bin/
[root@flinkb bin]# ./zkServer.sh start
[root@flinkc server]# cd zookeeper/bin/
[root@flinkc bin]# ./zkServer.sh start
查看各个服务器的状态
./zkServer status 你会发现有一个leader,两个follower
到此 zookeeper集群搭建成功!!!
8.2、hadoop3.*集群搭建,小白必看
hadoop广义上讲是一个大数据生态圈,接受大量处理、处理大量数据的一个全套的框架!
hadoop3.x版本以后,主要有三大模块,HDFS、YARN、mapReduce这三大核心组成!
什么是HDFS?
分布式文件系统,hadoop集群的功能类似于三个臭皮匠抵一个诸葛亮,把很多配置低、廉价的服务器组织到一起,协调好发挥出最大的作用。
分布式文件系统就是把存储文件到可用的服务器上,你不用查看计划应该存储到哪个服务器上,HDFS管家帮你规划实现!!
什么是YARN?
简称资源调度框架,流水线的组长,活多的时候分配多一点的人,活少的时候分配少一点的人,不会浪费人力也不会让活积压干不完。 YARN就是流水线的组长,业务流程数据就是活,电脑内存就是工人!
什么是MapReduce?
计算组件,计算处理数据的封装底层的代码,你写代码的时候,调用可以省事情,但是发展很多年弊端很多,逐渐被弃用!
三个服务器的hostname分别是flinka flinkb flinkc flinka是我们集群的主机
flinka--->flinkb
flinka-->flinkc
flinka-->flinka
都做了免密登录,至于怎么实现,自行百度。很简单!!!
创建目录
在/dev/bigdata下创建三个同级别的文件夹
server 安装位置
data 存储数据位置
export 其他
一、下载hadoop、解压hadoop
1.1、配置hdfs-env.sh文件,这里是配置jdk环境变量以及指定各个进程用户名的地方
目录/dev/bigdata/server/hadoop-3.3.0/etc/hadoop
export JAVA_HOME=你服务器jdk安装的目录位置 export HDFS_NAMENODE_USER=root export HDFS_DATANODE_USER=root export HDFS_JOURNALNODE_USER=root export HDFS_ZKFC_USER=root export YARN_RESOURCEMANAGER_USER=root export YARN_NODEMANAGER_USER=root输入G,然后就到了文件的末尾,复制以上的代码,然后保存退出
1.2、配置core-site.xml文件
初始是这样的
复制以下的代码
<configuration> <!-- 配置分布式文件系统的类型 --> <property> <name>fs.defaultFS</name> <value>hdfs://flinka:8020</value> </property> <!-- 配置hadoop本地保存数据的位置 --> <property> <name>hadoop.tmp.dir</name> <value>/dev/bigdata/data/hadoop</value> </property> <!-- 设置HDFS WEB UI用户身份 --> <property> <name>hadoop.http.staticuser.user</name> <value>root</value> </property> <!-- 整合hive用户代理设置 --> <property> <name>hadoop.proxyuser.root.hosts</name> <value>*</value> </property> <property> <name>hadoop.proxyuser.root.groups</name> <value>*</value> </property> <!-- 删除文件后先放到.Trash目录 --> <property> <name>fs.trash.interval</name> <value>1440</value> <description>单位是分钟,1440/60 = 24 小时,保留一天时间</description> </property> </configuration>1.3、配置hdfs-site.xml文件,这个是配置hdfs的备份服务器的
<property> <name>dfs.namenode.secondary.http-address</name> <value>flinkb:9868</value> </property>1.4、配置mapred-site.xml
配置计算程序运行的模式,是yarn还是local
<property> <name>mapreduce.framework.name</name> <value>yarn</value> </property> <property> <name>mapreduce.jobhistory.address</name> <value>flinka:10020</value> </property> <property> <name>mapreduce.jobhistory.webapp.address</name> <value>flinka:19888</value> </property> <property> <name>yarn.app.mapreduce.am.env</name> <value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value> </property> <property> <name>mapreduce.map.env</name> <value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value> </property> <property> <name>mapreduce.reduce.env</name> <value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value> </property>1.5、yarn-site.xml
<property> <name>yarn.resourcemanager.hostname</name> <value>flinka</value> </property> <!-- Reducer获取数据的方式 --> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> <property> <name>yarn.nodemanager.pmem-check-enabled</name> <value>true</value> </property> <property> <name>yarn.nodemanager.vmem-check-enabled</name> <value>true</value> </property> <property> <name>yarn.log.server.url</name> <value>http://flinka:19888/jobhistory/logs</value> </property> <!--多长时间聚合删除一次日志 此处--> <property> <name>yarn.log-aggregation.retain-seconds</name> <value>2592000</value><!--30 day--> </property>1.6、works
1.7、配置hadoop的环境变量
vi /etc/profile
保存文件:wq
source /etc/profile
分发hadoop到其他机器上
scp -r /dev/bigdata root@flinkc:/dev/
scp -r /dev/bigdata root@flinkb:/dev/
分发环境变量给其他的机器
在其他机器上生效环境变量配置
source /etc/profile
检查是否配置成功
hadoop version
启动hadoop集群
最重要的命令 初始化NameNode 这个只能在初次启动hadoop集群的时候初始化,后期如果初始化可能会出现数据清除以及集群的节点之间相互不认识,需要重新搭建集群!!
在主机上运行hdfs namenode -format
如果报错的话,说明配置的文件存在问题,根据相应的错误检查一下配置
出现sucessfully formatted 说明初始化完成,只需要在一台master机器上初始化一次就够了!
启动集群 start-dfs.sh
报错
百度了很多,这个答案是对的
删除了/dev/bigdata/data/hadoop hadoop本地数据存储位置下的所有数据,然后把其他机器上的hadoop安装以及数据全部删除!!!
重新分发数据到其他机器上,然后在初始化数据!
再启动start-dfs.sh
通过jps查看进程
flinka 下面有NameNode
这里有一个问题,flinka是主节点,但是里面只有NameNode没有DataNode?
原因,在生成免密登录的密钥的时候,没有对自身分发
ssh-copy-id flinka
回车
flinkb 下面有
flinkc
启动yarn集群
start-yarn.sh
HFDS的网页是 flinka:9870
YARN的网页是flinka:8088
到这里hadoop集群的搭建就结束了!!!!
让我们开始体验大数据的快乐把!