Hadoop基础

大数据处理流程

下图展现了大数据生态中设计的各种组件及工具。

大数据处理的主要流程包括数据收集、数据存储、数据处理、数据应用等主要环节。

数据采集

在数据采集阶段,现有的中大型项目通常采用微服务架构进行分布式部署,所以数据采集需要在多台服务器上进行,且采集过程不能影响正常业务的开展。基于上述需求,就衍生了多种日志收集工具,如Flume、Logstash、Kibana等,它们都能通过简单的配置完成复杂的数据收集和数据聚合。

数据存储

在数据存储阶段,大家所熟知的例如MySQL、Oracle等传统的关系型数据库,其优点在于能够快速存储结构化的数据,并且支持随机访问,但大数据的数据结构通过是半结构化(如日志数据)、非结构化数据(如音频、视频数据),为了解决海量半结构化和非结构化数据的存储,Hadoop HDFS、KFS、GFS等分布式文件系统应运而生,它们都能够支持结构化、半结构和非结构化数据的存储,并可以通过增加机器进行横向扩展。

分布式文件系统完美地解决了海量数据存储的问题,但是一个优秀的数据存储系统需要同时考虑数据存储和访问两方面的问题,比如你希望能够对数据进行随机访问,这是传统的关系型数据库所擅长的,但却不是分布式文件系统所擅长的,那么有没有一种存储方案能够同时兼具分布式文件系统和关系型数据库的优点,基于这种需求,就产生了HBase、MongoDB。

数据分析

在数据分析阶段通常分为两种:批处理和流处理。

  • 批处理:对数据进行离线处理的方式,数据会按照一定的时间间隔或者数据量进行批量处理,对应的处理框架有Hadoop MapReduce、Spark、Flink等。批处理可以对大量数据进行高效处理和分析,适用于需要对历史数据进行分析和挖掘的场景,如离线数仓、批量报表、离线推荐等场景。

  • 流处理:对数据进行实时处理的方式,数据会以流的形式不断地产生和处理,对应的处理框架有Storm、Spark Streaming、Flink Streaming等。流处理可以快速响应数据的变化,及时地进行数据处理和分析,适用于需要实时处理数据的场景,如实时数仓、实时监控、实时推荐等场景。

流处理和批处理都是常用的数据处理方式,它们各有优劣。流处理通常用于需要实时响应的场景,如在线监控和警报系统等;批处理则通常用于离线数据分析和挖掘等大规模数据处理场景。

同时,为了能够让熟悉SQL的人员也能够进行数据的分析,查询分析框架应运而生,常用的有Hive 、Spark SQL 、Flink SQL、Pig、Phoenix等。这些框架都能够使用标准的SQL或者类SQL语法灵活地进行数据的查询分析。这些SQL经过解析优化后转换为对应的作业程序来运行,如Hive本质上就是将SQL转换为MapReduce作业,Spark SQL将SQL转换为一系列的弹性分布式数据集(Resilient Distributed Dataset,RDD)和转换关系(Transformations),Phoenix将SQL查询转换为一个或多个HBase Scan。

数据应用

数据应用的领域广泛且多样,例如数据可视化广泛应用于科学研究、医疗健康、交通管理等领域,帮助专业人士更好地分析和理解数据;个性化推荐通过分析用户的行为数据、兴趣偏好等信息,为用户提供个性化的内容推荐,如短视频个性化推荐、电商商品推荐、头条新闻推荐等;数据也可以应用于机器学习模型训练,如金融领域,银行可以利用客户的历史交易数据训练机器学习模型,用于评估客户的信用风险,从而更好地进行信贷审批。

Hadoop

Hadoop起源

在大数据时代,针对大数据处理的新技术也在不断地开发和运用中,并逐渐成为数据处理挖掘行业广泛使用的主流技术。Hadoop作为处理大数据的分布式存储和计算框架,已在国内外大、中、小型企业中得到了广泛应用。

Hadoop是由Apache的Lucence项目创始人道格·卡廷创建的,Lucence是一个应用广泛的文本搜索系统库。Hadoop起源于开源的网络搜索引擎Nutch,Nutch本身也是Lucence项目的一部分,Hadoop的发展历史如下图所示。

Hadoop核心组件

Hadoop的核心组件主要包括Hadoop Common、HDFS(Hadoop Distributed File System)、YARN(Yet Another Resource Negotiator)和MapReduce,这些组件共同构成了Hadoop的基础架构,提供了分布式存储和计算的能力。

  • Hadoop Common提供库和工具,支持其他Hadoop模块运行,包括文件系统抽象、工具和库,用于访问文件系统,以及运行Hadoop的守护进程。
  • HDFS是Hadoop的分布式文件系统,用于存储数据。HDFS将大文件切分成多个数据块,并将这些数据块分布式地存储在集群的多个节点上,以提供高容错性和可扩展性。
  • YARN是Hadoop的资源管理器,负责集群资源的调度和管理。YARN将集群的计算资源划分为多个容器(Containers),并分配给不同的应用程序进行处理。
  • MapReduce是Hadoop的计算框架,用于分布式处理数据。MapReduce模型将计算任务分解为Map和Reduce两个阶段。Map阶段将输入数据划分为多个片段,并在集群的不同节点上并行处理;Reduce阶段将Map阶段的结果进行合并和汇总,生成最终的输出结果。

