前言
部分内容摘自尚硅谷、黑马等培训资料
1. 计数器概述
在执行 MapReduce 程序的时候,控制台输出信息中通常有下面所示片段内容:
可以发现,输出信息中的核心词是counters
,中文叫做计数器
。在进行 MapReduce 运算过程中,许多时候,用户希望了解程序的运行情况。Hadoop内置的计数器功能收集作业的主要统计信息,可以帮助用户理解程序的运行情况,辅助用户诊断故障
。
这些记录了该程序运行过程的的一些信息的计数,如Map input records=2
,表示 Map 有 2 条记录。可以看出来这些内置计数器可以被分为若干个组,即对于大多数的计数器来说,Hadoop 使用的组件分为若干类。
2. MapReduce内置计数器
Hadoop 为每个 MapReduce 作业维护一些内置的计数器,这些计数器报告各种指标,例如和 MapReduce 程序执行中每个阶段输入输出的数据量相关的计数器,可以帮助用户进行判断程序逻辑是否生效、正确。
Hadoop 内置计数器根据功能进行分组。每个组包括若干个不同的计数器,分别是:MapReduce 任务计数器(Map-Reduce Framework
)、文件系统计数器(File System Counters
)、作业计数器(Job Counters
)、输入文件任务计数器(File Input Format Counters
)、输出文件计数器(File Output Format Counters
)。
需要注意的是,内置的计数器都是 MapReduce 程序中全局的计数器
,跟 MapReduce 分布式运算没有关系,不是所谓的每个局部的统计信息。
2.1 Map-Reduce Framework Counters
计数器名称 |
说明 |
MAP_INPUT_RECORDS |
所有mapper已处理的输入记录数 |
MAP_OUTPUT_RECORDS |
所有mapper产生的输出记录数 |
MAP_OUTPUT_BYTES |
所有mapper产生的未经压缩的输出数据的字节数 |
MAP_OUTPUT_MATERIALIZED_BYTES |
mapper输出后确实写到磁盘上字节数 |
COMBINE_INPUT_RECORDS |
所有combiner(如果有)已处理的输入记录数 |
COMBINE_OUTPUT_RECORDS |
所有combiner(如果有)已产生的输出记录数 |
REDUCE_INPUT_GROUPS |
所有reducer已处理分组的个数 |
REDUCE_INPUT_RECORDS |
所有reducer已经处理的输入记录的个数。每当某个reducer的迭代器读一个值时,该计数器的值增加 |
REDUCE_OUTPUT_RECORDS |
所有reducer输出记录数 |
REDUCE_SHUFFLE_BYTES |
Shuffle时复制到reducer的字节数 |
SPILLED_RECORDS |
所有map和reduce任务溢出到磁盘的记录数 |
CPU_MILLISECONDS |
一个任务的总CPU时间,以毫秒为单位,可由/proc/cpuinfo获取 |
PHYSICAL_MEMORY_BYTES |
一个任务所用的物理内存,以字节数为单位,可由/proc/meminfo获取 |
VIRTUAL_MEMORY_BYTES |
一个任务所用虚拟内存的字节数,由/proc/meminfo获取 |
2.2 File System Counters Counters
文件系统的计数器会针对不同的文件系统使用情况进行统计,比如 HDFS、本地文件系统:
计数器名称 |
说明 |
BYTES_READ |
程序从文件系统中读取的字节数 |
BYTES_WRITTEN |
程序往文件系统中写入的字节数 |
READ_OPS |
文件系统中进行的读操作的数量(例如,open操作,filestatus操作) |
LARGE_READ_OPS |
文件系统中进行的大规模读操作的数量 |
WRITE_OPS |
文件系统中进行的写操作的数量(例如,create操作,append操作) |
2.3 Job Counters
计数器名称 |
说明 |
Launched map tasks |
启动的map任务数,包括以“推测执行”方式启动的任务 |
Launched reduce tasks |
启动的reduce任务数,包括以“推测执行”方式启动的任务 |
Data-local map tasks |
与输人数据在同一节点上的map任务数 |
Total time spent by all maps in occupied slots (ms) |
所有map任务在占用的插槽中花费的总时间(毫秒) |
Total time spent by all reduces in occupied slots (ms) |
所有reduce任务在占用的插槽中花费的总时间(毫秒) |
Total time spent by all map tasks (ms) |
所有map task花费的时间 |
Total time spent by all reduce tasks (ms) |
所有reduce task花费的时间 |
计数器名称 |
说明 |
读取的字节数(BYTES_READ) |
由map任务通过FilelnputFormat读取的字节数 |
写的字节数(BYTES_WRITTEN) |
由map任务(针对仅含map的作业)或者reduce任务通过FileOutputFormat写的字节数 |
3. MapReduce自定义计数器
虽然 Hadoop 内置的计数器比较全面,给作业运行过程的监控带了方便,但是对于一些业务中的特定要求(统计过程中对某种情况发生进行计数统计)MapReduce 还是提供了用户编写自定义计数器的方法。最重要的是,计数器是全局的统计
,避免了用户自己维护全局变量的不利性。
自定义计数器的使用分为两步:
首先通过context.getCounter
方法获取一个全局计数器,创建的时候需要指定计数器所属的组名和计数器的名字:
然后在程序中需要使用计数器的地方,调用 counter 提供的方法即可,比如 +1 操作:
这样在执行程序的时候,在控制台输出的信息上就有自定义计数器组和计数器统计信息。
4. 案例:MapReduce自定义计数器
4.1 需求
针对一批文件进行词频统计,不知何种原因,在任意文件的任意地方都有可能插入单词 “apple”,现要求使用计数器统计出数据中 apple 出现的次数,便于用户执行程序时判断。
4.2 代码实现
4.2.1 Mapper类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class WordCountMapper extends Mapper<LongWritable, Text,Text,LongWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { Counter counter = context.getCounter("itcast_counters", "apple Counter");
String[] words = value.toString().split("\\s+"); for (String word : words) { if("apple".equals(word)){ counter.increment(1); } context.write(new Text(word),new LongWritable(1)); } } }
|
4.2.2 Reduce类
1 2 3 4 5 6 7 8 9 10
| public class WordCountReducer extends Reducer<Text, LongWritable,Text,LongWritable> { @Override protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long count = 0; for (LongWritable value : values) { count +=value.get(); } context.write(key,new LongWritable(count)); } }
|
4.2.3 运行主类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| public class WordCountDriver extends Configured implements Tool {
@Override public int run(String[] args) throws Exception { Job job = Job.getInstance(getConf(), WordCountDriver.class.getSimpleName()); job.setJarByClass(this.getClass());
job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class);
job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class);
job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); int status = ToolRunner.run(conf, new WordCountDriver(), args); System.exit(status); } }
|
4.2.4 执行结果