目录
第一部分 Hadoop入门
第二部分 HDFS
第三部分 mapreduce框架原理(非常重要)
Hadoop重点
入门:Hadoop组成
HDFS:HDFS的shell操作、HDFS的数据流
MapReduce:Shuffle机制(出现数据倾斜)、YARN工作机制
第一部分 Hadoop入门
Hadoop: Apache基金会的 分布式基础架构
解决海量数据存储 和 分析计算 问题
MySQL处理极限数据量在百万级别数据查询,大概1TB量
Hadoop:存:HDFS; 算:MapReduce
hadoop解耦后组织形式:
hadoop的组成:
1.x:
MapReduce: 计算 + 资源调度
HDFS: 数据存储
2.x: 解耦,实现计算模块可插拔
MapReduce: 计算
Yarn: 资源调度
HDFS:数据存储
hadoop四大优点:
高可靠性(底层维护多个数据副本)
高扩展性:集群间分配数据,方便扩展数以千计的节点
高效性:在mp思想下,hadoop是并行工作的
高容错性:自动将失败任务重新分配
HDFS
1)NameNode(nn):目录索引,整个系统只有一份
存储文件的元数据,如文件名,文件目录结构,文件属性,以及每个文件的块列表和块所在的DataNode等
2)DataNode(dn):移动硬盘们
在本地文件系统存储文件块数据,以及块数据校验和
3)Secondary NameNode(2nn):是nn的助手,不是单纯的热备!!一般和nn不在一个节点上。
用来监控HDFS状态的辅助后台程序
目的是帮助nn合并编辑日志,减少nn的启动时间
“最关键的是nn,全HDFS就一份,像是所有DataNode的索引”
“nn和2nn之间的关系有点像手术中主刀医生和递剪子的医生,可以辅助帮助,但当nn崩溃了后可以替补,但效果不好”
Yarn
YARN架构:调度内存和cpu这种算力资源
RM:大组长
NM:干活组员
AM:组长安排的工作小组长(非 常驻进程,有job才有) Container:被NM打开、关闭以实现调度所用资源(非 常驻进程)
1)ResourceManager(RM): 集群中只有一个,代表所有资源,组长
(1)处理客户端请求job submission
(2)监控NodeManager
(3)启动或监控ApplicationMaster
(4)资源分配与调度
2)NodeManager(NM): 组员
(1)管理单个节点上的资源
(2)处理来自ResourceManager的命令
(3)处理来自ApplicationMaster的命令
3)ApplicationMaster(AM): 临时负责人,不是常驻进程,有一个job就有一个AM
(1)负责数据切分
(2)为应用程序申请资源并分配给内部的任务
(3)任务的监控与容错
4)Container: 运行所有任务的容器,NM通过生成/关闭容器来调配资源,是资源分配的单元,不是常驻进程
是Yarn中资源的抽象,封装某节点多维度资源,如内存、CPU、磁盘、网络等
MapReduce
分为两个阶段:Map阶段并行处理输入数据;Reduce阶段对Map结果进行汇总
大数据技术生态体系
大数据部门组织结构
集群环境搭建
快照
快照类似游戏存档
一定要在关机状态拍快照,这样只会保存硬盘的状态。
不照开机时的快照是因为会保存内存内容,很占内存空间,而且没法由此克隆虚拟机。
卸载JDK命令
rpm -qa | grep java | xargs sudo rpm -e --nodeps
hadoop 重要目录
share文件夹:
hadoop 全部内容、本体基本都放在hadoop目录下的share文件夹,hadoop的jar包基本上也都放在share文件夹。lib文件夹:
hadoop目录下的lib文件夹中有个native文件夹,存放的是本地库文件,用于hadoop编译,运行时依赖于这些本地库,速度更快。找不到时才会使用java的内建版本,但运行慢。
编写集群分发脚本xsync
rsync 远程同步工具
rsync主要用于备份和镜像。具有速度快、避免复制相同内容和支持符号链接的优点。
rsync和scp区别:用rsync做文件的复制要比scp的速度快,rsync只对差异文件做更新。scp是把所有文件都复制过去。
(1)基本语法rsync -arvl $pdir/$fname $user@hadoop$host:$pdir/$fname
命令 选项参数 要拷贝的文件路径/名称 目的用户@主机:目的路径/名称
选项参数说明1
2
3
4
5选项 功能
-r 递归
-a 归档复制
-v 显示复制过程
-l 拷贝符号连接
(2)案例实操
(a)把hadoop100机器上的/opt/software目录同步到hadoop101服务器的root用户下的/opt/目录[hadoop@hadoop100 opt]$ rsync -arvl /opt/software/ root@hadoop101:/opt/software
xsync集群分发脚本
(1)需求:循环复制文件到所有节点的相同目录下
(2)需求分析:
(a)rsync命令原始拷贝:
rsync -arvl /opt/module root@hadoop102:/opt/
(b)期望脚本:
xsync要同步的文件名称
(3)脚本实现
(a)在/home/hadoop目录下创建bin目录,并在bin目录下xsync创建文件,文件内容如下:1
2
3
4[hadoop@hadoop101 ~]$ mkdir bin
[hadoop@hadoop101 ~]$ cd bin/
[hadoop@hadoop101 bin]$ touch xsync
[hadoop@hadoop101 bin]$ vi xsync
在该文件中编写如下代码1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25#!/bin/bash
#1 获取输入参数个数,如果没有参数,直接退出
pcount=$#
if((pcount==0)); then
echo no args;
exit;
fi
#2 获取文件名称
p1=$1
fname=`basename $p1`
echo fname=$fname
#3 获取上级目录到绝对路径
pdir=`cd -P $(dirname $p1); pwd`
echo pdir=$pdir
#4 获取当前用户名称
user=`whoami`
#5 循环
for((host=101; host<109; host++)); do
echo ------------------- hadoop$host --------------
rsync -rvl $pdir/$fname $user@hadoop$host:$pdir
done
(b)修改脚本 xsync 具有执行权限
[hadoop@hadoop101 bin]$ chmod 777 xsync
(c)调用脚本形式:xsync 文件名称
[hadoop@hadoop101 bin]$ xsync /home/hadoop/bin
hadoop 运行模式
本地模式(Standalone Operation):用于debug,测试用
伪分布式(Pseudo-Distributed Operation):只有一个节点的分布式
完全分布式(Fully-Distributed Operation):有多个节点的分布式,实际开发的环境,以下内容全为完全分布式配置:
集群部署规划如下所示:1
2
3hadoop101:NN,DN,NM
hadoop102:DN,RM,DM
hadoop103:DN,NM,2NN,JobHistoryServer
须修改的配置文件如下:3个.sh
、4 个.xml
、1个slaves文件
core-site.xml:1
2
3
4
5
6
7
8
9
10
11<!-- 指定HDFS中NameNode的地址 -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://hadoop101:9000</value>
</property>
<!-- 指定Hadoop运行时产生文件的存储目录 -->
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/module/hadoop-2.7.2/data/tmp</value>
</property>
hadoop-env.sh:export JAVA_HOME=/opt/module/jdk1.8.0_144
hdfs-site.xml:1
2
3
4
5
6
7
8
9
10<property>
<name>dfs.replication</name>
<value>3</value>
</property>
<!-- 指定Hadoop辅助名称节点主机配置 -->
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>hadoop103:50090</value>
</property>
yarn-env.sh:export JAVA_HOME=/opt/module/jdk1.8.0_144
yarn-site.xml:1
2
3
4
5
6
7
8
9
10
11<!-- Reducer获取数据的方式 -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<!-- 指定YARN的ResourceManager的地址 -->
<property>
<name>yarn.resourcemanager.hostname</name>
<value>hadoop102</value>
</property>
mapred-env.sh:export JAVA_HOME=/opt/module/jdk1.8.0_144
mapred-site.xml:
首先复制mapred-site.xml.template为mapred-site.xmlcp mapred-site.xml.template mapred-site.xml
然后1
2
3
4
5<!-- 指定MR运行在Yarn上 -->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
完全分布式下 启动和关闭 hdfs和yarn服务:
首先:
若集群是第一次启动,需要格式化NN
在hadoop101上,执行hadoop namenode -format
其次:
NN(hadoop101):起:start-dfs.sh
关:stop-dfs.sh
RM(hadoop102):起:start-yarn.sh
关:stop-yarn.sh
完全分布式下执行wordcount程序:
首先:在wcinput目录下
下新建wc.input
文件,其中写入需要统计单词的文本,并上传至集群中mkdir wcinput
cd wcinput
vim wc.input
hadoop fs -put wcinput /
其次:运行hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar wordcount /wcinput /output
(输出文件夹一定要不存在)
最后:查看输出结果hadoop fs -cat /output/part-r-00000
免密登录ssh
免密登录原理:非对称加密rsa算法,公钥加密,私钥解密。
首先:在hadoop101上:
(1)生成公钥和私钥:ssh-keygen -t rsa
然后敲(三个回车),就会生成两个文件id_rsa(私钥)、id_rsa.pub(公钥)
(2)将公钥拷贝到要免密登录的目标机器上ssh-copy-id hadoop101
ssh-copy-id hadoop102
ssh-copy-id hadoop103
其次:在haoop102和hadoop103上也执行上述操作
或者在学习过程中,用集群分发脚本直接复制分发.ssh文件xsync ~/.ssh
!!但不能在开发和工作环境中执行此操作,会破坏安全性!!
另:ssh文件夹下(~/.ssh)的文件功能解释known_hosts
记录ssh访问过计算机的公钥(public key)id_rsa
生成的私钥id_rsa.pub
生成的公钥authorized_keys
存放授权过得无密登录服务器公钥
/etc/profile 和/etc/bashrc的区别
群起集群
(1)配置slaves文件
hadoop101的slaves:1
2
3hadoop101
hadoop102
hadoop103
同步所有节点配置文件[hadoop@hadoop101 hadoop]$ xsync slaves
(2)启动集群
- 如果集群是第一次启动,需要格式化NameNode(注意格式化之前,一定要先停止上次启动的所有namenode和datanode进程,然后再删除data和log数据)
[hadoop@hadoop101 hadoop-2.7.2]$ bin/hdfs namenode -format
- 启动HDFS
1
2
3
4
5
6
7
8
9
10
11
12
13[hadoop@hadoop101 hadoop-2.7.2]$ sbin/start-dfs.sh
[hadoop@hadoop101 hadoop-2.7.2]$ jps
4166 NameNode
4482 Jps
4263 DataNode
[hadoop@hadoop102 hadoop-2.7.2]$ jps
3218 DataNode
3288 Jps
[hadoop@hadoop103 hadoop-2.7.2]$ jps
3221 DataNode
3283 SecondaryNameNode
3364 Jps 启动YARN
注意:NameNode和ResourceManger如果不是同一台机器,不能在NameNode上启动 YARN,应该在ResouceManager所在的机器上启动YARN。[hadoop@hadoop102 hadoop-2.7.2]$ sbin/start-yarn.sh
Web端查看SecondaryNameNode
(a)浏览器中输入:http://hadoop103:50090/status.html
(b)查看SecondaryNameNode信息
(3)各个服务组件逐一启动/停止
分别启动/停止HDFS组件hadoop-daemon.sh start / stop namenode / datanode / secondarynamenode
启动/停止YARNyarn-daemon.sh start / stop resourcemanager / nodemanager
各个模块分开启动/停止(配置ssh是前提)常用
整体启动/停止HDFSstart-dfs.sh / stop-dfs.sh
整体启动/停止YARNstart-yarn.sh / stop-yarn.sh
配置历史服务器和日志聚集功能
mapred-site.xml:1
2
3
4
5
6
7
8
9
10
11<!-- 历史服务器端地址 -->
<property>
<name>mapreduce.jobhistory.address</name>
<value>hadoop103:10020</value>
</property>
<!-- 历史服务器web端地址 -->
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>hadoop103:19888</value>
</property>
yarn-site.xml:1
2
3
4
5
6
7
8
9
10
11<!-- 日志聚集功能使能 -->
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
<!-- 日志保留时间设置7天 -->
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>604800</value>
</property>
启动日志服务器命令:
在日志服务器端(hadoop103)mr-jobhistory-daemon.sh start historyserver
关闭日志服务器命令:mr-jobhistory-daemon.sh stop historyserver
“出错不要慌张,先冷静,然后去log目录下看.log文件定位错误!”
“读日志是一项很重要的技能!”
时间同步服务
时间服务器配置(hadoop101、必须root用户)
(1)检查ntp是否安装(使用root用户)rpm -qa | grep ntp
出现ntp-4.2.6p5-10.el6.centos.x86_64
和 ntpdate-4.2.6p5-10.el6.centos.x86_64
类似文件就证明已安装
(2)检查ntp运行状态service ntp status
若正在运行则停止:service ntp stop
取消服务自启动:chkconfig ntpd off
查看服务状态:chkconfig --list ntpd
上面两条在centos7.x以后分别是:systemctl disable ntpd.service
systemctl status ntpd.service
(3)修改有关的配置文件
首先进入/etc/ntp.config文件:sudo vim /etc/ntp.conf
a)修改1(授权192.168.1.0-192.168.1.255网段上的所有机器可以从这台机器上查询和同步时间)1
2#restrict 192.168.1.0 mask 255.255.255.0 nomodify notrap为
restrict 192.168.1.0 mask 255.255.255.0 nomodify notrap
b)修改2(集群在局域网中,不使用其他互联网上的时间)1
2
3
4
5
6
7
8server 0.centos.pool.ntp.org iburst
server 1.centos.pool.ntp.org iburst
server 2.centos.pool.ntp.org iburst
server 3.centos.pool.ntp.org iburst为
#server 0.centos.pool.ntp.org iburst
#server 1.centos.pool.ntp.org iburst
#server 2.centos.pool.ntp.org iburst
#server 3.centos.pool.ntp.org iburst
c)添加3(当该节点丢失网络连接,依然可以采用本地时间作为时间服务器为集群中的其他节点提供时间同步)
server 127.127.1.0
fudge 127.127.1.0 stratum 10
其次进入/etc/sysconfig/ntpd文件:sudo vim /etc/sysconfig/ntpd
增加内容如下(让硬件时间与系统时间一起同步)
SYNC_HWCLOCK=yes
(4)重启ntpd服务systemctl start ntpd.service
systemctl status ntpd.service
(5)设置ntpd服务开机启动systemctl enable ntpd.service
其他机器配置(hadoop102和hadoop103、必须root用户)
(1)在其他机器配置10分钟与时间服务器同步一次crontab -e
编写定时任务如下:*/10 * * * * /usr/sbin/ntpdate hadoop102
(2)修改任意机器时间date -s "2017-9-11 11:11:11"
(3)十分钟后查看机器是否与时间服务器同步date
说明:测试的时候可以将10分钟调整为1分钟,节省时间。
第二部分 HDFS
Hadoop Distributed File System
HDFS的使用场景:适合一次写入、多次读出的场景,且不支持文件的修改
优缺点
优点:高容错性(可恢复的多副本)、适合处理大数据(PB级别数据量、百万规模文件数量)、可构建在廉价机器上(和超算相比,后者就是硬件牛逼)
缺点:不适合低延时数据访问(运行慢)、无法高效对大量小文件进行存储(NN的条目太多)、不支持并发写入和文件随机修改(单线程写,仅支持数据追加append,修改只能删除重写)
HDFS组成架构
1)NameNode(nn):Master,主管、管理者
(1)管理HDFS的名称空间
(2)配置副本策略
(3)管理数据块(Block)映射信息
(4)处理客户端读写请求
2)DataNode(dn):slave,nn下达命令,dn执行实际的操作
(1)存储实际的数据块
(2)执行数据块的读/写操作
3)Client:客户端
(1)文件切分。将文件切分为block,再上传
(2)与nn交互,获取文件位置
(3)与dn交互,读写数据
(4)提供命令管理HDFS,如nn格式化
(5)通过命令访问HDFS,如对HDFS进行CRUD
4)Secondary NameNode(2nn):不是nn的热备。不能在nn挂掉时,立刻替换
(1)辅助nn,分担工作量
HDFS文件块大小(面试重点)
块的大小可通过配置参数(dfs.blocksize)来规定
默认大小在Hadoop2.x版本中是128M,老版本是64M原因:
块平均寻址时间为10ms
因此传输时间为:
10ms/0.01=1000ms=1s
而目前磁盘的传输速率普遍为100MB/s
所以取2的幂:128M
块不能设置太小,也不能设置太大!
太小:增加寻址时间,很慢
太大:会导致MapReduce处理不方便,很慢
HDFS的Shell操作(开发重点)
hadoop fs
和 hdfs dfs
的效果一模一样,因为源码中调用的是同一个.sh文件
Hadoop fs 命令分类:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24本地——》HDFS
put
copyFromLocal(和put功能一模一样,只是为了兼容以前的命令)
查看命令帮助:`hadoop fs -copyFromLocal -help`
moveFromLocal(上面复制,这个是剪切)
appendToFile
HDFS——》HDFS
cp
mv
chown
chgrp
chmod
mkdir
du
df
cat
rm
setrep(注:任何情况下,副本数都不可能比dn结点数多。一个dn至多只能存同一个数据的一份副本)
HDFS——》本地
get
getmerge:合并下载成一个文件
copyToLocal(和get功能一模一样,兼容以前的命令)
小技巧:cat << EOF >>1.txt创建文件
IDEA环境设置
在Intelij IDEA中修改maven源为国内(阿里)镜像
Maven简要介绍
Maven是用来管理项目的工具
B站Mvn回顾
有一个专门搜Maven依赖的网站
MVNRepository
HDFS客户端开发(IDEA)
HDFS的Shell和客户端是开发的重点,须多加练习
客户端操作本质上就是封装了shell语句的java程序,实现和逻辑都是shell
HDFS客户端练习
HDFS的数据流(面试重点)
HDFS上传流程
首先获得HDFS抽象封装,开启本地文件输入流和FSDataOutputStream输出流,然后:
1.client向NN请求上传请求
2.NN返回给client应答是否上传结果
3.获得许可后,client逻辑切分文件为多个block
4.client向NN请求上传第一个block
5.NN给client返回多个DN(最近节点和其他节点)
6.client依次向所有DN发送请求建立通道
7.DN依次判断后向client返回应答成功
8.client依次向所有DN以 packet(64KB) 形式上传数据,直到发送成功第一个block,并存储在DN中。这里传输会出现多种情况,具体可看视频。
9.依次类推,直到发送完毕所有block
最后,上传文件结束,释放文件输入流和FSDataOutputStream输出流
具体上传过程可看:
B站详细HDFS上传流程 01:38开始
拓扑距离(最近节点)和机架感知(其他节点)
HDFS下载流程
首先获得HDFS抽线封装,开启本地文件输出流和FSDataInputStream输入流,然后:
1.client向NN请求下载请求
2.NN返回给client应答文件是否存在
3.获得许可后,client请求下载第一个block(0-128M)
4.NN返回给client返回多个DN
5.client首先向第1个DN请求建立通道
6.DN1判断并向client返回应答成功,并传输packet数据,直到传输成功第1个block
7.如果第一个DN应答失败才会向其他DN请求建立通道,每次请求一个DN,进行接下来的操作
8.以此类推,接收完所有block
最后,下载文件结束,释放文件输出流和FSDataInputStream输入流
具体下载过程可看:
B站详细HDFS下载流程
NN和2NN工作机制(面试开发重点)
需要应对内存中的数据的持久化问题
NameNode的内存大小一般在64G到128G之间
如此大块数据的持久化,有样例:Redis的持久化,有两个策略:
RDB:内存镜像放在磁盘上,类似存档;
特点: 持久化慢,占空间偏小;
(加载高效,生成较慢)
安全性低;
AOF:命令操作流程记录在AOF文件中,类似命令小纸条
特点:持久化快,占空间偏大;
(加载慢,生成较快)
安全性高;
NN和2NN关系类比(老板和秘书)
NN和Redis策略相似,有Fsimage(类RDB)和edits.log(类AOF)
Fsimage类似旧存档,edits.log类似旧存档之后还没保存的操作
2NN定期将Fsimage和edits.log整合成一个新的存档Fsimage_chkpoint,返回给NN恢复内存状态
定期(条件满足一个即可):
1.定时,通常每隔1小时2NN执行一次
2.edits满,通常1分钟2NN检查一次,当操作次数达到1百万时,2NN执行一次
3.NN刚启动时2NN也会执行一次
B站详细讲解NN和2NN 从25:23开始
Fsimage和Edits解析
Fsimage和Edits文件在/opt/module/hadoop-2.7.2/data/tmp/dfs/name/current/
Fsimage只保留一个最新的和第二新的,Edits都保留
查看Fsimage命令:hdfs oiv -p XML -i fsimage_xxx -o /opt/module/hadoop/fsimage.xml
查看Edits命令:hdfs oev -p XML -i edits_xxx -o /opt/module/hadoop/edits.xml
可以再使用sz命令
拷贝到宿主机win10上查看
DN工作机制(面试开发重点)
DN上的块在/opt/module/hadoop-2.7.2/data/tmp/dfs/name/current/
其中不仅有块数据,还有块对应元数据meta文件(数据长度、校验和、时间戳)
1.DN向NN注册
2.DN注册成功
3.DN除在最开始,而且每周期(1h)上报所有块信息
4.每3秒一次心跳(last contact),心跳返回的结果带有NN给DN的命令
5.默认超过(10分钟+30秒)没有DN的心跳,则认为该节点不可用
掉线时限参数设置:TimeOut = 2 * dfs.namenode.heartbeat.recheck-interval(5min) + 10 * dfs.heartbeat.interval(3s)
可以修改上面两个参数
数据校验算法:保证数据完整性
crc(散列值:32bit):HDFS传输一般数据
md5(128bit):HDFS传输元数据
sha1(160bit):安全性高,算一次消耗cpu多,传输代价高
B站详解DN
服役新节点、退役旧节点、多目录配置
服役新节点比较简单,之前从伪分布式到完全分布式时已经做过,只须做到以下几点,新节点就能接入集群:
1.环境准备,如jdk和hadoop环境。rsync
一下hadoop有关的配置文件
2.更改ip和主机名,保证在
4.source一下配置文件:source /etc/profile
5.单点启动DN时:hadoop-daemon.sh start datanode
和yarn-daemon.sh start nodemanager
,如果遇到数据不均衡,还可以使用start-balancer.sh
命令实现集群的再平衡
退役旧节点有两种策略:添加白名单、黑名单退役。
HDFS使用的一般是黑名单。设置都在NN所在主机操作
白名单机制一般是保证集群准入机制,保证集群自身的安全性,不多用于退役节点。
黑名单机制比较温和,主要是添加hdfs-site.xml
中dfs.hosts.exclude
属性与值:
1.创建黑名单文件blacklist
,写入要退役的主机名
2.在hdfs-site.xml
中添加dfs.hosts.exclude
属性,值就是黑名单文件blacklist
的地址
3.刷新NameNode、刷新ResourceManagerhdfs dfsadmin -refreshNodes
yarn rmadmin -refreshNodes
4.检查web浏览器,退役节点状态显示为decommission in progress(退役中)
,过会儿显示为decommissioned(所有块复制完成,退役结束)
,但此时心跳仍然存在,只是集群不存放和处理这个节点的数据。如要完全退出就单点退出hdfs和yarn
5.遇到数据不均匀,还可以使用start-balancer.sh
命令实现集群再平衡
白名单机制比较激进,主要是添加hdfs-site.xml
中dfs.host
属性与值
1.创建白名单文件whitelist
,写入可以进入集群的主机名
2.在hdfs-site.xml
中添加dfs.hosts
属性,值就是白名单whitelist
的地址
3.配置文件分发到所有节点:xsync hdfs-site.xml
4.刷新NameNode、刷新ResourceManagerhdfs dfsadmin -refreshNodes
yarn rmadmin -refreshNodes
5.在web浏览器查看当前DN节点情况
6.遇到数据不均匀,还可以使用start-balancer.sh
命令实现集群再平衡
多目录配置主要是为了解决数据增大,硬盘容量不足的问题
DataNode也可以配置成多个目录,每个目录存储的数据不一样。即:数据不是副本
具体配置在hdfs-site.xml
1
2
3
4<property>
<name>dfs.datanode.data.dir</name>
<value>file:///${hadoop.tmp.dir}/dfs/data1,file:///${hadoop.tmp.dir}/dfs/data2</value>
</property>
data2文件夹类似于在该挂载点处多挂载了一块新的硬盘
第三部分:MapReduce框架原理(非常重要)
Map(映射):将不好处理的数据格式 改变映射为 好处理的数据格式
Reduce(规约合并):针对映射后的数据来做规约或合并处理
MapReduce简述与序列化
优点:简单;缺点:慢
一个完整的MapReduce程序在分布式运行时有三类实例进程:
1.MrAppMaster:负责整个程序的过程调度及状态协调
2.MapTask:负责Map阶段整个数据处理流程(映射)
3.ReduceTask:负责Reduce阶段整个数据处理流程(规约合并)
常用数据序列化类型
Java类型 Hadoop Writable类型
boolean BooleanWritable
byte ByteWritable
int IntWritable
float FloatWritable
long LongWritable
double DoubleWritable
String Text
map MapWritable
array ArrayWritable
动手在IDEA上写一个MapReduce程序实现WordCount
IDEA手写示例程序wordcount并打包运行自定义序列化接口
IDEA手写自定义bean对象实现序列化接口Writable以上可知:
用户编写的程序分成三个部分:Mapper、Reducer和Driver。
Mapper阶段:
1.用户自定义Mapper要继承框架父类Mapper
2.Mapper的输入数据是KV对形式,类型可自定义
3.Mapper的业务逻辑写在map()方法中
4.Mapper的输出数据是KV对形式,类型可自定义
5.map()方法(MapTask进程)对每个< K,V >调用一次
Reducer阶段:
1.用户自定义Reducer要继承框架父类Reducer
2.Reducer的输入数据类型对应Mapper的输出数据类型,是KV对形式
3.Reducer的业务逻辑写在reduce()方法中
4.Reducer的输出数据是KV对形式,类型可自定义
5.ReduceTask进程对每组相同K的< K,V >调用一次reduce()方法
Driver阶段:
相当于YARN集群客户端,用于提交我们程序到YARN,提交是封装了MapReduce程序相关运行参数的job对象
MapReduce的数据流
数据切片与MapTask并行度决定机制,切片由InputFormat负责
InputFormat数据输入
InputFormat主要只干两件事:切片、输出KV值1
2
3
4
5
6
7Inputformat 切片方法 KV方法
Text FIF的切片方法 LineRecordReader
KeyValue FIF的切片方法 KeyValueLineRecordReader
NLine 自定义,N行一片 LineRecordReader
CombineText 自定义,跨小文件切分 CombineFileRecordReader
FixedLength FIF的切片方法 FixedLengthRecordReader
SequenceFile FIF的切片方法 SequenceFileRecordReader
IDEA自定义InputFormat案例实操
Shuffle机制(非常重要)
Map方法结束后,数据进入环形缓冲区。当环形缓冲区满时,数据溢写要落盘,落盘前,先得到一个逻辑分区号,然后需完成分区和排序的工作,本质上是一个工作,二排序(先分区号,后按Key)完之后相同key相邻就完成了分区。
分区结束后,得到分区且区内有序文件,排序方法使用快排。
得到多个上述分区文件后,按照分区号,可进入Combiner进行一次分区的归并排序,减少之后IO的操作。
数据多次溢写文件,会多次输出文件。这些溢写文件再进行一次归并排序,进入Combiner之后落盘,得到按照分区且区内有序的文件。这就是一个MapTask的输出。当有n个MapTask的输出文件时,启动和分区数m个相同的ReduceTask,并按分区号取出分区文件内容。每个ReduceTask都会处理所有n个MapTask相应分区的数据。最后在内存缓存中产生m个文件,内存缓存满了才会使用磁盘。
接着,在内存缓存中再进行归并合成一整个输出文件,特点是key有序或按照自定义规则排好序,之后分组,并输入到Reduce方法中。其中分组阶段可以使用自定义分组规则,但注意之前排序规则就要比分组规则写的更细。如分组是按订单分组,那排序则需要先按订单排序,订单内部再按其他规则排序,这样才不会出现逻辑问题。
最后将分组文件交给OutputFormat,再经过RecordWriter输出。其中,一个ReduceTask对应一个输出文件。
整个Shuffle过程有三次排序过程。环形缓冲区本质上就是内存。环形最大特点是没头没尾,所以在哪里写入都一样。默认是100M大小。写入数据时,任意一点开始,右边写入KV值,左边同时写入KV值对应的位置索引Index。遍历时,遍历索引index,因为占内存小,遍历省时。
当写入占环形缓冲容量的80%左右时,会发生溢写过程。当溢写时,Map方法来的数据写入剩余20%部分。环形的设计还可保持处理和写入的动态平衡。
默认80%参数可调,如果Map方法逻辑复杂,参数可调小;如果任务IO剧烈,压力比较大,参数调大一些。总之保证写入和处理动态平衡。
其实溢写过程的分区和排序也就是发生在环形缓冲区中。排序时,实际上不交换KV值,是交换索引Index,好处是Index的IO交换代价小。
Partition分区案例实操
ReduceTask的并行度是手动设置的,由job.setNumReduceTasks()方法决定。
分区是告诉数据应该被哪个ReduceTask处理。且分区号要从0开始,逐一累加,不能跳。
如果分区数量比ReduceTask的数量多,程序会报IOException:Illegal partition异常。
如果分区数量比ReduceTask的数量少,程序可正常运行,但是会有资源浪费,有ReduceTask空转,没有有效输出。
IDEA手写Partition分区案例实操
WritableComparable排序
排序是MapReduce框架中最重要的操作之一。
MapTask结束后,必然已经对得到的分区文件按key值排好序了。
IDEA手写WritableComparable排序案例实操
Combiner合并
Conbiner的父类是Reducer。与Reducer的区别在于:
Combiner是在每个MapTask所在的节点运行;
Reducer是接收全局所有Mapper的输出结果。
Combiner启用的意义就是对每个MapTask的输出进行局部汇总,目的是减少IO
Combiner默认是不启动的,能够启用的前提是不能影响最终的业务逻辑。而且Combiner输入和输出的类型必须相同,因为不能改
变Mapper输出的类型。
Combiner对数据要求是数据本身可分组,是有序且有重复的。
故其在第一次排序——快排和第二次排序——归并起效。
IDEA手写Combiner合并案例实操
GroupingComparator分组排序器,辅助排序
何时分组:Reduce阶段,在之前排序阶段之后
何时启用GroupingComparator:分组规则和排序规则不一样时,不希望默认按照Key比较规则。排序规则的粒度要更细,要体现出分组的规则。
IDEA手写GroupingComparator分组案例实操
reduce()方法输入原理
Reduce输入端只有一个空键对象Key和一个空值对象Value,所有数据在框架中都是通过KV值序列化的流传递,通过反序列化不断更新复用对象,达到遍历所有对象的需求。
数据的输入和分组是同时完成的。因为数据量非常大时,使用序列化方法复用对象,可以避免重复产生大量对象消耗资源。并且,反序列化遍历对象时,每个ReduceTask会默认调用Reduce方法,输出排序后每组第一个key所对应的对象(context.write(key,value.get()))。
如果想要取每组若干名,则可使用迭代器方法逐个在组内反序列化。详情可看IDEA手写GroupingComparator分组案例实操中Reducer对象中reduce方法中的写法。
MapReduce详细工作流程
自己必须可以画出整个工作流程!
详细过程细节理解还可以参考这个网站:MapReduce过程详解(基于hadoop2.x架构)
自定义RecordWriter
FileInputFormat是文件到KV值
FileOutputFormat是KV值到文件
IDEA自定义OutputFormat案例实操
MapReduce应用
ReduceJoin案例实操
数据库中Join操作是连接两张表的操作,大致可分为内连接、外连接,左连接、右连接和自然连接。
MySQL的JOIN(一):用法
ReduceJoin的含义是实际的Join工作是在Reduce端完成的,因为Reduce有数据汇总的步骤。
Map端为不同表或文件的KV值,打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,进行输出。
Reduce端将每个分组中来源于不同文件的记录分开,然后分别合并。
MapJoin案例实操
MapJoin的含义是实际的Join工作是在Map端完成的。
MapJoin适用于一张表十分小(或者几张小表,但是能全部进内存)、一张表很大的场景。
优点:在Map端就完成了join操作,所以就不需要Reduce端的工作,也不需要shuffle,也不会因为shuffle而引起数据倾斜。
数据清洗ETL案例实操
在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序。
IDEA数据清洗ETL实操案例-简单版
IDEA数据清洗ETL实操案例-复杂版
MapReduce开发总结
MapReduce的重点在Shuffle机制
HDFS HA高可用
1.x版本存在单点故障(SPOF)
2.x版本实现了高可用(High Available),即7*24小时不中断服务
实现高可用最关键的策略是消除单点故障。HA严格应分为各个组件的HA机制:HDFS的HA 和 YARN的HA。
四大部分:
NN(active和standby)、第三方的edits文件管理系统QJM、ZK集群、Zk集群在NN上的客户端ZKfc
主要防止:split brain/脑裂 问题(两个NN导致的数据不安全问题)
内容过于细节,参考讲解文档,实现细节略有不同。
YARN HA高可用
YARN和HA都是2.x版本中的新特性,互相都在迭代过程中,所以很多东西已经设计在一起了。
同样内容过于细节,参考详解文档,实现细节略有不同。
HDFS Federation架构设计
主要解决由于DN数量增大,导致的NN内存不够的问题。
Federation的含义是:多个NN组成一个Block Pools,共同管理一片元数据,每个NN负责其中一个部分。
Hadoop 数据压缩
适用的基本原则:
运算密集型的job,少用压缩;
IO密集型的job,多用压缩。
选择压缩方法的最关键因素:压缩和解压的快慢
Hadoop中常用的方法是:Lzo和Snappy
压缩阶段:InputFormat、shuffle、outputFormat
YARN工作机制
(1)MR程序提交到客户端所在的节点。
(2)YarnRunner向ResourceManager申请一个Application。
(3)RM将该应用程序的资源路径返回给YarnRunner。
(4)该程序将运行所需资源提交到HDFS上。
(5)程序资源提交完毕后,申请运行mrAppMaster。
(6)RM将用户的请求初始化成一个Task。
(7)其中一个NodeManager领取到Task任务。
(8)该NodeManager创建容器Container,并产生MRAppmaster。
(9)Container从HDFS上拷贝资源到本地。
(10)MRAppmaster向RM 申请运行MapTask资源。
(11)RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。
(12)MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTask,MapTask对数据分区排序。
(13)MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask。
(14)ReduceTask向MapTask获取相应分区的数据。
(15)程序运行完毕后,MR会向RM申请注销自己。
YARN资源调度器
Hadoop作业调度器主要有三种:FIFO、Capacity Scheduler和Fair Scheduler。Hadoop2.7.2默认的资源调度器是Capacity Scheduler。
FIFO: 优点:调度策略简单; 缺点:不利于短作业
Capacity Scheduler: 优点:多个FIFO调度,事先决定分配; 缺点:不灵活
Fair Scheduler:优点: 多个FIFO公平分配资源,也可针对某个FIFO分配资源,比较灵活; 缺点:设置复杂
MapReduce扩展案例
TopN案例
本案例需求:
对 序列化操作案例 输出结果进行加工,输出流量使用量在前10的用户信息
解决思路:
将所有输出归总到同一个组中,只取前10个。所以要指定GroupComparator的compare分组方法。
实现细节与讲解文档略有不同。
IDEA上TopN案例
倒排索引案例
倒排索引是搜索引擎中非常重要的部分。
倒排的含义是:从 单词 到 文章 的索引。 正排:从文章到单词的索引
本案例需求:
对关键词出现的文档和该文档中出现的频率进行统计。
解决思路:
实际上是一个多job串联,每个job实现一个wordcount功能
第一次针对每个单词每篇文章做一次wordcount,key是单词-文章名,value是出现的次数;
第二次对结果文件进行处理,将key重新划分为单词,将value重新划分为文章名和对应出现次数的拼接字符串。
实现细节与讲解文档略有不同。
找博客共同好友案例
共同好友的延申:推荐共同好友
本案例需求:
以下是博客的好友列表数据,冒号前是一个用户,冒号后是该用户的所有好友(数据中的好友关系是单向的)
求出哪些人两两之间有共同好友,及他俩的共同好友都有谁?
解决思路:
首先第一次输出先找出A、B、C等都被谁关注。转变视角从我关注了谁,到谁关注了我。
其次,再从输出文件中每行的value两两找出关注者。
实现细节与讲解文档略有不同。
IDEA上找博客共同好友案例