Hadoop环境搭建

创建一个Hadoop的文件夹,结合配置docker-compose文件和hadoop.env文件,通过Docker Compose来部署一个Hadoop集群。

在docker-compose的配置中定义了5个服务:

  • namenode:HDFS的NameNode,负责管理文件系统的元数据
  • datanode:HDFS的DataNode,负责存储实际的数据块
  • resourcemanager:YARN的ResourceManager,负责集群资源管理和任务调度
  • nodemanager:YARN的NodeManager,负责管理单个节点上的资源和任务
  • historyserver:YARN的HistoryServer,负责保存和展示已完成任务的历史信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
version: "3"

services:
namenode:
image: "bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8"
container_name: "namenode"
networks:
- "hadoop-network"
restart: "always"
ports:
- "9870:9870"
- "9000:9000"
volumes:
- "hadoop_namenode:/hadoop/dfs/name"
environment:
CLUSTER_NAME: "docker-hadoop-cluster"
env_file:
- "./hadoop.env"

datanode:
image: "bde2020/hadoop-datanode:2.0.0-hadoop3.2.1-java8"
container_name: "datanode"
networks:
- "hadoop-network"
restart: "always"
volumes:
- "hadoop_datanode:/hadoop/dfs/data"
environment:
SERVICE_PRECONDITION: "namenode:9870"
env_file:
- "./hadoop.env"

resourcemanager:
image: "bde2020/hadoop-resourcemanager:2.0.0-hadoop3.2.1-java8"
container_name: "resourcemanager"
networks:
- "hadoop-network"
restart: "always"
ports:
- "8088:8088" # Web UI
environment:
SERVICE_PRECONDITION: "namenode:9000,namenode:9870,datanode:9864"
env_file:
- "./hadoop.env"

nodemanager:
image: "bde2020/hadoop-nodemanager:2.0.0-hadoop3.2.1-java8"
container_name: "nodemanager"
networks:
- "hadoop-network"
restart: "always"
environment:
SERVICE_PRECONDITION: "namenode:9000,namenode:9870,datanode:9864,resourcemanager:8088"
env_file:
- "./hadoop.env"

historyserver:
image: "bde2020/hadoop-historyserver:2.0.0-hadoop3.2.1-java8"
container_name: "historyserver"
networks:
- "hadoop-network"
restart: "always"
ports:
- "8188:8188"
environment:
SERVICE_PRECONDITION: "namenode:9000,namenode:9870,datanode:9864,resourcemanager:8088"
volumes:
- "hadoop_historyserver:/hadoop/yarn/timeline"
env_file:
- "./hadoop.env"

volumes:
hadoop_namenode:
hadoop_datanode:
hadoop_historyserver:

networks:
hadoop-network:
driver: "bridge"
# ipam:
# config:
# - subnet: "172.23.0.0/24"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
CORE_CONF_fs_defaultFS=hdfs://namenode:9000
CORE_CONF_hadoop_http_staticuser_user=root
CORE_CONF_hadoop_proxyuser_hue_hosts=*
CORE_CONF_hadoop_proxyuser_hue_groups=*
CORE_CONF_io_compression_codecs=org.apache.hadoop.io.compress.SnappyCodec

HDFS_CONF_dfs_webhdfs_enabled=true
HDFS_CONF_dfs_permissions_enabled=false
HDFS_CONF_dfs_namenode_datanode_registration_ip___hostname___check=false

YARN_CONF_yarn_log___aggregation___enable=true
YARN_CONF_yarn_log_server_url=http://historyserver:8188/applicationhistory/logs/
YARN_CONF_yarn_resourcemanager_recovery_enabled=true
YARN_CONF_yarn_resourcemanager_store_class=org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore
YARN_CONF_yarn_resourcemanager_scheduler_class=org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler
YARN_CONF_yarn_scheduler_capacity_root_default_maximum___allocation___mb=8192
YARN_CONF_yarn_scheduler_capacity_root_default_maximum___allocation___mb=8192
YARN_CONF_yarn_resourcemanager_fs_state___store_uri=/rmstate
YARN_CONF_yarn_resourcemanager_system___metrics___publisher_enabled=true
YARN_CONF_yarn_resourcemanager_hostname=resourcemanager
YARN_CONF_yarn_resourcemanager_address=resourcemanager:8032
YARN_CONF_yarn_resourcemanager_scheduler_address=resourcemanager:8030
YARN_CONF_yarn_resourcemanager_resource__tracker_address=resourcemanager:8031
YARN_CONF_yarn_timeline___service_enabled=true
YARN_CONF_yarn_timeline___service_generic___application___history_enabled=true
YARN_CONF_yarn_timeline___service_hostname=historyserver
YARN_CONF_mapreduce_map_output_compress=true
YARN_CONF_mapred_map_output_compress_codec=org.apache.hadoop.io.compress.SnappyCodec
YARN_CONF_yarn_nodemanager_resource_memory___mb=16384
YARN_CONF_yarn_nodemanager_resource_cpu___vcores=8
YARN_CONF_yarn_nodemanager_disk___health___checker_max___disk___utilization___per___disk___percentage=98.5
YARN_CONF_yarn_nodemanager_remote___app___log___dir=/app-logs
YARN_CONF_yarn_nodemanager_aux___services=mapreduce_shuffle

