龙空技术网

大数据 Hadoop(上) 笔记大全 收藏加关注

JackYang1993 337

前言:

眼前兄弟们对“hadoop切片计算”可能比较关切,咱们都需要了解一些“hadoop切片计算”的相关文章。那么小编也在网上汇集了一些关于“hadoop切片计算””的相关内容,希望同学们能喜欢,兄弟们一起来学习一下吧!


BigDate


一、概述大数据概念

大数据是新处理模式才具有更强的决策力,洞察发现力和流程优化能力来适应海量、高增长、多样化得信息资产。


面临的问题

存储:单机存储有限,容错率低,没法对文件进行拆分

分析:单机(cpu)性能有限,可能计算资源分布不均匀(效率低)


大数据的特点(1)数据量大

B-KB-MB-GB-TB-PB-EB-ZB....

各种云存储解决方案,百度云,腾讯微云,OneDriver、GoogleDriver,现有硬件资源可以支撑大数据的存储。

大数据时代产生的数据是极其惊人,支付宝、微博、微信、淘宝、知乎、JD、抖音,互联网时代物质文化需求会催生更多的数据。

(2)数据时效性

天猫、JD

大数据是短时间迅速产生的;数据需要合理的时间内计算完成。

(3)数据多样性数据存储类型的多样性结构化:SQL,文本非结构化:视频、音频、图片数据分析类型的多样性地理位置:来自上海北京设备信息:PC、手机、手表、手环个人喜好:美女 面膜 显卡 数码 游戏社交网络:A认识B,C,则B可能认识C网络身份证:设备MAC+IP+电话号码(4)数据价值

警察叔叔:只关注是否违规

AI研究:只关注对AI 有帮助的数据

所以在提取海量数据中有用的数据最为关键,这个是数据分析的第一步数据清洗(数据降噪|数据预处理)


大数据的应用场景(1)个人推荐

根据用户喜好,推荐相关兴趣内容。

千人一面:范围广、精度粗

一人一面:范围小、精度高

一人千面:兴趣内容范围大,精度高

(2)风控

金融系统、银行、互联网金融

实时流处理

(3)成本预测

根据近期销售和市场数据,预测成本,做出规划


(4)气候预测

根据以往气象信息,预测近期气候变化,和推断以往气候异常。

(5)人工智能

无人汽车:百度、特斯拉、Google

智能助手:小爱

物流机器人:

犯罪预测: 恐怖主义


工作方向

1 业务: 电商推荐系统,智能广告系统,专家系统等2 工作方向 大数据运维工程师,大数据开发工程师 (批处理、流处理、ETL、数据仓库、数据挖掘) 数据分析(算法)


我的启蒙老师


分布式

为了解决存储和分析,单机性能有限,所以需要分布式。使用多台物理机器解决问题。

硬件资源有了?软件上怎么搞?


二、Hadoop

Hadoop: 适合大数据的分布式存储和计算平台 Hadoop不是指具体一个框架或者组件,它是Apache软件基金会下用Java语言开发的一个开源分布式计算平台。实现在大量计算机组成的集群中对海量数据进行分布式计算。适合大数据的分布式存储和计算平台。 Hadoop1.x中包括两个核心组件:MapReduce和Hadoop Distributed File System(HDFS) 其中HDFS负责将海量数据进行分布式存储,而MapReduce负责提供对数据的计算结果的汇总


HDFS:Hadoop Distribute FileSystem

MapReduce:Hadoop中的分布式计算框架,实现对海量数据的并行分析和计算。


Hadoop 生态圈

HDFS:Hadoop Distribute FileSystem

MapReduce:Hadoop中的分布式计算框架,实现对海量数据的并行分析和计算。

HBase:基于HDFS的列式存储 的NoSQL 数据库

Hive:简化大数据开发,可以将SQL 语法翻译成MR任务

Flume:分布式的日志收集系统,用于收集海量数据,将其存储到FS 中

Kafka:分布式的消息系统,实现分布式解耦和海量数据的缓冲

ZooKepper:分布式协调,用于服务注册中心、配置中心、集群选举、状态监测、分布式锁


大数据解决方案

MR:代表基于磁盘的大数据的离线批处理的解决方案-延迟较高 30分钟

Spark:代表基于内存的大数据静态批处理的解决方案-几乎是MR的10倍以上

Storm/Spark Streaming/Flink/Kafka Streaming:实时流处理框架,达到对记录级别的数据显示和毫秒级的处理


三、HDFS 安装和使用基础环境

保证虚拟机可以正常运行

(1)安装Java

统一安装JDK8配置Java 的环境变量,保证其可用 配置用户变量
(2)配置主机名和IP的映射关系
# 配置主机名(hostname|域名|网名)[root@CentOS hadoop-2.6.0-src]# vi /etc/sysconfig/networkNETWORKING=yesHOSTNAME=HadoopNode00
# 更改主机名和IP的映射关系[root@HadoopNode00 ~]# vi /etc/hosts​127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4::1         localhost localhost.localdomain localhost6 localhost6.localdomain6192.168.197.6 CentOS192.168.197.10 HadoopNode00​


(3)关闭防火墙

service iptables stop   #关闭服务chkconfig iptables off  #关闭开机自起

因为搭建分布式服务之间可能产生相互的调度,为了保证正常的通信,一般需要关闭防火墙


(4)配置主机SSH 免密登录

SSH是Secure Shell 的缩写,SSH为建立在应用层基础上的安全协议,专为远程登录好会话和其他网络服务提供安全性的协议

基于口令的安全验证:基于口令用户名/密码

基于密钥的安全验证:需要依靠密钥,也就是你必须为你自己创建一对密钥,并把公用密钥放在需要访问的服务器上。如果你要连接到SSH服务器上,客户端软件就会向服务器发出请求,请求用你的密钥进行安全验证。服务器收到请求之后 ,先在该服务器上你的主目录下寻找你的公用密钥,然后把它和你发过来的公用密钥进行比较。如果两个密钥一致,服务器就用公用密钥加密“质询”(challenge)并把它发送给客户端软件。客户端软件收到“质询”之后就可以用你的私人密钥解密再把它发送给服务器。

安全验证的方式:基于口令:用户、密码基于密钥的安全验证

