最近在进行大规模数据处理时,由于集群资源紧张,需要控制 Map 的数量。本文将从底层代码入手,介绍 MapReduce 如何决定 Map 的数量。
Hive 是基于 Hadoop 的数据仓库工具,可以将结构化的数据文件映射为数据库表,并提供 SQL 查询功能。它可以将 SQL 语句转换为 MapReduce 任务执行。那么,当运行一个 HQL 语句时,Map 的数量是如何计算出来的?又有哪些方法可以调整 Map 的数量呢?
本文测试所用的集群版本为 CDH 4.3.0。
在 CDH 4.3.0 版本的 Hive 中,默认的 Input Format 是 CombineHiveInputFormat
。如果你使用的是 IDH 的 Hive,则默认值为 HiveInputFormat
。
CombineHiveInputFormat
继承自 HiveInputFormat
,而后者实现了 org.apache.hadoop.mapred.InputFormat
接口。这个接口的主要功能是描述输入数据的格式,并提供以下两个关键功能:
该接口的具体定义如下:
java
public interface InputFormat {
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException;
}
其中,getSplits()
方法主要用于数据切分。每一份数据由 Split 表示,但实际在磁盘上并不会将数据切分成物理分片,而是以 Block 为单位存储在 HDFS 上。InputSplit
只记录了 Map 任务所需处理的数据元数据信息,例如起始位置、长度和所在节点。
数据切分算法主要用于确定 Split 的数量及每个 Split 的数据段。FileInputFormat 是一个抽象类,它为各种 InputFormat 提供了一致的 getSplits()
方法。该方法的核心是文件切分算法和主机选择算法。
文件切分算法主要考虑以下参数:
- goalSize
:由 totalSize / numSplits
计算得出,目标 Split 大小。
- minSize
:由配置参数 mapred.min.split.size
决定的最小 Split 大小。
- blockSize
:HDFS 中的文件 Block 大小。
计算公式如下:
java
splitSize = max{minSize, min{goalSize, blockSize}}
主机选择算法主要用于确定每个 Split 的元数据信息。具体步骤如下: 1. 按照机架包含的数据量对机架排序。 2. 在机架内按节点包含的数据量排序。 3. 选取前 N 个节点作为 Split 的主机列表。
FileInputFormat 使用一种启发式的主机选择算法,优先选择包含数据最多的节点,以提高数据本地性。
如果 hive.input.format
设置为 HiveInputFormat
,则参数如下:
- mapred.min.split.size
:1
- mapred.map.tasks
:2
- dfs.blocksize
:128M
假设有一个 200M 的文件,按 HiveInputFormat 的切分算法:
1. 文件总大小为 200M,goalSize
为 100M,minSize
为 1,因此 splitSize
为 100M。
2. 文件大小大于 splitSize
,因此第一个 Split 为 100M。
3. 剩余文件大小为 100M,小于 128M,因此第二个 Split 也为 100M。
如果 hive.input.format
设置为 CombineHiveInputFormat
,则参数如下:
- mapred.min.split.size
:1
- mapred.max.split.size
:64M
- mapred.min.split.size.per.rack
:1
- mapred.min.split.size.per.node
:1
- dfs.blocksize
:128M
假设有一个 200M 的文件,按 CombineHiveInputFormat 的切分算法: 1. 文件大小为 200M,Block 大小为 128M,最大 Split 大小为 64M。 2. 第一个 Split 为 128M。 3. 剩余 72M,小于最大 Split 大小,因此第二个 Split 为 72M。
通过以上方法,可以根据不同的配置和数据情况灵活调整 Map 的数量,以优化集群性能。