MAPRED_CONF_mapreduce_framework_name=yarn
MAPRED_CONF_mapred_child_java_opts=-Xmx4096m
MAPRED_CONF_mapreduce_map_memory_mb=4096
MAPRED_CONF_mapreduce_reduce_memory_mb=8192
MAPRED_CONF_mapreduce_map_java_opts=-Xmx3072m
MAPRED_CONF_mapreduce_reduce_java_opts=-Xmx6144m
MAPRED_CONF_yarn_app_mapreduce_am_env=HADOOP_MAPRED_HOME=/opt/hadoop-3.2.1/
MAPRED_CONF_mapreduce_map_env=HADOOP_MAPRED_HOME=/opt/hadoop-3.2.1/
MAPRED_CONF_mapreduce_reduce_env=HADOOP_MAPRED_HOME=/opt/hadoop-3.2.1/

YARN作业测试,通过下面的命令提交一个示例MapReduce作业来进行验证,如果任务成功运行,在HistoryServer Web UI,可以看到一个运行完成状态的作业。

1
hadoop jar /opt/hadoop-3.2.1/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.2.1.jar pi 10 100

HDFS

HDFS设计原则

高容错性

  1. 数据冗余存储
    • HDFS会将文件分割成多个数据块(block),默认情况下每个数据块会在集群中的不同节点上存储三份副本。例如,一个大小为256MB的文件,假设HDFS的块大小为128MB,那么这个文件会被分成两个块。这两个块各自在不同的数据节点上存储三份副本。这样即使某个数据节点出现故障,数据也不会丢失,因为还有其他副本可以使用。
    • 当检测到某个副本丢失(如数据节点故障或数据损坏)时,HDFS会自动重新复制数据块,以恢复到设定的副本数量。这种机制使得HDFS能够在硬件故障频繁的廉价硬件集群环境中可靠地存储数据。
  2. 错误检测与恢复
    • HDFS采用校验和(checksum)来检测数据块是否损坏。在写入数据块时,会计算校验和并存储。当读取数据块时,会再次计算校验和并与存储的校验和进行比较。如果发现校验和不一致,就认为该数据块损坏。此时,HDFS会从其他副本读取数据块,并且会尝试修复损坏的副本,通过重新复制未损坏的副本到原来存储损坏副本的节点或者新的节点。

大规模数据集存储

  1. 大文件存储优化
    • HDFS针对大文件存储进行了优化。它采用大块(默认128MB)来存储数据,这比传统文件系统(如常见的4KB一块)更适合处理大规模数据集。对于大文件,大块可以减少元数据(如块的索引等信息)的管理开销。
  2. 高吞吐量访问
    • 在设计上,HDFS更注重数据的吞吐量而不是低延迟。它适合于大规模数据的顺序读写操作。当进行大规模数据读取时,HDFS可以通过并行读取多个数据块来提高数据传输速率。例如,在进行数据挖掘或者大数据分析任务时,需要读取大量的数据,HDFS能够利用集群中的多个节点并行地读取数据块,从而实现高吞吐量的数据访问。

简单一致性

  1. 一次写入多次读取
    • HDFS采用的是“一次写入,多次读取”的一致性模型。文件一旦被写入后,其内容是不可变的,只能追加数据。例如,在数据仓库场景中,数据一旦被加载到HDFS中,就可以被多个分析任务安全地读取,而不用担心数据在读取过程中被修改导致一致性问题。
  2. 命名空间一致性
    • HDFS的命名空间(包括文件和目录的层级结构)在集群范围内是一致的,所有的客户端看到的文件系统结构是相同的。

流式数据访问

  1. 数据本地性优化
    • HDFS会尽量将计算任务调度到存储数据的数据节点上执行,这就是数据本地性(data-locality)原则,以减少数据在网络中的传输,提高数据处理效率。例如,在运行MapReduce任务时,Hadoop会优先将map任务分配到存储输入数据块的节点上,如果无法在数据本地节点执行,也会尽量在同一个机架内的节点上执行,以减少跨机架的数据传输开销。
  2. 数据流失写入与读取
    • HDFS支持数据的流式写入和读取。对于写入操作,客户端可以像写入一个普通文件一样,连续不断地将数据写入HDFS。HDFS会将这些数据分块存储到集群中。在读取时,客户端也可以以流的方式读取数据块。这种流式访问方式非常适合于处理大规模数据,例如在日志数据收集系统中,日志数据可以源源不断地写入HDFS,后续的数据处理系统可以以流的方式读取这些日志数据进行分析。