[root@HadoopNode00 ~]# ssh-keygen -t rsa       #  生成公私钥[root@HadoopNode00 ~]# ssh-copy-id HadoopNode00  # 拷贝公钥到指定的机器中authorized_keys(公钥列表)


安装HDFS(1)解压至自定义目录

#创建文件# 解压[root@HadoopNode00 ~]# cp /home/hadoop/hadoop-2.6.0-src/hadoop-dist/target/hadoop-2.6.0.tar.gz /root[root@HadoopNode00 ~]# tar -zxvf hadoop-2.6.0.tar.gz -C /home/hadoop/
(2)配置Hadoop环境变量
[root@HadoopNode00 ~]# vi .bashrcexport HADOOP_HOME=/home/hadoop/hadoop-2.6.0export PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin

HADOOP_HOME环境变量被第三方产品所依赖例如:hbase/Hive/Flume/Spark在集成Hadoop的时候,是通过读取HADOOP_HOME环境确定Hadoop位置

(3)etc/hadoop/core-site.xml

/home/hadoop/hadoop-2.6.0/etc/hadoop/

<property>  <name>fs.defaultFS</name>  <value>hdfs://HadoopNode00:9000</value></property><!-- ${user.name} 提取当前的用户名 --><property>  <name>hadoop.tmp.dir</name>  <value>/home/hadoop/hadoop-2.6.0/hadoop-${user.name}</value></property>
(4)配置etc/hadoop/hdfs-site.xml

/home/hadoop/hadoop-2.6.0/etc/hadoop/

<property>  <name>dfs.replication</name>  <value>1</value></property>


(5)启动格式化NameNode

如果第一次启动HDFS 需要格式化NameNode

为什么?

# 格式化namenode[root@HadoopNode00 ~]# hdfs namenode -format # 这一步跳过  展示文件夹目录结构[root@HadoopNode00 ~]# tree /home/hadoop/hadoop-2.6.0/hadoop-root//home/hadoop/hadoop-2.6.0/hadoop-root/└── dfs    └── name        └── current            ├── fsimage_0000000000000000000            ├── fsimage_0000000000000000000.md5            ├── seen_txid            └── VERSION3 directories, 4 files
启动HDFS
[root@HadoopNode00 ~]# start-dfs.sh     # 启动[root@HadoopNode00 ~]# jps  # 如果启动成功  则有三个相关进程9095 Jps4297 NameNode4573 SecondaryNameNode4415 DataNode[root@HadoopNode00 ~]# stop-dfs.sh		#  关闭

需要在windows下面配置 C:\Windows\System32\drivers\etc\hosts文件,配置主机名和IP的映射关系

用户可以通过浏览器访问hdfs的web管理界面:


HDFS Shell 客户端操作

[root@HadoopNode00 ~]# hadoop fsUsage: hadoop fs [generic options]        [-appendToFile <localsrc> ... <dst>]        [-cat [-ignoreCrc] <src> ...]        [-checksum <src> ...]        [-chgrp [-R] GROUP PATH...]        [-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...]        [-chown [-R] [OWNER][:[GROUP]] PATH...]        [-copyFromLocal [-f] [-p] [-l] <localsrc> ... <dst>]        [-copyToLocal [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]        [-count [-q] [-h] <path> ...]        [-cp [-f] [-p | -p[topax]] <src> ... <dst>]        [-createSnapshot <snapshotDir> [<snapshotName>]]        [-deleteSnapshot <snapshotDir> <snapshotName>]        [-df [-h] [<path> ...]]        [-du [-s] [-h] <path> ...]        [-expunge]        [-get [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]        [-getfacl [-R] <path>]        [-getfattr [-R] {-n name | -d} [-e en] <path>]        [-getmerge [-nl] <src> <localdst>]        [-help [cmd ...]]        [-ls [-d] [-h] [-R] [<path> ...]]        [-mkdir [-p] <path> ...]        [-moveFromLocal <localsrc> ... <dst>]        [-moveToLocal <src> <localdst>]        [-mv <src> ... <dst>]        [-put [-f] [-p] [-l] <localsrc> ... <dst>]        [-renameSnapshot <snapshotDir> <oldName> <newName>]        [-rm [-f] [-r|-R] [-skipTrash] <src> ...]        [-rmdir [--ignore-fail-on-non-empty] <dir> ...]        [-setfacl [-R] [{-b|-k} {-m|-x <acl_spec>} <path>]|[--set <acl_spec> <path>]]        [-setfattr {-n name [-v value] | -x name} <path>]        [-setrep [-R] [-w] <rep> <path> ...]        [-stat [format] <path> ...]        [-tail [-f] <file>]        [-test -[defsz] <path>]        [-text [-ignoreCrc] <src> ...]        [-touchz <path> ...]        [-usage [cmd ...]]Generic options supported are-conf <configuration file>     specify an application configuration file-D <property=value>            use value for given property-fs <local|namenode:port>      specify a namenode-jt <local|resourcemanager:port>    specify a ResourceManager-files <comma separated list of files>    specify comma separated files to be copied to the map reduce cluster-libjars <comma separated list of jars>    specify comma separated jar files to include in the classpath.-archives <comma separated list of archives>    specify comma separated archives to be unarchived on the compute machines.The general command line syntax isbin/hadoop command [genericOptions] [commandOptions]
上传文件
[root@HadoopNode00 ~]# hadoop fs -copyFromLocal /root/install.log  /1.log[root@HadoopNode00 ~]# hadoop fs -put  /root/install.log  /2.log
下载文件
[root@HadoopNode00 ~]# hadoop fs -get /1.log  /root/baizhi1.log[root@HadoopNode00 ~]# hadoop fs -copyToLocal  /1.log  /root/baizhi2.log
删除文件
[root@HadoopNode00 ~]# hadoop fs -rm -r /1.log19/12/04 00:30:26 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.Deleted /1.log
创建文件夹
[root@HadoopNode00 ~]# hadoop fs -mkdir /baizhi[root@HadoopNode00 ~]# hadoop fs -ls /Found 3 items-rw-r--r--   1 root supergroup       8901 2019-12-04 00:25 /2.logdrwxr-xr-x   - root supergroup          0 2019-12-04 00:31 /baizhi-rw-r--r--   1 root supergroup       8901 2019-12-03 23:53 /install.log
移动|重命名
[root@HadoopNode00 ~]# hadoop fs -mv /2.log /baizhi
滚动查看文件内容
[root@HadoopNode00 ~]# hadoop fs -tail -f /install.log
追加文件内容
[root@HadoopNode00 ~]# hadoop fs -appendToFile baizhi1.log /install.log


