Hadoop日记Day17---计数器、map规约、分区学习
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了Hadoop日记Day17---计数器、map规约、分区学习,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含30336字,纯文字阅读大概需要44分钟。
内容图文
![Hadoop日记Day17---计数器、map规约、分区学习](/upload/InfoBanner/zyjiaocheng/1150/3bc22fb8eabc4779909c1b0788cd1151.jpg)
一、Hadoop计数器
1.1 什么是Hadoop计数器
Haoop是处理大数据的,不适合处理小数据,有些大数据问题是小数据程序是处理不了的,他是一个高延迟的任务,有时处理一个大数据需要花费好几个小时这都是正常的。下面我们说一下Hadoop计数器,Hadoop计数器就相当于我们的日志,而日志可以让我们查看程序运行时的很多状态,而计数器也有这方面的作用。那么就研究一下Hadoop自身的计数器。计数器的程序如代码1.1所示,下面代码还是以内容为“hello you;hell0 me”的单词统计为例。
1 package counter; 2 3 import java.net.URI; 4 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.fs.FileSystem; 7 import org.apache.hadoop.fs.Path; 8 import org.apache.hadoop.io.LongWritable; 9 import org.apache.hadoop.io.Text; 10 import org.apache.hadoop.mapreduce.Counter; 11 import org.apache.hadoop.mapreduce.Job; 12 import org.apache.hadoop.mapreduce.Mapper; 13 import org.apache.hadoop.mapreduce.Reducer; 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 17 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 18 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; 19 20 public class WordCountApp { 21 static final String INPUT_PATH = "hdfs://hadoop:9000/input"; 22staticfinal String OUT_PATH = "hdfs://hadoop:9000/output"; 2324publicstaticvoid main(String[] args) throws Exception { 2526 Configuration conf = new Configuration(); 2728final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf); 29final Path outPath = new Path(OUT_PATH); 3031if(fileSystem.exists(outPath)){ 32 fileSystem.delete(outPath, true); 33 } 34final Job job = new Job(conf , WordCountApp.class.getSimpleName()); 3536//1.1指定读取的文件位于哪里37 FileInputFormat.setInputPaths(job, INPUT_PATH); 38 job.setInputFormatClass(TextInputFormat.class);//指定如何对输入文件进行格式化,把输入文件每一行解析成键值对 3940//1.2 指定自定义的map类41 job.setMapperClass(MyMapper.class); 42 job.setMapOutputKeyClass(Text.class);//map输出的<k,v>类型。43 job.setMapOutputValueClass(LongWritable.class);//如果<k3,v3>的类型与<k2,v2>类型一致,则可以省略 4445//1.3 分区46 job.setPartitionerClass(HashPartitioner.class); 47 job.setNumReduceTasks(1);//有一个reduce任务运行 4849//2.2 指定自定义reduce类50 job.setReducerClass(MyReducer.class); 5152 job.setOutputKeyClass(Text.class);//指定reduce的输出类型53 job.setOutputValueClass(LongWritable.class); 5455//2.3 指定写出到哪里56 FileOutputFormat.setOutputPath(job, outPath); 57 job.setOutputFormatClass(TextOutputFormat.class);//指定输出文件的格式化类5859 job.waitForCompletion(true);//把job提交给JobTracker运行60 } 6162/**63 * KEYIN 即k1 表示行的偏移量 64 * VALUEIN 即v1 表示行文本内容 65 * KEYOUT 即k2 表示行中出现的单词 66 * VALUEOUT 即v2 表示行中出现的单词的次数,固定值1 67*/68staticclass MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{ 69protectedvoid map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException { 70final String line = v1.toString(); 71final String[] splited = line.split("\t"); 72for (String word : splited) { 73 context.write(new Text(word), new LongWritable(1)); 74 } 75 }; 76 } 7778/**79 * KEYIN 即k2 表示行中出现的单词 80 * VALUEIN 即v2 表示行中出现的单词的次数 81 * KEYOUT 即k3 表示文本中出现的不同单词 82 * VALUEOUT 即v3 表示文本中出现的不同单词的总次数 83 * 84*/85staticclass MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{ 86protectedvoid reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException { 87long times = 0L; 88for (LongWritable count : v2s) { 89 times += count.get(); 90 } 91 ctx.write(k2, new LongWritable(times)); 92 }; 93 } 9495 }
代码 1.1
运行结果如下图1.1所示。
Counters: 19//Counter表示计数器,19表示有19个计数器(下面一共4计数器组) File Output Format Counters //文件输出格式化计数器组 Bytes Written=19 //reduce输出到hdfs的字节数,一共19个字节 FileSystemCounters//文件系统计数器组 FILE_BYTES_READ=481 HDFS_BYTES_READ=38 FILE_BYTES_WRITTEN=81316 HDFS_BYTES_WRITTEN=19 File Input Format Counters //文件输入格式化计数器组 Bytes Read=19 //map从hdfs读取的字节数 Map-Reduce Framework//MapReduce框架 Map output materialized bytes=49 Map inputrecords=2 //map读入的记录行数,读取两行记录,”hello you”,”hello me” Reduce shuffle bytes=0//规约分区的字节数 Spilled Records=8 Map output bytes=35 Total committed heap usage (bytes)=266469376 SPLIT_RAW_BYTES=105 Combineinput records=0//合并输入的记录数Reduce input records=4 //reduce从map端接收的记录行数Reduce input groups=3 //reduce函数接收的key数量,即归并后的k2数量Combineoutput records=0//合并输出的记录数Reduce output records=3 //reduce输出的记录行数。<helllo,{1,1}>,<you,{1}>,<me,{1}>Map output records=4 //map输出的记录行数,输出4行记录
图 1.1
通过上面我们对计数器的分析,可以知道,我们可以通过计数器来分析MapReduece程序的运行状态。
1.2 自定义计数器
通过上面的分析,我们了解了计数器的作用,那么我们可以自定义一个计数器,来实现我们自己想要的功能。如定义一个记录敏感词的计数器,记录敏感词在一行所出现的次数,如代码2.1所示。我们处理文件内容为“hello you”,“hello me”。
1 Counters: 19//Counter表示计数器,19表示有19个计数器(下面一共4计数器组) 2 File Output Format Counters //文件输出格式化计数器组 3 Bytes Written=19 //reduce输出到hdfs的字节数,一共19个字节 4 FileSystemCounters//文件系统计数器组 5 FILE_BYTES_READ=481 6 HDFS_BYTES_READ=38 7 FILE_BYTES_WRITTEN=81316 8 HDFS_BYTES_WRITTEN=19 9 File Input Format Counters //文件输入格式化计数器组10 Bytes Read=19 //map从hdfs读取的字节数11 Map-Reduce Framework//MapReduce框架12 Map output materialized bytes=49 13 Map input records=2 //map读入的记录行数,读取两行记录,”hello you”,”hello me”14 Reduce shuffle bytes=0//规约分区的字节数15 Spilled Records=8 16 Map output bytes=35 17 Total committed heap usage (bytes)=266469376 18 SPLIT_RAW_BYTES=105 19 Combine input records=0//合并输入的记录数20 Reduce input records=4 //reduce从map端接收的记录行数21 Reduce input groups=3 //reduce函数接收的key数量,即归并后的k2数量22 Combine output records=0//合并输出的记录数23 Reduce output records=3 //reduce输出的记录行数。<helllo,{1,1}>,<you,{1}>,<me,{1}>24 Map output records=4 //map输出的记录行数,输出4行记录
代码2.1
运行结果如下图2.1所示。
Counters: 20 Sensitive Words hello=2 File Output Format Counters Bytes Written=21 FileSystemCounters FILE_BYTES_READ=359 HDFS_BYTES_READ=42 FILE_BYTES_WRITTEN=129080 HDFS_BYTES_WRITTEN=21 File Input Format Counters Bytes Read=21 Map-Reduce Framework Map output materialized bytes=67 Map input records=2 Reduce shuffle bytes=0 Spilled Records=8 Map output bytes=53 Total committed heap usage (bytes)=391774208 SPLIT_RAW_BYTES=95 Combine input records=0 Reduce input records=4 Reduce input groups=3 Combine output records=0 Reduce output records=3 Map output records=4
图 2.1
二、Combiners编程
2.1 什么是Combiners
从上面程序运行的结果我们可以发现,在Map-Reduce Framework即MapReduce框架的输出中,Combine input records这个字段为零, 那么combine怎么使用呢?其实这是MapReduce程序中Mapper任务中第五步,这是可选的一步,使用方法非常简单,以上面单词统计为例,只需添加下面一行代码即可,如下: job.setCombinerClass(MyReducer.class);
combine操作是一个可选的操作,使用时需要我们自己设定,我们用MyReducer类来设置Combiners,表示Combiners与Reduce功能相同,带有combine功能的MapRduce程序如代码3.1所示。
1 package combine; 2 3 import java.net.URI; 4 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.fs.FileSystem; 7 import org.apache.hadoop.fs.Path; 8 import org.apache.hadoop.io.LongWritable; 9 import org.apache.hadoop.io.Text; 10 import org.apache.hadoop.mapreduce.Job; 11 import org.apache.hadoop.mapreduce.Mapper; 12 import org.apache.hadoop.mapreduce.Partitioner; 13 import org.apache.hadoop.mapreduce.Reducer; 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 17 import org.apache.jasper.tagplugins.jstl.core.If; 18 19 public class WordCountApp2 { 20 static final String INPUT_PATH = "hdfs://hadoop:9000/hello"; 21staticfinal String OUT_PATH = "hdfs://hadoop:9000/out"; 22 23publicstaticvoid main(String[] args) throws Exception { 24 Configuration conf = new Configuration(); 25final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf); 26final Path outPath = new Path(OUT_PATH); 27if(fileSystem.exists(outPath)){ 28 fileSystem.delete(outPath, true); 29 } 30final Job job = new Job(conf , WordCountApp2.class.getSimpleName()); 31 job.setJarByClass(WordCountApp2.class); 32 33//1.1指定读取的文件位于哪里 34 FileInputFormat.setInputPaths(job, INPUT_PATH); 35 job.setInputFormatClass(TextInputFormat.class);//指定如何对输入文件进行格式化,把输入文件每一行解析成键值对 36 37//1.2 指定自定义的map类 38 job.setMapperClass(MyMapper.class); 39 job.setMapOutputKeyClass(Text.class);//map输出的<k,v>类型。 40 job.setMapOutputValueClass(LongWritable.class);//如果<k3,v3>的类型与<k2,v2>类型一致,则可以省略 41 42//1.3 分区 43 job.setPartitionerClass(MyPartitioner.class); 44//有几个reduce任务运行 45 job.setNumReduceTasks(2); 46 47//1.4 TODO 排序、分组 48 49//1.5 规约 50 job.setCombinerClass(MyCombiner.class); 51 52//2.2 指定自定义reduce类 53 job.setReducerClass(MyReducer.class); 54//指定reduce的输出类型 55 job.setOutputKeyClass(Text.class); 56 job.setOutputValueClass(LongWritable.class); 57 58//2.3 指定写出到哪里 59 FileOutputFormat.setOutputPath(job, outPath); 60//指定输出文件的格式化类 61//job.setOutputFormatClass(TextOutputFormat.class); 62 63//把job提交给JobTracker运行 64 job.waitForCompletion(true); 65 } 66 67staticclass MyPartitioner extends Partitioner<Text, LongWritable>{ 68 @Override 69publicint getPartition(Text key, LongWritable value, int numReduceTasks) { 70return (key.toString().equals("hello"))?0:1; 71 } 72 } 73 74/** 75 * KEYIN 即k1 表示行的偏移量 76 * VALUEIN 即v1 表示行文本内容 77 * KEYOUT 即k2 表示行中出现的单词 78 * VALUEOUT 即v2 表示行中出现的单词的次数,固定值1 79*/ 80staticclass MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{ 81protectedvoid map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException { 82final String[] splited = v1.toString().split("\t"); 83for (String word : splited) { 84 context.write(new Text(word), new LongWritable(1)); 85 System.out.println("Mapper输出<"+word+","+1+">"); 86 } 87 }; 88 } 89 90/** 91 * KEYIN 即k2 表示行中出现的单词 92 * VALUEIN 即v2 表示行中出现的单词的次数 93 * KEYOUT 即k3 表示文本中出现的不同单词 94 * VALUEOUT 即v3 表示文本中出现的不同单词的总次数 95 * 96*/ 97staticclass MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{ 98protectedvoid reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException { 99//显示次数表示redcue函数被调用了多少次,表示k2有多少个分组100 System.out.println("MyReducer输入分组<"+k2.toString()+",...>"); 101long times = 0L; 102for (LongWritable count : v2s) { 103 times += count.get(); 104//显示次数表示输入的k2,v2的键值对数量105 System.out.println("MyReducer输入键值对<"+k2.toString()+","+count.get()+">"); 106 } 107 ctx.write(k2, new LongWritable(times)); 108 }; 109 } 110111112staticclass MyCombiner extends Reducer<Text, LongWritable, Text, LongWritable>{ 113protectedvoid reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException { 114//显示次数表示redcue函数被调用了多少次,表示k2有多少个分组115 System.out.println("Combiner输入分组<"+k2.toString()+",...>"); 116long times = 0L; 117for (LongWritable count : v2s) { 118 times += count.get(); 119//显示次数表示输入的k2,v2的键值对数量120 System.out.println("Combiner输入键值对<"+k2.toString()+","+count.get()+">"); 121 } 122123 ctx.write(k2, new LongWritable(times)); 124//显示次数表示输出的k2,v2的键值对数量125 System.out.println("Combiner输出键值对<"+k2.toString()+","+times+">"); 126 }; 127 } 128 }
代码 3.1
运行结果如下图3.1所示。
Counters: 20 Sensitive Words hello=2 File Output Format Counters Bytes Written=21 FileSystemCounters FILE_BYTES_READ=359 HDFS_BYTES_READ=42 FILE_BYTES_WRITTEN=129080 HDFS_BYTES_WRITTEN=21 File Input Format Counters Bytes Read=21 Map-Reduce Framework Map output materialized bytes=67 Map input records=2 Reduce shuffle bytes=0 Spilled Records=8 Map output bytes=53 Total committed heap usage (bytes)=391774208 SPLIT_RAW_BYTES=95 Combine input records=4 Reduce input records=3 Reduce input groups=3 Combine output records=3 Reduce output records=3 Map output records=4
图 3.1
从上面的运行结果我们可以发现,此时Combine input records=4,Combine output records=3,Reduce input records=3,因为Combine阶段在Ma pper结束与Reducer开始之间,Combiners处理的数据,就是在不设置Combiners时,Reduce所应该接受的数据,所以为4,然后再将Combiners的输出作为Re duce端的输入,所以Reduce input records这个字段由4变成了3。注意,combine操作是一个可选的操作,使用时需要我们自己设定,在本代码中我们用MyRed ucer类来设置Combiners,Combine方法的使用的是Reduce的方法,这说明归约的方法是通用的,Reducer阶段的方法也可以用到Mapper阶段。
2.1 自定义Combiners
为了能够更加清晰的理解Combiners的工作原理,我们自定义一个Combiners类,不再使用MyReduce做为Combiners的类,如代码3.2所示。
1 package combine; 2 3 import java.net.URI; 4 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.fs.FileSystem; 7 import org.apache.hadoop.fs.Path; 8 import org.apache.hadoop.io.LongWritable; 9 import org.apache.hadoop.io.Text; 10 import org.apache.hadoop.mapreduce.Job; 11 import org.apache.hadoop.mapreduce.Mapper; 12 import org.apache.hadoop.mapreduce.Partitioner; 13 import org.apache.hadoop.mapreduce.Reducer; 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 17 import org.apache.jasper.tagplugins.jstl.core.If; 18 19 /** 20 * 问:为什么使用Combiner? 21 * 答:Combiner发生在Map端,对数据进行规约处理,数据量变小了,传送到reduce端的数据量变小了,传输时间变短,作业的整体时间变短。 22 * 23 * 问:为什么Combiner不作为MR运行的标配,而是可选步骤哪? 24 * 答:因为不是所有的算法都适合使用Combiner处理,例如求平均数。 25 * 26 * 问:Combiner本身已经执行了reduce操作,为什么在Reducer阶段还要执行reduce操作哪? 27 * 答:combiner操作发生在map端的,处理一个任务所接收的文件中的数据,不能跨map任务执行;只有reduce可以接收多个map任务处理的数据。 28 * 29 */ 30 public class WordCountApp2 { 31 static final String INPUT_PATH = "hdfs://hadoop:9000/hello"; 32staticfinal String OUT_PATH = "hdfs://hadoop:9000/out"; 33 34publicstaticvoid main(String[] args) throws Exception { 35 Configuration conf = new Configuration(); 36final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf); 37final Path outPath = new Path(OUT_PATH); 38if(fileSystem.exists(outPath)){ 39 fileSystem.delete(outPath, true); 40 } 41final Job job = new Job(conf , WordCountApp2.class.getSimpleName()); 42 job.setJarByClass(WordCountApp2.class); 43 44//1.1指定读取的文件位于哪里 45 FileInputFormat.setInputPaths(job, INPUT_PATH); 46 job.setInputFormatClass(TextInputFormat.class);//指定如何对输入文件进行格式化,把输入文件每一行解析成键值对 47 48//1.2 指定自定义的map类 49 job.setMapperClass(MyMapper.class); 50 job.setMapOutputKeyClass(Text.class);//map输出的<k,v>类型。 51 job.setMapOutputValueClass(LongWritable.class);//如果<k3,v3>的类型与<k2,v2>类型一致,则可以省略 52 53//1.3 分区 54 job.setPartitionerClass(MyPartitioner.class); 55//有几个reduce任务运行 56 job.setNumReduceTasks(2); 57 58//1.4 TODO 排序、分组 59 60//1.5 规约 61 job.setCombinerClass(MyCombiner.class); 62 63//2.2 指定自定义reduce类 64 job.setReducerClass(MyReducer.class); 65//指定reduce的输出类型 66 job.setOutputKeyClass(Text.class); 67 job.setOutputValueClass(LongWritable.class); 68 69//2.3 指定写出到哪里 70 FileOutputFormat.setOutputPath(job, outPath); 71//指定输出文件的格式化类 72//job.setOutputFormatClass(TextOutputFormat.class); 73 74//把job提交给JobTracker运行 75 job.waitForCompletion(true); 76 } 77 78staticclass MyPartitioner extends Partitioner<Text, LongWritable>{ 79 @Override 80publicint getPartition(Text key, LongWritable value, int numReduceTasks) { 81return (key.toString().equals("hello"))?0:1; 82 } 83 } 84 85/** 86 * KEYIN 即k1 表示行的偏移量 87 * VALUEIN 即v1 表示行文本内容 88 * KEYOUT 即k2 表示行中出现的单词 89 * VALUEOUT 即v2 表示行中出现的单词的次数,固定值1 90*/ 91staticclass MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{ 92protectedvoid map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException { 93final String[] splited = v1.toString().split("\t"); 94for (String word : splited) { 95 context.write(new Text(word), new LongWritable(1)); 96 System.out.println("Mapper输出<"+word+","+1+">"); 97 } 98 }; 99 } 100101/**102 * KEYIN 即k2 表示行中出现的单词 103 * VALUEIN 即v2 表示行中出现的单词的次数 104 * KEYOUT 即k3 表示文本中出现的不同单词 105 * VALUEOUT 即v3 表示文本中出现的不同单词的总次数 106 * 107*/108staticclass MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{ 109protectedvoid reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException { 110//显示次数表示redcue函数被调用了多少次,表示k2有多少个分组111 System.out.println("MyReducer输入分组<"+k2.toString()+",...>"); 112long times = 0L; 113for (LongWritable count : v2s) { 114 times += count.get(); 115//显示次数表示输入的k2,v2的键值对数量116 System.out.println("MyReducer输入键值对<"+k2.toString()+","+count.get()+">"); 117 } 118 ctx.write(k2, new LongWritable(times)); 119 }; 120 } 121122123staticclass MyCombiner extends Reducer<Text, LongWritable, Text, LongWritable>{ 124protectedvoid reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException { 125//显示次数表示redcue函数被调用了多少次,表示k2有多少个分组126 System.out.println("Combiner输入分组<"+k2.toString()+",...>"); 127long times = 0L; 128for (LongWritable count : v2s) { 129 times += count.get(); 130//显示次数表示输入的k2,v2的键值对数量131 System.out.println("Combiner输入键值对<"+k2.toString()+","+count.get()+">"); 132 } 133134 ctx.write(k2, new LongWritable(times)); 135//显示次数表示输出的k2,v2的键值对数量136 System.out.println("Combiner输出键值对<"+k2.toString()+","+times+">"); 137 }; 138 } 139 }
代码 3.2
运行结果如图3.2所示。
14/10/07 18:56:32 INFO mapred.MapTask: record buffer = 262144/327680 Mapper输出<hello,1> 14/10/07 18:56:32 INFO mapred.MapTask: Starting flush of map output Mapper输出<world,1> Mapper输出<hello,1> Mapper输出<me,1> Combiner输入分组<hello,...> Combiner输入键值对<hello,1> Combiner输入键值对<hello,1> Combiner输出键值对<hello,2> Combiner输入分组<me,...> Combiner输入键值对<me,1> Combiner输出键值对<me,1> Combiner输入分组<world,...> Combiner输入键值对<world,1> Combiner输出键值对<world,1> 14/10/07 18:56:32 INFO mapred.MapTask: Finished spill 0 14/10/07 18:56:32 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting 14/10/07 18:56:32 INFO mapred.LocalJobRunner: 14/10/07 18:56:32 INFO mapred.Task: Task ‘attempt_local_0001_m_000000_0‘ done. 14/10/07 18:56:32 INFO mapred.Task: Using ResourceCalculatorPlugin : null 14/10/07 18:56:32 INFO mapred.LocalJobRunner: 14/10/07 18:56:32 INFO mapred.Merger: Merging 1 sorted segments 14/10/07 18:56:32 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 47 bytes 14/10/07 18:56:32 INFO mapred.LocalJobRunner: MyReducer输入分组<hello,...> MyReducer输入键值对<hello,2> MyReducer输入分组<me,...> MyReducer输入键值对<me,1> MyReducer输入分组<world,...> MyReducer输入键值对<world,1> 14/10/07 18:56:33 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting 14/10/07 18:56:33 INFO mapred.LocalJobRunner: 14/10/07 18:56:33 INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed to commit now 14/10/07 18:56:33 INFO output.FileOutputCommitter: Saved output of task ‘attempt_local_0001_r_000000_0‘ to hdfs://hadoop:9000/output 14/10/07 18:56:33 INFO mapred.LocalJobRunner: reduce > reduce 14/10/07 18:56:33 INFO mapred.Task: Task ‘attempt_local_0001_r_000000_0‘ done. 14/10/07 18:56:33 INFO mapred.JobClient: map 100% reduce 100% 14/10/07 18:56:33 INFO mapred.JobClient: Job complete: job_local_0001 14/10/07 18:56:33 INFO mapred.JobClient: Counters: 19 14/10/07 18:56:33 INFO mapred.JobClient: File Output Format Counters 14/10/07 18:56:33 INFO mapred.JobClient: Bytes Written=21 14/10/07 18:56:33 INFO mapred.JobClient: FileSystemCounters 14/10/07 18:56:33 INFO mapred.JobClient: FILE_BYTES_READ=343 14/10/07 18:56:33 INFO mapred.JobClient: HDFS_BYTES_READ=42 14/10/07 18:56:33 INFO mapred.JobClient: FILE_BYTES_WRITTEN=129572 14/10/07 18:56:33 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=21 14/10/07 18:56:33 INFO mapred.JobClient: File Input Format Counters 14/10/07 18:56:33 INFO mapred.JobClient: Bytes Read=21 14/10/07 18:56:33 INFO mapred.JobClient: Map-Reduce Framework 14/10/07 18:56:33 INFO mapred.JobClient: Map output materialized bytes=51 14/10/07 18:56:33 INFO mapred.JobClient: Map input records=2 14/10/07 18:56:33 INFO mapred.JobClient: Reduce shuffle bytes=0 14/10/07 18:56:33 INFO mapred.JobClient: Spilled Records=6 14/10/07 18:56:33 INFO mapred.JobClient: Map output bytes=53 14/10/07 18:56:33 INFO mapred.JobClient: Total committed heap usage (bytes)=391774208 14/10/07 18:56:33 INFO mapred.JobClient: SPLIT_RAW_BYTES=95 14/10/07 18:56:33 INFO mapred.JobClient: Combine input records=4 14/10/07 18:56:33 INFO mapred.JobClient: Reduce input records=3 14/10/07 18:56:33 INFO mapred.JobClient: Reduce input groups=3 14/10/07 18:56:33 INFO mapred.JobClient: Combine output records=3 14/10/07 18:56:33 INFO mapred.JobClient: Reduce output records=3 14/10/07 18:56:33 INFO mapred.JobClient: Map output records=4
图 3.2
从上面的运行结果我们可以得知,combine具体作用如下:
- 每一个map可能会产生大量的输出,combiner的作用就是在map端对输出先做一次合并,以减少传输到reducer的数据量。
- combiner最基本是实现本地key的归并,combiner具有类似本地的reduce功能。
- 如果不用combiner,那么,所有的结果都是reduce完成,效率会相对低下。使用combiner,先完成的map会在本地聚合,提升速度。
注意:Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。所以从我的想法来看,Combiner只应该用于那 种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。
解释一下
*问:为什么使用Combiner?
答:Combiner发生在Map端,对数据进行规约处理,数据量变小了,传送到reduce端的数据量变小了,传输时间变短,作业的整体时间变短。
* 问:为什么Combiner不作为MR运行的标配,而是可选步骤?
答:因为不是所有的算法都适合使用Combiner处理,例如求平均数。
* 问:Combiner本身已经执行了reduce操作,为什么在Reducer阶段还要执行reduce操作?
答:combiner操作发生在map端的,智能处理一个map任务中的数据,不能跨map任务执行;只有reduce可以接收多个map任务处理的数据。
三、Partitioner编程
4.1 什么是分区
在MapReuce程序中的Mapper任务的第三步就是分区,那么分区到底是干什么的呢?其实,把数据分区是为了更好的利用数据,根据数据的属性不同来分成不同区,再根据不同的分区完成不同的任务。MapReduce程序中他的默认分区是1个分区,我们看一下默认分区的代码,还是以单词统计为例如代码4.1所示。
1 package counter; 2 3 import java.net.URI; 4 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.fs.FileSystem; 7 import org.apache.hadoop.fs.Path; 8 import org.apache.hadoop.io.LongWritable; 9 import org.apache.hadoop.io.Text; 10 import org.apache.hadoop.mapreduce.Counter; 11 import org.apache.hadoop.mapreduce.Job; 12 import org.apache.hadoop.mapreduce.Mapper; 13 import org.apache.hadoop.mapreduce.Reducer; 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 17 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 18 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; 19 20 public class WordCountApp { 21 static final String INPUT_PATH = "hdfs://hadoop:9000/input"; 22staticfinal String OUT_PATH = "hdfs://hadoop:9000/output"; 23 24publicstaticvoid main(String[] args) throws Exception { 25 26 Configuration conf = new Configuration(); 27 28final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf); 29final Path outPath = new Path(OUT_PATH); 30 31if(fileSystem.exists(outPath)){ 32 fileSystem.delete(outPath, true); 33 } 34final Job job = new Job(conf , WordCountApp.class.getSimpleName()); 35 36//1.1指定读取的文件位于哪里 37 FileInputFormat.setInputPaths(job, INPUT_PATH); 38 job.setInputFormatClass(TextInputFormat.class);//指定如何对输入文件进行格式化,把输入文件每一行解析成键值对 39 40//1.2 指定自定义的map类 41 job.setMapperClass(MyMapper.class); 42 job.setMapOutputKeyClass(Text.class);//map输出的<k,v>类型。 43 job.setMapOutputValueClass(LongWritable.class);//如果<k3,v3>的类型与<k2,v2>类型一致,则可以省略 44 45//1.3 分区 46 job.setPartitionerClass(HashPartitioner.class); 47 job.setNumReduceTasks(1);//有一个reduce任务运行 48 49 job.setCombinerClass(MyReducer.class); 50//2.2 指定自定义reduce类 51 job.setReducerClass(MyReducer.class); 52 53 job.setOutputKeyClass(Text.class);//指定reduce的输出类型 54 job.setOutputValueClass(LongWritable.class); 55 56//2.3 指定写出到哪里 57 FileOutputFormat.setOutputPath(job, outPath); 58 job.setOutputFormatClass(TextOutputFormat.class);//指定输出文件的格式化类 59 60 job.waitForCompletion(true);//把job提交给JobTracker运行 61 } 62 63/** 64 * KEYIN 即k1 表示行的偏移量 65 * VALUEIN 即v1 表示行文本内容 66 * KEYOUT 即k2 表示行中出现的单词 67 * VALUEOUT 即v2 表示行中出现的单词的次数,固定值1 68*/ 69staticclass MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{ 70protectedvoid map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException { 71final Counter helloCounter = context.getCounter("Sensitive Words", "hello"); 72 73final String line = v1.toString(); 74if(line.contains("hello")){ 75//记录敏感词出现在一行中 76 helloCounter.increment(1L); 77 } 78final String[] splited = line.split("\t"); 79for (String word : splited) { 80 context.write(new Text(word), new LongWritable(1)); 81 } 82 }; 83 } 84 85/** 86 * KEYIN 即k2 表示行中出现的单词 87 * VALUEIN 即v2 表示行中出现的单词的次数 88 * KEYOUT 即k3 表示文本中出现的不同单词 89 * VALUEOUT 即v3 表示文本中出现的不同单词的总次数 90 * 91*/ 92staticclass MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{ 93protectedvoid reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException { 94long times = 0L; 95for (LongWritable count : v2s) { 96 times += count.get(); 97 } 98 ctx.write(k2, new LongWritable(times)); 99 }; 100 } 101102 }
代码 4.1
在MapReduce程序中默认的分区方法为HashPartitioner,代码job.setNumReduceTasks(1)表示运行的Reduce任务数,他会将numReduceTask这个变量设为1. HashPartitioner继承自Partitioner,Partitioner是Partitioner的基类,如果需要定制partitioner也需要继承该类。 HashPartitioner计算方法如代码4.2所示。
1 public class HashPartitioner<K, V> extends Partitioner<K, V> { 23/** Use {@link Object#hashCode()} to partition. */4publicint getPartition(K key, V value, 5int numReduceTasks) { 6return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; 7 } 89 }
代码 4.2
在上面的代码中K和V,表示k2和v2,该类中只有一个方法getPartition(),返回值如下”(key.hashCode()& Integer.MAX_VALUE)%numReduceTasks“其中key.hashCode()表示该关键是否属于该类。numReduceTasks的值在上面代码中设置为1,取模后只有一种结果那就是0。getPartition()的意义就是表示划分到不同区域的一个标记,返回0,就是表示划分到第0区,所以我们可以把它理解分区的下标,来代表不同的分区。
4.2 自定义分区
下面我们尝试自定义一个分区,来处理一下手机的日志数据(在前面学习中用过),手机日志数据如下图4.1所示。
图 4.1
从图中我们可以发现,在第二列上并不是所有的数据都是手机号,我们任务就是在统计手机流量时,将手机号码和非手机号输出到不同的文件中。我们的分区是按手机和非手机号码来分的,所以我们可以按该字段的长度来划分,如代码4.3所示。
1 package partition; 2 3 import java.io.DataInput; 4 import java.io.DataOutput; 5 import java.io.IOException; 6 7 import org.apache.hadoop.conf.Configuration; 8 import org.apache.hadoop.fs.Path; 9 import org.apache.hadoop.io.LongWritable; 10 import org.apache.hadoop.io.Text; 11 import org.apache.hadoop.io.Writable; 12 import org.apache.hadoop.mapreduce.Job; 13 import org.apache.hadoop.mapreduce.Mapper; 14 import org.apache.hadoop.mapreduce.Reducer; 15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 16 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 17 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 18 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 19 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; 20 21 public class KpiApp { 22 static final String INPUT_PATH = "hdfs://hadoop:9000/wlan"; 23staticfinal String OUT_PATH = "hdfs://hadoop:9000/out"; 24publicstaticvoid main(String[] args) throws Exception{ 25final Job job = new Job(new Configuration(), KpiApp.class.getSimpleName()); 26 27 job.setJarByClass(KpiApp.class); 28 29//1.1 指定输入文件路径 30 FileInputFormat.setInputPaths(job, INPUT_PATH); 31 job.setInputFormatClass(TextInputFormat.class);//指定哪个类用来格式化输入文件 32 33//1.2指定自定义的Mapper类 34 job.setMapperClass(MyMapper.class); 35 job.setMapOutputKeyClass(Text.class);//指定输出<k2,v2>的类型 36 job.setMapOutputValueClass(KpiWritable.class); 37 38//1.3 指定分区类 39 job.setPartitionerClass(KpiPartitioner.class); 40 job.setNumReduceTasks(2); 41 42//2.2 指定自定义的reduce类 43 job.setReducerClass(MyReducer.class); 44 job.setOutputKeyClass(Text.class);//指定输出<k3,v3>的类型 45 job.setOutputValueClass(KpiWritable.class); 46 47//2.3 指定输出到哪里 48 FileOutputFormat.setOutputPath(job, new Path(OUT_PATH)); 49 job.setOutputFormatClass(TextOutputFormat.class);//设定输出文件的格式化类 50 job.waitForCompletion(true);//把代码提交给JobTracker执行 51 } 52 53staticclass MyMapper extends Mapper<LongWritable, Text, Text, KpiWritable>{ 54protectedvoid map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,Text,KpiWritable>.Context context) throws IOException ,InterruptedException { 55final String[] splited = value.toString().split("\t"); 56final String msisdn = splited[1]; 57final Text k2 = new Text(msisdn); 58final KpiWritable v2 = new KpiWritable(splited[6],splited[7],splited[8],splited[9]); 59 context.write(k2, v2); 60 }; 61 } 62 63staticclass MyReducer extends Reducer<Text, KpiWritable, Text, KpiWritable>{ 64/** 65 * @param k2 表示整个文件中不同的手机号码 66 * @param v2s 表示该手机号在不同时段的流量的集合 67*/ 68protectedvoid reduce(Text k2, java.lang.Iterable<KpiWritable> v2s, org.apache.hadoop.mapreduce.Reducer<Text,KpiWritable,Text,KpiWritable>.Context context) throws IOException ,InterruptedException { 69long upPackNum = 0L; 70long downPackNum = 0L; 71long upPayLoad = 0L; 72long downPayLoad = 0L; 73 74for (KpiWritable kpiWritable : v2s) { 75 upPackNum += kpiWritable.upPackNum; 76 downPackNum += kpiWritable.downPackNum; 77 upPayLoad += kpiWritable.upPayLoad; 78 downPayLoad += kpiWritable.downPayLoad; 79 } 80 81final KpiWritable v3 = new KpiWritable(upPackNum+"", downPackNum+"", upPayLoad+"", downPayLoad+""); 82 context.write(k2, v3); 83 }; 84 } 85 86staticclass KpiPartitioner extends HashPartitioner<Text, KpiWritable>{ 87 @Override 88publicint getPartition(Text key, KpiWritable value, int numReduceTasks) { 89return (key.toString().length()==11)?0:1; 90 } 91 } 92} 93 94class KpiWritable implements Writable{ 95long upPackNum; 96long downPackNum; 97long upPayLoad; 98long downPayLoad; 99100public KpiWritable(){} 101102public KpiWritable(String upPackNum, String downPackNum, String upPayLoad, String downPayLoad){ 103this.upPackNum = Long.parseLong(upPackNum); 104this.downPackNum = Long.parseLong(downPackNum); 105this.upPayLoad = Long.parseLong(upPayLoad); 106this.downPayLoad = Long.parseLong(downPayLoad); 107 } 108109110 @Override 111publicvoid readFields(DataInput in) throws IOException { 112this.upPackNum = in.readLong(); 113this.downPackNum = in.readLong(); 114this.upPayLoad = in.readLong(); 115this.downPayLoad = in.readLong(); 116 } 117118 @Override 119publicvoid write(DataOutput out) throws IOException { 120 out.writeLong(upPackNum); 121 out.writeLong(downPackNum); 122 out.writeLong(upPayLoad); 123 out.writeLong(downPayLoad); 124 } 125126 @Override 127public String toString() { 128return upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t" + downPayLoad; 129 } 130 }
代码 4.3
注意:分区的例子必须打成jar运行,运行结果如下图4.3,4.4所示,4.3表示手机号码流量,4.4为非手机号流量。
图 4.3
图4.4
我们知道一个分区对应一个Reducer任务是否是这样呢,我可以通过访问50030MapReduce端口来验证,在浏览器输入”http://hadoop:50030"可以看到MapReduce界面,如图4.5,4.6所示。
图 4.5
图4.6
从图中可以知道,该MapReduce任务有一个Mapper任务,两个Reducer任务,那么我们细看一下Reducer的两个任务到底是什么?如图4.7,4.8,4.9所示。task_201410070239_0002_r_000000表示第一个分区的输出,有20条记录,task_201410070239_0002_r_000001表示第二分区,有一条输出记录。和我们程序运行结果一样。
图 4.7
图 4.8 第一分区
图 4.9 第二分区
综上一些列分析,分区的用处如下:
1.根据业务需要,产生多个输出文件
2.多个reduce任务在并发运行,提高整体job的运行效率
原文:http://www.cnblogs.com/sunddenly/p/4009568.html
内容总结
以上是互联网集市为您收集整理的Hadoop日记Day17---计数器、map规约、分区学习全部内容,希望文章能够帮你解决Hadoop日记Day17---计数器、map规约、分区学习所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。