可移植性

  1. 跨平台
    • HDFS是用Java语言编写的,因此它具有很好的可移植性。它可以运行在多种操作系统之上,如Linux、Unix和Windows等。
  2. 硬件兼容
    • HDFS被设计为可以在廉价的商用硬件上运行。它不需要高端的存储设备,能够充分利用普通的服务器硬件来构建大规模的存储集群。这种硬件兼容性使得企业可以以较低的成本构建HDFS集群,同时也可以方便地进行硬件扩展。

HDFS架构

HDFS采用master/slave架构,一个HDFS集群是由一个Namenode和一定数目的Datanodes组成。

其中,Namenode是一个中心服务器,负责管理文件系统的命名空间(namespace)以及客户端对文件的访问;集群中的Datanode一般是一个节点一个,负责管理它所在节点上的存储。

HDFS暴露了文件系统的命名空间,用户能够以文件的形式在上面存储数据。从内部看,一个文件其实被分成一个或多个数据块,这些块存储在一组Datanode上,Namenode执行文件系统的命名空间操作,比如打开、关闭、重命名文件或目录,它也负责确定数据块到具体Datanode节点的映射;Datanode负责处理文件系统客户端的读写请求,在Namenode的统一调度下进行数据块的创建、删除和复制。

  • Client:Client是HDFS的客户端,代表用户与HDFS交互,执行文件的读写操作。向NameNode发送文件读写请求,获取文件的元数据信息;根据NameNode返回的元数据信息,与DataNode进行实际的数据读写操作。
  • NameNode:NameNode是HDFS的主节点,负责管理文件系统的元数据。维护文件系统的目录树结构和文件的元数据,包括文件的权限、位置、大小等信息;管理数据块的分配和副本策略,确保每个数据块有足够数量的副本(默认3份);处理客户端的文件读写请求,提供文件的元数据信息,指导客户端到相应的DataNode进行数据读写;监控DataNode的状态,通过心跳机制检测DataNode是否正常工作,管理集群的健康状态。
  • SecondaryNameNode:SecondaryNameNode是NameNode的辅助节点,主要用于协助NameNode进行元数据的检查点操作,防止元数据丢失。定期从NameNode获取fsimage和edit log文件,合并生成新的fsimage文件,作为元数据的备份;在NameNode故障时,可以提供最近的元数据备份,帮助快速恢复NameNode。
  • DataNode:DataNode是HDFS中的工作节点,负责存储和管理实际的数据块。DataNode存储文件的数据块,这些数据块是文件的物理存储单元;响应客户端的读写请求,从磁盘读取或写入数据块;定期向NameNode发送块报告(Block Report),报告其存储的数据块信息,以便NameNode更新元数据;根据NameNode的指令,创建、删除、复制数据块,以保证数据的冗余性和一致性。

HDFS读写

作为一个文件系统,文件的读和写是最基本的需求,本部分了解客户端是如何与HDFS进行交互的,也就是客户端与HDFS,以及构成HDFS的两类节点(NameNode和DataNode)之间的数据流是怎样的。

  • Block:是最大的单位,文件上传前需要分块,这个块就是Block,一般为128MB。一般不推荐修改块大小,因为块太小时寻址时间占比过高;块太大时Map任务数太少,作业执行速度变慢。
  • Packet:是第二大的单位,它是Client向DataNode,或DataNode之间数据传输的基本单位,默认64KB。
  • Chunk:是最小的单位,它是Client向DataNode,或DataNode之间进行数据校验的基本单位,默认512Byte。因为用作校验,故每个Chunk需要带有4Byte的校验位,所以实际每个Chunk写入Packet的大小为516Byte。

HDFS写数据流程

  1. 客户端通过Distributed FileSystem模块向NameNode请求上传文件,NameNode检查目标文件是否已存在,父目录是否存在;
  2. NameNode返回是否可以上传;
  3. 客户端请求第一个Block上传到哪几个DataNode服务器上;
  4. NameNode依据机架感知策略(默认副本数为3)返回节点,例如DataNode1、DataNode2、DataNode3;
  5. 客户端通过FSDataOutputStream模块请求DataNode1上传数据,DataNode1收到请求会继续调用DataNode2,然后DataNode2调用DataNode3,形成传输管道(Pipeline);
  6. DataNode1、DataNode2、DataNode3逐级应答客户端;
  7. 客户端开始往DataNode1上传第一个Block(先从磁盘读取数据放到一个本地内存缓存),以Packet为单位,DataNode1收到一个Packet就会传给DataNode2,DataNode2传给DataNode3,每个节点接收Packet后会写入本地磁盘并加入应答队列;
  8. 当一个Block传输完成之后,客户端再次请求NameNode上传第二个Block的服务器。(重复执行3-7步);
  9. 直至所有块上传完毕,最终客户端通知NameNode完成上传,NameNode更新元数据(如FsImage和EditLog)。

HDFS读数据流程

  1. 客户端通过DistributedFileSystem向NameNode请求下载文件,NameNode通过查询元数据,找到文件块所在的DataNode地址;
  2. 挑选一台DataNode(就近原则,然后随机)服务器,请求读取数据;
  3. DataNode开始传输数据给客户端(从磁盘里面读取数据输入流,以Packet为单位来做校验);
  4. 客户端以Packet为单位接收,先在本地缓存,然后写入目标文件。