开启回收站

etc/hadoop/core-site.xml

<property>  <name>fs.trash.interval</name>  <value>1</value></property>

设置一分钟延迟 1分钟后文件将被HDFS彻底删除 防止用户误操作

[root@HadoopNode00 ~]# hadoop fs -rm -r /baizhi19/12/04 00:43:10 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 1 m                                            inutes, Emptier interval = 0 minutes.Moved: 'hdfs://HadoopNode00:9000/baizhi' to trash at: hdfs://HadoopNode00:9000/user/root/.Trash/Cur                                            rent[root@HadoopNode00 ~]# hadoop fs -ls /user/root/.Trash/CurrentFound 1 itemsdrwxr-xr-x   - root supergroup          0 2019-12-04 00:32 /user/root/.Trash/Current/baizhi[root@HadoopNode00 ~]# hadoop fs -ls /user/root/.Trash/Current/baizhiFound 1 items-rw-r--r--   1 root supergroup       8901 2019-12-04 00:25 /user/root/.Trash/Current/baizhi/2.log[root@HadoopNode00 ~]# hadoop fs -ls /user/root/.Trash/CurrentFound 1 itemsdrwxr-xr-x   - root supergroup          0 2019-12-04 00:32 /user/root/.Trash/Current/baizhi[root@HadoopNode00 ~]# hadoop fs -ls /user/root/.Trash/Current/baizhils: `/user/root/.Trash/Current/baizhi': No such file or directory[root@HadoopNode00 ~]# hadoop fs -ls /user/root/.Trash/Current/ls: `/user/root/.Trash/Current/': No such file or directory[root@HadoopNode00 ~]# hadoop fs -ls /user/root/.Trash/Found 1 itemsdrwx------   - root supergroup          0 2019-12-04 00:43 /user/root/.Trash/191204004400[root@HadoopNode00 ~]# hadoop fs -ls /user/root/.Trash/191204004400Found 1 itemsdrwxr-xr-x   - root supergroup          0 2019-12-04 00:32 /user/root/.Trash/191204004400/baizhi[root@HadoopNode00 ~]# hadoop fs -ls /user/root/.Trash/Found 1 itemsdrwx------   - root supergroup          0 2019-12-04 00:43 /user/root/.Trash/191204004400[root@HadoopNode00 ~]# hadoop fs -ls /user/root/.Trash/Found 1 itemsdrwx------   - root supergroup          0 2019-12-04 00:43 /user/root/.Trash/191204004400[root@HadoopNode00 ~]# hadoop fs -ls /user/root/.Trash/191204004400ls: `/user/root/.Trash/191204004400': No such file or directory


在Windows下配置Hadoop环境解压 Hadoop安装包到任意目录中将winutils.exe和hadoop.dll拷贝到win下的hadoop的bin目录中window下配置hadoop环境变量在win下配置主机名和IP的映射关系


HDFS Java API依赖

<!--  --><dependency>    <groupId>org.apache.hadoop</groupId>    <artifactId>hadoop-hdfs</artifactId>    <version>2.6.0</version></dependency> <!--  --><dependency>    <groupId>org.apache.hadoop</groupId>    <artifactId>hadoop-common</artifactId>    <version>2.6.0</version></dependency>
权限不足解决方案
org.apache.hadoop.security.AccessControlException: Permission denied: user=Administrator, access=WRITE, inode="/":root:supergroup:drwxr-xr-x	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkFsPermission(FSPermissionChecker.java:272)	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:257)	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:238)	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:179)	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6512)	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6494)


方案1

通过System.setProperty 设置系统用户名

  System.setProperty("HADOOP_USER_NAME","root");
方案2

通过-DHADOOP_USER_NAME=root 设置系统用户名

-DHADOOP_USER_NAME=root
方案3

将权限检查机制关闭

etc/hadoop/hdfs-site.xml

<property>  <name>dfs.permissions.enabled</name>  <value>false</value></property>

建议全部使用方案3 因为web界面上有user会进不去 所以需要使用方案3