DFSOutputStream内部原理

  1. 创建Packet
    Client写数据时,会将字节流数据缓存到内部的缓冲区中,当长度满足一个Chunk大小(512B)时,便会创建一个Packet对象,然后向该Packet对象中写ChunkChecksum校验和数据,以及实际数据块ChunkData,校验和数据是基于实际数据块计算得到的。每次满足一个Chunk大小时,都会向Packet中写上述数据内容,直到达到一个Packet对象大小(64K),就会将该Packet对象放入到dataQueue队列中,等待DataStreamer线程取出并发送到DataNode节点。
  2. 发送Packet
    DataStreamer线程从dataQueue队列中取出Packet对象,放到ackQueue队列中,然后向DataNode节点发送这个Packet对象所对应的数据。
  3. 接收Ack
    发送一个Packet数据包以后,会有一个用来接收Ack的ResponseProcessor线程,如果收到成功的ack,则表示一个Packet发送成功;如果成功,则ResponseProcessor线程会将ackQueue队列中对应的Packet删除。

读写过程中如何保证数据完整性

HDFS在读写过程中采取多种机制来保证数据完整性。在写入数据时,HDFS会将文件分割成多个固定大小的Block,每个Block会被复制多份(通常默认是三份),这些副本会被存储在不同的数据节点上。当数据写入时,NameNode会负责协调块的分配和副本的存储位置。数据首先写入到一个数据节点,然后通过内部的管道机制将数据顺序地复制到其他副本节点。在复制过程中,每个数据节点会对块数据计算校验和,并将校验和信息与数据块一起存储。当读取数据时,客户端会从NameNode获取块的位置信息,然后直接从数据节点读取数据。在读取过程中,客户端会利用校验和来验证数据的完整性。如果发现某个块的校验和与存储时计算的校验和不一致,说明该块数据可能损坏。此时,客户端可以从其他副本节点读取该块数据,因为每个块都有多个副本存储在不同的节点上。通过这种数据块的多副本存储、校验和验证以及副本间的冗余机制,HDFS能够在分布式环境下有效保证数据的完整性,即使部分数据节点出现故障或者数据在传输过程中发生损坏,也能够通过副本恢复和校验来确保数据的准确性和完整性。

副本放置策略

HDFS的副本放置策略是其保证数据可靠性和性能的关键机制之一,以下是其主要策略:

  1. 默认副本数量
    1. 默认值:HDFS默认将每个数据块复制为3个副本,这种多副本机制可以有效提高数据的可靠性和容错能力。
    2. 可配置性:用户可以根据实际需求调整副本数量。例如,在对数据可靠性要求极高的场景中,可以增加副本数量;在对存储空间要求较高的场景中,可以适当减少副本数量。
  2. 副本放置规则
    1. 副本1:第一份副本通常放置在上传数据的节点所在的机架上,这样可以减少数据传输的延迟,提高写入性能;
    2. 副本2:第二份副本放置在与第一副本不同的机架上,这种跨机架放置策略可以有效防止整个机架故障导致数据丢失;
    3. 副本3:第三份副本放置在与第二副本相同的机架上,但与第二副本不同的节点上,这种放置策略可以在保证数据可靠性的同时,避免跨机架通信的高成本;
    4. 副本N:如果配置了更多的副本,HDFS会继续按照上述规则进行放置,尽量分散到不同的机架和节点上,以进一步提高数据的可靠性。

HDFS常用命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
hdfs dfs -ls	# 列出当前目录下的文件和子目录信息
hdfs dfs -ls / # 查看根目录下的文件和子目录
hdfs dfs -ls -l / # 列出根目录下的文件和子目录,并显示详细信息(权限、大小、修改时间等)
hdfs dfs -ls -lh / # 以更易读的格式显示文件大小(如KB、MB、GB)
hdfs dfs -ls -R / # 递归显示目录和子目录中的所有文件
hdfs dfs -ls -h # 以易读格式显示文件大小
hdfs dfs -du -h # 查看目录或文件的磁盘空间占用情况
hdfs dfs -du -h / # 查看根目录下各目录的磁盘占用情况

hdfs dfs -mkdir /user/hadoop # 在HDFS的/user目录下创建hadoop目录
hdfs dfs -mkdir -p /user/hadoop/dir1/dir2 # 递归创建目录,即使父目录不存在也会创建

hdfs dfs -rm /user/hadoop/file.txt # 删除指定文件
hdfs dfs -rm -r /user/hadoop/dir1 # 递归删除指定目录及其内容
hdfs dfs -rm -r -f /user/hadoop/dir1 # 强制递归删除指定目录及其内容,不会提示确认

hdfs dfs -cat /user/hadoop/file.txt # 查看文件的内容
hdfs dfs -tail /user/hadoop/file.txt # 查看文件末尾的内容,默认显示最后4KB
hdfs dfs -text /user/hadoop/file.txt # 查看文件内容,适用于文本文件

hdfs dfs -put localfile /user/hadoop/ # 将本地文件localfile上传到HDFS的/user/hadoop目录下
hdfs dfs -put -f localfile /user/hadoop/ # 强制覆盖目标目录下同名文件
hdfs dfs -copyFromLocal localfile /user/hadoop/ # 将本地文件上传到HDFS,功能与-put相同

hdfs dfs -get /user/hadoop/file.txt localdir/ # 将HDFS中的file.txt下载到本地目录localdir
hdfs dfs -getmerge /user/hadoop/output localfile # 将HDFS目录下的多个文件合并到本地一个文件中

hdfs dfsadmin -report # 查看HDFS的文件系统状态,包括数据块总数、已用空间、剩余空间等
hdfs dfs -df -h # 以易读格式显示HDFS文件系统的磁盘使用情况

hdfs dfs -chmod 755 /user/hadoop/file.txt # 更改文件的权限为755
hdfs dfs -chown hadoop /user/hadoop/file.txt # 更改文件的所有者为hadoop
hdfs dfs -chgrp hadoopgroup /user/hadoop/file.txt # 更改文件的所属组为hadoopgroup

hdfs dfs -stat /user/hadoop/file.txt # 查看文件的基本信息(如大小、权限、最后修改时间等)
hdfs dfs -hadoop fsck / -files -blocks # 检查文件系统的完整性并列出文件和块信息

YARN

YARN基本架构

YARN是Hadoop中用于进行集群资源管理的重要组件,主要由Resource Manager(RM)、Node Manager(NM)、Application Master(AM)和Container四个部分组成。

Resource Manager是整个Hadoop集群中资源的最高管理者。客户端将MapReduce任务提交给Resource Manager,Resource Manager不断地处理客户端提交的请求。同时,Resource Manager还在时刻着监控Hadoop集群所有Node Manager节点的状态。客户端将MapReduce任务提交给Resource Manager后,首先进行资源的分配和调度,然后Resource Manager会启动Application Master运行这些MapReduce任务。Application Master上运行着MapReduce任务,并且每隔一段时间向Resource Manager发送MapReduce任务运行的状态信息,Resource Manager负责收集并监控Application Master的状态。Resource Manager的主要作用如下:

  1. 处理客户端请求
  2. 监控Node Manager
  3. 启动或监控Application Master
  4. 资源的分配和调度

Node Manager是单个节点上资源的最高管理者。但是Node Manager在分配和管理资源之前,首先要向Resource Manager申请资源,同时还要每隔一段时间向Resource Manager上报资源使用情况。当Node Manager收到来自Application Master的资源申请时,就会向Application Master分配和调度所需资源。Node Manager的主要作用如下:

  1. 管理所在节点上的资源
  2. 处理来自Resource Manager的命令
  3. 处理来自Application Master的命令

Application Master主要负责为每一个任务进行资源的申请、调度和分配。向Resource Manager申请资源,与Node Manager进行交互,监控并汇报任务的运行状态、申请的资源的使用情况和作业的进度等信息。同时,跟踪任务状态和进度,定时向Resource Manager发送心跳消息,上报资源的使用情况和应用的进度信息。此外,Application Master还负责本作业内的任务的容错。Application Master的主要作用如下:

  1. 负责数据的切分
  2. 为应用程序申请资源并分配给其包含的任务
  3. 任务的监控与容错

Container是Yarn中资源的抽象,它封装了某个Node Manager节点上多维度资源,例如CPU、内存、磁盘、网络等。

YARN工作机制