获取客户端对象

 Configuration configuration;    FileSystem fileSystem;    @Before    public void getClient() throws Exception {        configuration = new Configuration();        /*         * 第一种方式 手动设置需要连接的信息         * */        //configuration.set("fs.defaultFS","");        configuration.addResource("core-site.xml");        configuration.addResource("hdfs-site.xml");        fileSystem = FileSystem.newInstance(configuration);    }    @After    public void close() throws Exception {        fileSystem.close();    }
上传文件
    @Test    public void upload01() throws Exception {        fileSystem.copyFromLocalFile(new Path("F:\\大数据\\笔记\\Day02-Hadoop\\2019-12-03_143359.png"), new Path("/3.png"));    }
 @Test    public void upload02() throws Exception {        /*         *         * 此时本地文件为输入流         * hdfs上的文件为输出流         * */        //本地文件        FileInputStream inputStream = new FileInputStream(new File("F:\\大数据\\笔记\\Day02-Hadoop\\2019-12-03_143359.png"));        //HDFS文件        FSDataOutputStream outputStream = fileSystem.create(new Path("/4.png"));        IOUtils.copyBytes(inputStream, outputStream, 1024, true);    }
下载文件
       @Test    public void download01() throws Exception {        /*         *(1) hdfs上的文件         *(2)本地上文件         * */        fileSystem.copyToLocalFile(new Path("/1.png"), new Path("D:\\"));    }
@Test    public void download02() throws Exception {        /*         *         * 输入流 hdfs 上文件         * 输出流 本地上的文件         * */        /*         * hdfs 输入流         * */        FSDataInputStream inputStream = fileSystem.open(new Path("/1.png"));        /*         * 本地文件 输出流         * */        FileOutputStream outputStream = new FileOutputStream(new File("D:\\2.png"));        IOUtils.copyBytes(inputStream, outputStream, 1024, true);    }
删除文件
  @Test    public void delete() throws Exception{        boolean delete = fileSystem.delete(new Path("/baizhi"), true);        if (delete){            System.out.println("删除成功");        }else {            System.out.println("删除失败");        }    }



创建文件夹

   @Test    public void mkdir() throws Exception{        fileSystem.mkdirs(new Path("/baizhi"));    }
显示所有文件
@Test    public void list() throws Exception {		// 获取到文件的迭代器对象  可以理解为存储文件信息的集合         RemoteIterator<LocatedFileStatus> remoteIterator = fileSystem.listFiles(new Path("/"), true);    // 遍历集合        while (remoteIterator.hasNext()) {// 拿到集合中每一个对象            LocatedFileStatus fileStatus = remoteIterator.next();            // 拿到path            Path path = fileStatus.getPath();// 输出文件            System.out.println(path.toString());        }    }
测试是否存在
 @Test    public void exist() throws Exception{        boolean exists = fileSystem.exists(new Path("/11.png"));        if (exists) {            System.out.println("yes");        } else {            System.out.println("no");        }}


回收站

 @Test    public void huishouzhan() throws Exception {        //  获取 Trash对象        Trash trash = new Trash(fileSystem, configuration);		//  通过trash 对象删除 文件 到回收站中        boolean b = trash.moveToTrash(new Path("/2.png"));        if (b) {            System.out.println("删除成功");        } else {            System.out.println("删除失败");        }    }
四、HDFS 架构

HDFS是一种能够运行在商业硬件上的分布式文件系统,与目前市面上文件系统有很多相似之处,但是又是不同的软件系统。在HDFS中,使用的架构是主从架构(Aactive|Standby),针对的是实现NameNode的高可用。

NameNode会启动一个(或者说在一个集群中,只有一个NameNode在运行), 管理文件系统的命名空间和控制外界客户端对文件系统的访问;

DataNode是负责管理在每一个节点上存储的文件。


NameNode:存储系统的元数据(用于描述数据的数据|比如说块到DataNode的映射的数据),负责管理DataNode

DataNode:用于存储的数据块的节点,负责响应客户端对块的读写请求,向NameNode汇报块信息

block:数据块,是对文件拆分的最小单元,默认情况下文件拆分大小为128MB 每个数据块有3个副本

rack:机架 使用机架对存储节点的进行物理编排 用于优化存储和计算


什么是Block块

默认为128MB 为一个块 是HDFS中对文件拆分的最小单元

<property>  <name>dfs.blocksize</name>  <value>134217728</value>  <description>      The default block size for new files, in bytes.      You can use the following suffix (case insensitive):      k(kilo), m(mega), g(giga), t(tera), p(peta), e(exa) to specify the size (such as 128k, 512m, 1g, etc.),      Or provide complete size in bytes (such as 134217728 for 128 MB).  </description></property>
<property>  <name>dfs.replication</name>  <value>3</value>  <description>Default block replication.   The actual number of replications can be specified when the file is created.  The default is used if replication is not specified in create time.  </description></property>
(1)为什么块的大小为128MB

在2.X以前,HDFS的块的大小默认为64MB

工业限制,硬件限制(在06年到10年间)

软件优化 通常认为最佳状态是 :寻址时间为传输时间的100分之一

(2)Block块能否随意设置?

答案当然是不能 如果Block块设置的过小,必然会导致集群中存在几百万个小文件 ,增加寻址时间,效率低下。

如果设置的块过大,会造成剩余空间的浪费,还会造成存取的时间过长。


什么是机架

机架感知

For the common case, when the replication factor is three, HDFS’s placement policy is to put one replica on one node in the local rack, another on a different node in the local rack, and the last on a different node in a different rack. This policy cuts the inter-rack write traffic which generally improves write performance. The chance of rack failure is far less than that of node failure; this policy does not impact data reliability and availability guarantees. However, it does reduce the aggregate network bandwidth used when reading data since a block is placed in only two unique racks rather than three. With this policy, the replicas of a file do not evenly distribute across the racks. One third of replicas are on one node, two thirds of replicas are on one rack, and the other third are evenly distributed across the remaining racks. This policy improves write performance without compromising data reliability or read performance.

The current, default replica placement policy described here is a work in progress.


在默认情况下 ,当副本因子为3时,第一个副本存在本地一个机架上,第二存在本地的机架的另外一个节点上,第三个存储除本地机架以外的节点上。

此操作减少了机架间的数据流量传出,降低了聚合带宽的压力。

NameNode和SecondaryNameNode 的关系 (重点)

Fsimage:元数据信息的备份(持久化),会加载在内存中

edits:Edits文件帮助记录文件增加和更新操作,提高效率


NameNode在启动的时候需要加载edits(日志文件)和fsimage(文件)

所以在第一启动namenode时候需要格式化namenode

当用户上传文件或者进行其他文件的操作的时候,会将数据写入至edits文件中,这样edits和fsimage加起来的数据永远是最新的。

如果此时用户一直进行操作,会导致edits文件过于庞大,这就导致了在下次启动的时候(因为启动时需要加载两个文件),时间会相当的长。

为了解决这个问题,出现了SecondaryNodenode,将当前NameNode的edits和fsimage文件加载过来,将文件持久化到fsimage之后,将新的fsimage上传至NameNode。

但是这个时候还会出现另外一个问题,当SecondaryNamenode进行文件持久化的时候,用户可能在这个期间需要进行操作,直接将数据写入edits日志文件 话会导致数据的紊乱,所以解决方案是将数据写入另外一个叫做edits_inprogress文件当中

值得注意的是:SecondaryNamenode是对Namenode的优化方案


检查点

检查点机制决定NN启动后的fsimage和edits文件的合并时机

dfs.namenode.checkpoint.period(默认设置为1小时)指定两个连续检查点之间的最大延迟dfs.namenode.checkpoint.txns(默认设置为100万)定义了NameNode上的非检查点事务数,即使尚未达到检查点期限,该事务也会强制执行紧急检查点。

相关配置

SecondaryNameNode每隔一小时执行一次

<property>  <name>dfs.namenode.checkpoint.period</name><value>3600</value></property>


一分钟检查一次操作次数,当操作次数达到1百万时,SecondaryNameNode执行一次。

<property>  <name>dfs.namenode.checkpoint.txns</name>  <value>1000000</value><description>操作动作次数</description></property><property>  <name>dfs.namenode.checkpoint.check.period</name>  <value>60</value><description> 1分钟检查一次操作次数</description></property >
安全模式

在集群启动的时候,因为会加载fsimage文件和edits文件,所以会短暂的开启安全模式(SafeMode),此时集群对外是只读模式,只允许就行读取操作,禁止其他操作。

同时,用户也可以通过命令开启只读模式。

# 开启只读模式|安全模式[root@HadoopNode00 ~]# hdfs dfsadmin  -safemode enterSafe mode is ON# 上传文件不被允许[root@HadoopNode00 ~]# hadoop fs -put /root/baizhi1.log  /put: Cannot create file/baizhi1.log._COPYING_. Name node is in safe mode.# 关闭只读模式|安全模式[root@HadoopNode00 ~]# hdfs dfsadmin  -safemode leaveSafe mode is OFF# 上传文件被允许 [root@HadoopNode00 ~]# hadoop fs -put /root/baizhi1.log  /# 查看文件已经存在[root@HadoopNode00 ~]# hadoop fs -ls /Found 6 items-rw-r--r--   1 root          supergroup      16809 2019-12-04 18:27 /1.png-rw-r--r--   1 root          supergroup      16809 2019-12-04 18:23 /2019-12-03_143359.png-rw-r--r--   1 Administrator supergroup      16809 2019-12-04 18:34 /3.png-rw-r--r--   1 Administrator supergroup      16809 2019-12-04 18:43 /4.png-rw-r--r--   1 root          supergroup       8901 2019-12-05 01:40 /baizhi1.logdrwx------   - root          supergroup          0 2019-12-04 19:30 /user


为什么说HDFS 不擅长存储小文件?


文件namenode内存占用datanode磁盘占用单个128MB文件一个block块的元数据信息 1KB128MB128M 10000个文件10000个block块的元数据信息 10MB128MB

因为NameNode是使用单机的内存存储元数据,因此导致Namanode的内存紧张


引申出来的问题:

(1)Namenode的所在物理节点内存应该给的多一点

(2)理论上hdfs是可以无限扩充,因为可以在横向上拓展无数个节点。但是因为Namenode实际只能一个在运行,所以hdfs 的上限容量受制namenode的内存上限容量。


HDFS 读写流程(了解)写流程


读流程


DataNode的工作机制(了解)


五、MapReduce概述

MapReduce是一种编程模型,用于大规模数据集 (大于1TB)的并行运算。概念"Map(映射)"和"Reduce(归约)",是它们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。 当前的软件实现是指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(归约)函数,用来保证所有映射的键值对中的每一个共享相同的键组。

一是 软件框架,二是并行处理,三是可靠容错,四是大规模集群,五是海量数据


MapReduce为什么擅长处理大数据?这是由它的设计思想发觉的,MapReduce的思想就是分而治之

Map负责分:即把复杂的任务分解成若干个简单的任务来处理,简单的含义包含三层:是数据计算规模相对于原来的规模要大大减少是就近计算原则,任务会被分配到所需数据的节点上进行计算是这些小任务可以并行计算,彼此间没有依赖关系Reduce负责治(对于map阶段计算结果进行汇总)

一个比较形象语言解释MapReduce:统计图书馆所有书 你数1号书架 我数2号书架,这就是Map 我们人越多,数书的速度越快,现在我们一起,将所有的人统计数加在一起 这就是reduce


为什么使用MR?


package hdfs;import java.io.*;public class CleanApp {    public static void main(String[] args) throws IOException {        File file = new File("E:\\home\\logs\\access.tmp2019-05-19-10-28.log");        BufferedReader bufferedReader = new BufferedReader(new FileReader(file));        FileWriter fileWriter = new FileWriter("E:\\home\\logs\\clean_access.tmp2019-05-19-10-28.log");        while (true) {            String line = bufferedReader.readLine();            if (line == null) return;            boolean contains = line.contains("thisisshortvideoproject'slog");            if (contains) {                String s = line.split("thisisshortvideoproject'slog")[0];                fileWriter.write(s+"\n");                fileWriter.flush();            }        }            }}


在数据量不大的时候,处理的速度是相当的块。但是经过测试,90mb的数据就需要2-3S,一旦数据量增长到以GB|TB为计量单位的时候,此单机程序将不再适应场景的计算计算需求。


MR的优缺点优点易于编写分布式应用程序有良好的拓展性适合离线批处理高容错性缺点:延迟高不适合流处理不擅长图计算


YARN 环境搭建

ResoueceManager:统筹计算资源 管理所有的NodeManager,进行资源分配

NodeManager:管理着主机上的计算资源,负责向RM汇报自身的状态

App Master:计算任务的Master 负责向RM申请资源,协调计算任务

YarnChild:负责做实际计算的任务

Container :计算资源的抽象(代表着一组内存、cpu、硬盘、网络),无论是AppMaster还是YarnChild都需要消耗一个Container


配置 etc/hadoop/yarn-site.xml

<property>    <name>yarn.nodemanager.aux-services</name>    <value>mapreduce_shuffle</value></property><!--Resource Manager--><property>    <name>yarn.resourcemanager.hostname</name>    <value>HadoopNode00</value></property>

配置 etc/hadoop/mapred-site.xml

此文件需要自己复制

[root@HadoopNode00 ~]# cp /home/hadoop/hadoop-2.6.0/etc/hadoop/mapred-site.xml.template /home/hadoop/hadoop-2.6.0/etc/hadoop/mapred-site.xml
<property>    <name>mapreduce.framework.name</name>    <value>yarn</value></property>
启动
[root@HadoopNode00 ~]# start-yarn.shstarting yarn daemonsstarting resourcemanager, logging to /home/hadoop/hadoop-2.6.0/logs/yarn-root-resourcemanager-HadoopNode00.outlocalhost: starting nodemanager, logging to /home/hadoop/hadoop-2.6.0/logs/yarn-root-nodemanager-HadoopNode00.out[root@HadoopNode00 ~]# jps21520 NodeManager21668 Jps21284 ResourceManager11269 SecondaryNameNode10985 NameNode11082 DataNode
入门案例 -WordCount依赖
<dependency>    <groupId>org.apache.hadoop</groupId>    <artifactId>hadoop-mapreduce-client-core</artifactId>    <version>2.6.0</version></dependency><dependency>    <groupId>org.apache.hadoop</groupId>    <artifactId>hadoop-mapreduce-client-jobclient</artifactId>    <version>2.6.0</version></dependency>
需求
单词计数gjf zkf rxx whp jzz gzy cpxzkf rxx jzz gzy cpxgjf rxx jzz cpxgjf zkf rxx whp jzz cpxgjf zkf rxx whp jzz gzy cpxcpx	5gjf	4gzy	3jzz	5rxx	5whp	3zkf	4
Map代码
package com.baizhi.mr.test01;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;import java.io.Serializable;/* * 原来的键值对 * keyIn 输入文本行的偏移量 Long   LongWritable * valueIn String 当前行的文本 * * 新的键值对 * keyOut * valueOut * * */public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {    @Override    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {        /*        * 获得到每一行 文本 将文本按照空格进行切割 拿到数组 数组中每一个数据就是一个名字        * */        String[] names = value.toString().split(" ");        /*        * 遍历数组        * */        for (String name : names) {            /*            * 拿到每一个名字 写出的格式为(name,1)            * */            context.write(new Text(name), new IntWritable(1));        }    }}
Reduce代码
package com.baizhi.mr.test01;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/* * MAP 处理过后的数据 * keyIn * valueIn * 数据类型要一致 * */public class WCReducer extends Reducer<Text, IntWritable, Text, LongWritable> {    @Override    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {        /*         *         * (gjf, [1,1,1])         *  key  Iterable<IntWritable> values         * */        /*        * 准备累加数据        * */        long sum = 0;        /*        * 遍历迭代器 获得每一个数据      * */        for (IntWritable value : values) {            /*             * 做累加操作             * */            sum += value.get();        }        /*        * 写出数据 数据格式为 (name,出现的次数)        * */        context.write(key, new LongWritable(sum));    }}


Job封装

package com.baizhi.mr.test01;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;public class WCJob {    public static void main(String[] args) throws Exception {        /*         * Job 封装         * */        /*         * 准备Configuration对象         * */        Configuration conf = new Configuration();        /*         * 获得到Job对象 需要填入必备参数Configuration         * */        Job job = Job.getInstance(conf, "wc01");        /*         * 输入组件 | 输出组件         * */        job.setInputFormatClass(TextInputFormat.class);        job.setOutputFormatClass(TextOutputFormat.class);        /*         * 设置数据读入|输入路径         * */        TextInputFormat.setInputPaths(job, new Path("/emp.txt"));        /*         * 设置数据输出|写出路径         * */        TextOutputFormat.setOutputPath(job, new Path("/out01"));        /*         * 设置map和reduce的计算逻辑         * */        job.setMapperClass(WCMapper.class);        job.setReducerClass(WCReducer.class);        /*         * 设置 map端输出的key和value类型         * */        job.setMapOutputKeyClass(Text.class);        job.setMapOutputValueClass(IntWritable.class);        /*         * 设置 reducer端输出的key和value类型         * */        job.setOutputKeyClass(Text.class);        job.setOutputValueClass(LongWritable.class);        /*         * job的提交         * */        //job.submit();        job.waitForCompletion(true);    }}


我的同学


Job运行(1)远程Jar 包部署

   // 设置jar 类加载器 否则找不到Mapper和Reducer   job.setJarByClass(WCJob.class);
打包 mvn clean package上传jar包到Linux 文件系统中保证HDFS 和 Yarn 都正常启动和运行通过Hadoop JAR  命令运行[root@HadoopNode00 ~]# hadoop jar Hadoop_Test-1.0-SNAPSHOT.jar com.baizhi.mr.test01.WCJob


(2)本地仿真

可能会出现的问题:


覆盖NativeIO文件,如果关联源码557,没有关联278,将对应行代码给短路

public static boolean access(String path, NativeIO.Windows.AccessRight desiredAccess) throws IOException {    return true;    //  return access0(path, desiredAccess.accessRight());}


直接右键运行

请注意:不管是HDFS还是本地文件系统 ,输出目录都不能重复


引入Log4j

 <dependency>   <groupId>log4j</groupId>    <artifactId>log4j</artifactId>   <version>1.2.17</version> </dependency>


log4j.properties

log4j.rootLogger=info,stdout#consolelog4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern= [%d{yyyy-MM-dd HH:mm:ss a}]:%p %l%m%n


注意的问题:

在路径前面加上是为了让MR程序识别成本地的文件


(3)跨平台提交

从win下直接右键运行,此MR程序会通过网络传输到Linux 集群下进行运行

需要将四个配置文件拷贝到Resource目录中core-site.xmlhdfs-site.xmlyarn-site.xmlmapred-site.xmlconf.addResource("conf2/core-site.xml"); conf.addResource("conf2/hdfs-site.xml"); conf.addResource("conf2/mapred-site.xml"); conf.addResource("conf2/yarn-site.xml");设置允许跨平台提交二选一在代码中添加 conf.set("mapreduce.app-submission.cross-platform", "true"); 配置mapred-site.xml在本地的配置文件添加即可<property> <description>If enabled, user can submit an application cross-platform i.e. submit an application from a Windows client to a Linux/Unix server or vice versa. </description> <name>mapreduce.app-submission.cross-platform</name> <value>false</value> </property>设置系统用户名为root System.setProperty("HADOOP_USER_NAME", "root");设置Jar包路径 conf.set(MRJobConfig.JAR,":\\大数据\\代码\\BigData\\Hadoop_Test\\target\\Hadoop_Test-1.0-SNAPSHOT.jar");


可能会出现的问题:

在进行跨平台提交的时候,会把hdfs上文件识别成本地问题

解决办法:

在hdfs文件之前加上hdfs://hadoopNode00:9000/


自定义Bean对象

在开发中引入实体类的概念是为了简化开发,Hadoop中只有Long,Int这样的基础类型有内置的基于Hadoop的实现好的LongWritable,IntWritable即时是最基础的数据类型,在网络中传输需要就进行序列化,在Hadoop中使用对象也需要进行序列化操作。为什么不用Java内置的序列化机制?JDK内置的序列化是一个重量级的框架,所以不适合hadoop大数据计算需要高效且快速的计算需求。Hadoop有一套自己的序列化方案,叫做Writable
需求

流量统计

电话号码      上传流量  下载流量18611781163  30 	  10      bj18611781163  20 	  30      bj18611781163  41 	  20      bj18611781163  32 	  52      bj15617826008  23       53       zz15617826008  20       54       zz15617826008  20       55       zz15617826008  20       56       zz18711671153  15       61      tj18711671153  15       62      tj18711671153  15       63      tj16519756181  50       54      sh16519756181  50       54      sh16519756181  50       56      sh

最终的结果:

电话        上行      下行     总流量1861781163  ?        ?       ?


package com.baizhi.test03;import com.baizhi.test02.job01.Job01;import com.baizhi.test02.job01.Job01Mapper;import com.baizhi.test02.job01.Job01Reducer;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.MRJobConfig;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.junit.Before;public class BeanJob {    public static void main(String[] args) throws Exception {        System.setProperty("HADOOP_USER_NAME", "root");        Configuration conf = new Configuration();        conf.addResource("conf2/core-site.xml");        conf.addResource("conf2/hdfs-site.xml");        conf.addResource("conf2/mapred-site.xml");        conf.addResource("conf2/yarn-site.xml");        conf.set("mapreduce.app-submission.cross-platform", "true");        conf.set(MRJobConfig.JAR, "F:\\大数据\\代码\\BigData\\Hadoop_Test\\target\\Hadoop_Test-1.0-SNAPSHOT.jar");        Job job = Job.getInstance(conf);        /*         * 设置类加载器         * */        job.setJarByClass(BeanJob.class);        job.setInputFormatClass(TextInputFormat.class);        job.setOutputFormatClass(TextOutputFormat.class);        //TextInputFormat.setInputPaths(job, new Path("F:\\大数据\\笔记\\Day02-Hadoop\\作业\\数据\\log.txt"));        TextInputFormat.setInputPaths(job, new Path("/flow.txt"));        //TextOutputFormat.setOutputPath(job, new Path("F:\\大数据\\笔记\\Day02-Hadoop\\作业\\数据\\out1"));        TextOutputFormat.setOutputPath(job, new Path("/out111111"));        job.setMapperClass(BeanMapper.class);        job.setReducerClass(BeanReducer.class);        job.setMapOutputKeyClass(Text.class);        job.setMapOutputValueClass(FlowBean.class);        job.setOutputKeyClass(FlowBean.class);        job.setOutputValueClass(NullWritable.class);        job.waitForCompletion(true);    }}
package com.baizhi.test03;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/* * * keyOut 电话号码 * */public class BeanMapper extends Mapper<LongWritable, Text, Text, FlowBean> {    @Override    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {        // 拿到数据 使用空格进行分割        String[] flowinfos = value.toString().split(" ");        /*         * 拿到下标为0的电话         * */        String phone = flowinfos[0];        /*         * 拿到下标为1的上传流量         * */        Long upFlow = Long.valueOf(flowinfos[1]);        /*         * 拿到下标为2的下载流量         * */        Long downFlow = Long.valueOf(flowinfos[2]);        /*         * 准备bean对象         * */        FlowBean flowBean = new FlowBean();        /*         * */        flowBean.setPhone(phone);        flowBean.setUpFlow(upFlow);        flowBean.setDownFlow(downFlow);        /*         * 设置总流量 上传+下载         * */        flowBean.setSumFlow(upFlow + downFlow);        /*         * 数据写出         * */        context.write(new Text(phone), flowBean);    }}
package com.baizhi.test03;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class BeanReducer extends Reducer<Text, FlowBean, FlowBean, NullWritable> {    @Override    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {        /*         *         * (18611781163,[FlowBean{18611781163,30,20,50},FlowBean{18611781163,30,20,50}FlowBean{18611781163,30,20,50}])         * */        /*         *         * 准备上传字段*/        Long up = 0L;        /*         *         * 准备下载字段*/        Long down = 0L;        /*         *         * 准备总流量字段*/        Long sum = 0L;        /*         * 遍历数组进行累加         * */        for (FlowBean value : values) {            up += value.getUpFlow();            down += value.getDownFlow();            sum += value.getSumFlow();        }        /*         * 准备keyOut对象 填充数据         * */        FlowBean flowBean = new FlowBean(key.toString(), up, down, sum);        /*         * 输出         * */        context.write(flowBean, NullWritable.get());    }}


package com.baizhi.test03;import org.apache.hadoop.io.Writable;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import java.io.Serializable;public class FlowBean implements Writable {    private String phone;    private Long upFlow;    private Long downFlow;    private Long sumFlow;    public FlowBean() {    }    public FlowBean(String phone, Long upFlow, Long downFlow, Long sumFlow) {        this.phone = phone;        this.upFlow = upFlow;        this.downFlow = downFlow;        this.sumFlow = sumFlow;    }    public String getPhone() {        return phone;    }    public void setPhone(String phone) {        this.phone = phone;    }    public Long getUpFlow() {        return upFlow;    }    public void setUpFlow(Long upFlow) {        this.upFlow = upFlow;    }    public Long getDownFlow() {        return downFlow;    }    public void setDownFlow(Long downFlow) {        this.downFlow = downFlow;    }    public Long getSumFlow() {        return sumFlow;    }    public void setSumFlow(Long sumFlow) {        this.sumFlow = sumFlow;    }    @Override    public String toString() {        return "FlowBean{" +                "phone='" + phone + '\'' +                ", upFlow=" + upFlow +                ", downFlow=" + downFlow +                ", sumFlow=" + sumFlow +                '}';    }    /*     * 往外写 序列化  编码     * */    public void write(DataOutput dataOutput) throws IOException {        dataOutput.writeUTF(this.phone);        dataOutput.writeLong(this.upFlow);        dataOutput.writeLong(this.downFlow);        dataOutput.writeLong(this.sumFlow);    }    /*     * 往里读 反序列化  解码     * */    public void readFields(DataInput dataInput) throws IOException {        this.phone = dataInput.readUTF();        this.upFlow = dataInput.readLong();        this.downFlow = dataInput.readLong();        this.sumFlow = dataInput.readLong();    }}


MapReduce 计算流程(重点)


后花园


(1)程序员所编写的MR代码,一旦运行就可以称之为一个Job

(2)Job在启动之后,会首先向RM注册相关信息

(3)如果注册通过 则向共享文件系统(HDFS)拷贝先关资源的信息

(4)提交完成的Job信息给RM

(5)拿到Job信息,根据Job的情况,消耗资源连接到某个节点的上NodeManager去启动MR AppMaster

(6)MR AppMaster 首先会初始化Job

(7)去共享文件系统中获取输入切片相关的信息

(8)MR AppMaster向RM申请资源去进行计算

(9)拿到资源后,连接到某个NodeManager去启动Yarn Child

(10)Yarn Child去共享文件系统获取完成的Job信息

(11)Yarn Child根据任务阶段启动MapTask或者ReduceTask进程进行真正的计算任务的执行,直至计算任务完成,此两个进程完全关闭,客户端停止等待,结束运行。


Job 提交流程(重点)Connect

如果是本地模式则建立本地 Local 相关的对象 如果集群模式提交则建立 Yarn 相关的对象

Submit(1)校验空间

package org.apache.hadoop.mapreduce.lib.output; 包

FileOutputFormat.class 类

getOutputPath 方法

72~82行

规定输出路径不能为空

规定输出路径不能重复

public static Path getOutputPath(JobContext job) {        String name = job.getConfiguration().get("mapreduce.output.fileoutputformat.outputdir");        return name == null ? null : new Path(name); }public void checkOutputSpecs(JobContext job) throws FileAlreadyExistsException, IOException {        // 通过getOutputPath 拿到路径        Path outDir = getOutputPath(job);        if (outDir == null) { // 如果输出路径为空 则输出Output directory not set.            throw new InvalidJobConfException("Output directory not set.");        } else {            TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[]{outDir}, job.getConfiguration());            // 规定输出路径不能重复            if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {                throw new FileAlreadyExistsException("Output directory " + outDir + " already exists");            }        }    }
(2)创建Staging 路径

Staging 路径是存放当前集群下运行的Job所需产生或者需要的资源文件

package org.apache.hadoop.mapreduce; 包

JobSubmissionFiles 类

getStagingDir 方法

60-82行

 public static Path getStagingDir(Cluster cluster, Configuration conf) throws IOException, InterruptedException {     	// 获取到 Staging 路径        Path stagingArea = cluster.getStagingAreaDir();        FileSystem fs = stagingArea.getFileSystem(conf);        UserGroupInformation ugi = UserGroupInformation.getLoginUser();        String realUser = ugi.getShortUserName();        String currentUser = UserGroupInformation.getCurrentUser().getShortUserName();     	//  判断当前Staging 路径 是否存在        if (fs.exists(stagingArea)) {            FileStatus fsStatus = fs.getFileStatus(stagingArea);            String owner = fsStatus.getOwner();            if (!owner.equals(currentUser) && !owner.equals(realUser)) {                throw new IOException("The ownership on the staging directory " + stagingArea + " is not as expected. " + "It is owned by " + owner + ". The directory must " + "be owned by the submitter " + currentUser + " or " + "by " + realUser);            }            if (!fsStatus.getPermission().equals(JOB_DIR_PERMISSION)) {                LOG.info("Permissions on staging directory " + stagingArea + " are " + "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " + "to correct value " + JOB_DIR_PERMISSION);                fs.setPermission(stagingArea, JOB_DIR_PERMISSION);            }        } else {            //  不存在则创建此 Staging 路径            fs.mkdirs(stagingArea, new FsPermission(JOB_DIR_PERMISSION));        }        return stagingArea;    }


(3)创建Job路径

package org.apache.hadoop.mapreduce; 包

JobSubmitter 类

copyAndConfigureFiles 方法

138行

// 创建Job 路径 FileSystem.mkdirs(this.jtFs, submitJobDir, mapredSysPerms);


(4)拷贝资源信息

package org.apache.hadoop.mapreduce; 包

JobSubmitter 类

copyAndConfigureFiles 方法

     151行     if (files != null)      179行     if (libjars != null)      193行     if (archives != null)      220行     if (jobJar != null)
(5)生成切片文件

小发现:在本地模式运行下切片的切分是以32MB进行切分

package org.apache.hadoop.mapreduce; JobSubmitter writeSplits

472-482行

 private int writeSplits(JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException {        JobConf jConf = (JobConf)job.getConfiguration();        int maps;        if (jConf.getUseNewMapper()) {            // 中间过程为具体如何进行切片的划分  后续会讲到            maps = this.writeNewSplits(job, jobSubmitDir);        } else {            maps = this.writeOldSplits(jConf, jobSubmitDir);        }        return maps;    }
(6)写XML 配置文件

package org.apache.hadoop.mapreduce; 包 JobSubmitter 类 writeConf 方法

440-449行

private void writeConf(Configuration conf, Path jobFile) throws IOException {        FSDataOutputStream out = FileSystem.create(this.jtFs, jobFile, new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));        try {            conf.writeXml(out);        } finally {            out.close();        }    }
Map Reduce 组件解析概述

通过WC 案例的编写,我们不难发现,其实是按照一定规则执行程序的输入和输出目录,将作业提交到集群中。

Hadoop 是将数据切分成若干个输入切片(Input Split),并将每个切片交由每一个MapTask处理;

MapTask不断从Input Split的文件中解析出一个一个KeyValue,并调用Map()函数处理,处理之后交由ReduceTask处理。根据ReduceTask的个数,将数据结果分成若干个分片(partition)先写到磁盘。

同时,每个ReduceTask从每个MapTask的节点上读取属于自己的分区(partition),然后使用基于排序的方法将key相同的数据聚集在一起。调用Reduce函数,最终输出到硬盘中。


通过上面的描述,我们不难发现,上面的程序还缺少或者体现除了三个组件。

(1)指定文件格式,将输入数据切分成若干个Split,且将每一个Split中数据解析成一个一个的符合map函数输入需求的keyvalue数据。

(2)确定map() 函数产生的每个keyvalue发送给那个ReduceTask函数处理。

(3)确定输出格式,即将每个keyvalue对以何种形式进行保存。

标签: #hadoop切片计算