对于一个MapReduce程序,用户首先将该程序的jar提交到客户端所在的节点,该行为由Yarn的上游发起,接着流程交给Yarn进行工作处理:

  1. 申请Application:Resource Manager的主要作用之一就是负责处理客户端发来的请求。当用户将一个MapReduce程序的jar提交到客户端所在的节点后,位于该节点上的YarnRunner会向整个Hadoop集群中资源的最高管理者Resource Manager发送一次请求,申请一个Application。
  2. 分配资源提交路径:当Resource Manager接收到客户端发送来的Application申请后,Resource Manager会为客户端分配一个Application资源提交的路径,以及Application编号application_id。该资源提交路径实质为HDFS分布式文件系统的目录,也就是说Yarn会利用HDFS为MapReduce程序的运行提供存储资源。
  3. 提交运行资源:当Resource Manager会为客户端分配一个Application资源提交的路径后,客户端会向该路径提交任务运行所需要的所有资源,例如job.split、job.xml和MapReduce程序的jar包。其中,job.split表示切片的规划,job.xml表示job运行时的配置文件。
  4. 申请运行MRAppmaster:当客户端会向Application资源提交路径提交任务运行所需要的所有资源后,客户端会再次向Resource Manager发送一次申请,申请运行MRAppmaster。
  5. 初始化Task:当Resource Manager接收到客户端发送来的运行MRAppmaster申请后,Resource Manager会将该申请初始化为一个Task,并将该Task放入FIFO调度队列中,等待任务调度。
  6. 领取Task任务:当Task放入FIFO调度队列后,等待任务调度。Resource Manager除了能够处理来自客户端的请求外,还能够监控Node Manager的资源使用情况和状态。当Resource Manager监控到某个Node Manager正处于空闲状态并资源充足,Resource Manager会从FIFO调度队列的头部拉取Task任务,然后分配给Node Manager。此时,单个节点上资源的最高管理者Node Manager就会从FIFO调度队列的头部领取Task任务,然后处理该Task任务。
  7. 创建容器Container:当Node Manager就会从FIFO调度队列的头部领取Task任务后,Node Manager会将运行该Task任务所需的CPU、RAM等资源和MRAppmaster封装到一个Container中。
  8. 下载任务资源到本地:Container中的MRAppmaster实质上就是Application Master。Application Master主要负责为每一个任务进行资源的申请、调度和分配。当Application Master去运行任务时,首先要去下载任务的切片信息、任务运行时的配置文件以及任务运行的MapReduce程序jar包。
  9. 申请运行MapTask容器:当MRAppmaster将任务资源下载完毕后,MRAppmaster会向Resource Manager申请运行MapTask容器。Resource Manager接收到客户端发送来的运行MapTask容器申请后,会将该申请初始化为一个MapTask,并将该Task放入FIFO调度队列中。当Resource Manager监控到某个Node Manager正处于空闲状态并资源充足,Resource Manager会从FIFO调度队列的头部拉取MapTask任务,然后分配给Node Manager。
  10. 领取MapTask任务,创建MapTask容器:当Node Manager就会从FIFO调度队列的头部领取MapTask任务后,Node Manager会将运行该MapTask任务所需的CPU、RAM等资源和jar包封装到一个Container中。
  11. 发送程序启动脚本:当Node Manager将运行MapTask任务所需的CPU、RAM等资源和jar包封装到一个Container后,MRAppmaster会向Node Manager发送程序启动脚本,启动MapTask。
  12. 申请运行ReduceTask容器:当Node Manager运行完MapTask任务后,MRAppmaster会向Resource Manager申请资源来运行ReduceTask容器。
  13. 获取分区数据:当MRAppmaster向Resource Manager申请资源来运行ReduceTask容器后,ReduceTask容器会向MapTask容器获取相应分区的数据。
  14. 注销MRAppmaster:当ReduceTask容器向MapTask容器获取相应分区的数据并运行完ReduceTask后,MRAppmaster向Resource Manager发出注销请求。Resource Manager接收到注销请求后,会立马注销MRAppmaster,并释放相关资源。

资源调度器

在集群软硬件配置都确定的情况下,集群性能将取决于ResourceManager的性能,更准确的说是取决于资源管理器ResourceManager中的调度器Scheduler给不同计算作业任务job分配计算容器Container的规则机制,因此调度器Scheduler组件的任务资源调度机制将很大程度地影响着集群整体性能效率的发挥。

目前,Hadoop集群提供的作业调度器有先进先出调度器(FIFO Scheduler)、容量调度器(Capacity Scheduler)和公平调度(Fair Scheduler)。

FIFO Scheduler

先进先出调度器(FIFO Scheduler)的工作机制相对简单,就是将计算作业任务job按照其提交的先后顺序加入到容器Container的使用队列中,jobs根据其加入队列的先后顺序依次执行。具体在资源分配时,优先给队列头部的job分配资源,在job所需资源得到满足时就会开始执行该job。待前面任务job执行完毕后,就会继续为后面的job分配资源。如下图所示,在job1提交到集群后,先进先出调度器将优先将全部资源分配给job1。虽然在job1执行期间job2也被提交到集群,但由于job1仍在执行,所以先进先出调度器只有在job1执行完毕后才将全部资源分配给job2,继而执行job2。

Capacity Scheduler

容量调度器(CapacityScheduler)的目标是实现多租户安全地共享一个大型集群,同时最大限度地提高集群的吞吐量和利用率,Hadoop3.2.2默认的资源调度器就是容量调度器。容量调度器主要是通过引入“队列”概念来实现资源的有效分配和管理,其中可以将“队列”理解为多个容器Container组成的集合。通过将大集群的整体资源划分为多个队列,不同队列拥有不同数量的资源容器Container,队列间的计算作业相互隔离,每个队列以既定原则(如FIFO)调度其内部的计算作业任务。

容量调度器为了提高集群资源的利用率,允许队列访问其他队列未使用的任何过剩资源容量。这是以经济有效的方式为组织提供了弹性,简单理解就是队列的资源容量可以弹性变化,称为“弹性队列”。但这种弹性共享并不是无限制的,在日常开发中一般会增加配置项,用来限制每个队列最大的资源占比。

如下图所示,容量调度器将集群容量划分为队列A和B,队列A占用的资源比例较大,队列B占用的资源比例较小。队列A开始执行job1时需要较多资源,队列B没有执行任务而出现资源空闲,此时队列B可以将其占有的资源共享给队列A,加快job1的执行速度。当job2被提交后,队列B会从队列A处要回其原来本应拥有的资源,用于执行job2。虽然job1需要占用较多资源,但由于job1和job2属于不同的队列,因此job1的执行不会影响job2的开始。当job2执行完毕后,队列B又可以将其占有的资源共享给队列A。从图上观察发现队列A并不会占用队列B的全部资源,因为这集群会限制队列A占用资源的比例。

Fair Scheduler

公平调度器(FairScheduler)是由Facebook公司开发的实现多用户共享集群资源的调度机制,允许YARN计算作业任务在大型集群中公平地共享集群资源。默认情况下,公平调度器只基于集群内存资源进行调度,但用户可以自行配置为基于集群内存和CPU的资源调度。当只有单个计算作业任务运行时,该任务使用整个集群的资源;当有新的计算作业任务提交到集群时,集群将释放部分资源分配给新任务,最终实现每个任务从集群中获得大致相同数量的资源;如果有更多新的计算作业任务提交到集群时,也会依次类推的实现每个任务从集群中获得大致相同数量的资源。

如下图所示,job1提交到集群时,由于集群没有执行其他job,job1将占用集群的全部资源来执行任务。在job1执行期间job2被提交到集群,job1会释放部分资源用于执行job2,此时job1和job2各占用集群资源一半。在job2执行完毕后集群又会将全部资源用于执行job1。在公平调度器的调度下,可以让部分小任务在合理的时间内完成,也不会使大任务长期无法完成。

如下图所示,job1提交到集群时,由于集群没有执行其他job,job1占用集群的全部资源来执行任务。在job1执行期间job2被提交到队列B,队列A就会向队列B归还原应占用的资源,用于执行job2,此时job1和job2都占用各自所在队列的全部资源。在job2执行期间job3被提交到队列B,job2会释放部分资源用于执行job3,此时job2和job3占用所在队列资源的一半。在job2执行完毕后,job3就会占用队列B的全部资源。

MapReduce

MapReduce基本介绍

MapReduce是一种分布式并行计算编程模型,由Google在2004年提出,用于大规模数据集(TB/PB级)的批处理。其核心思想是“分而治之”,将计算任务分为Map和Reduce两个阶段:

  • Map阶段,将输入数据分割为多个分片(Split),由多个Map任务并行处理,生成中间键值对(如<单词, 1>);
  • Reduce阶段,对中间结果按键分组,由Reduce任务汇总(如统计单词频次)。

MapReduce工作流程

MapReduce工作的流程如下:

  1. 输入分片(Input Split)

当用户提交一个MapReduce作业时,首先需要指定输入文件。MapReduce框架会将输入文件分割成一个个小的数据块,称为“输入分片”(Input Split)。每个输入分片的大小通常与HDFS中的块大小相等(默认128MB)。例如,如果有一个1GB的文件,HDFS块大小为128MB,那么这个文件会被分割成8个输入分片,这样做的目的是为了将大规模的数据分解成多个小任务,便于在不同的节点上并行处理。

  1. Map阶段

每个输入分片会被分配给一个Map任务(Map Task)。Map任务会读取输入分片中的数据,对数据进行处理,并产生一系列的中间结果。这些中间结果通常以键值对<key, value>的形式输出。例如,在一个词频统计的场景中,Map任务会读取输入分片中的文本行,将每一行拆分成单词,然后输出键值对,其中键是单词,值是1(表示该单词出现了一次)。如果输入分片中有”hello world”这行文本,那么Map任务会输出两个键值对:<”hello”, 1>和<”world”, 1>。

  1. Shuffle阶段

这是MapReduce中非常关键的一步。当Map任务输出中间结果后,这些中间结果会被分配到不同的Reduce任务中,分配的依据是中间结果的键(key)。MapReduce框架会根据键的范围或哈希值等规则,将具有相同键的中间结果归并到一起,并发送给同一个Reduce任务。例如,假设键是单词,那么所有键为”hello”的中间结果都会被发送到同一个Reduce任务。在这个过程中,数据会从Map节点传输到Reduce节点,这个过程称为Shuffle。Shuffle阶段是MapReduce性能的关键因素之一,因为数据的传输可能会占用大量的网络带宽。

  1. Reduce阶段

Reduce任务会接收到分配给它的所有中间结果。对于每个键,Reduce任务会将所有与该键相关的值合并在一起,产生最终的输出结果。继续以词频统计为例,如果Reduce任务接收到键为“hello”的中间结果<”hello”, 1>、<”hello”, 1>、<”hello”, 1>,那么它会将这些值合并,输出最终结果<”hello”, 3>,表示单词”hello”总共出现了3次。最终,Reduce任务将结果写入到输出文件中。每个Reduce任务对应一个输出文件,所有输出文件共同构成了MapReduce作业的最终结果。

MapReduce编程模型中,Splitting和Shuffing操作都是由框架实现的,需要自己编程实现的只有Mapping和Reducing,这也是MapReduce这个称呼的来源。

MapReduce案例实践

继承Mapper类和Reducer类,并重写map方法和reduce方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCount {

public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();

public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String[] words = value.toString().split("\\W+");
for (String w : words) {
if (!w.isEmpty()) {
word.set(w.toLowerCase());
context.write(word, one);
}
}
}
}

public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();

public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

参考

Hadoop实战:使用Docker Compose部署Hadoop集群