Learning Hadoop Completely Baffled: Implementing Custom Grouping (Partitioning) in a MapReduce Program
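1: First, set up the entity class (FlowBean):
write() serializes each object to the output stream, and readFields() deserializes the object from the bytes of the input stream. The class implements WritableComparable; for Java value objects you would usually also override toString(), hashCode() and equals().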
```java
package com.areapartition;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Traffic record for one phone number.
 * 1: write() serializes the object to the output stream
 * 2: readFields() deserializes the object from the input stream bytes
 * 3: implements WritableComparable so the bean can also be sorted
 * Java value objects usually also override toString(), hashCode() and equals().
 */
public class FlowBean implements WritableComparable<FlowBean> {

    private String phoneNumber; // phone number
    private long upFlow;        // upstream traffic
    private long downFlow;      // downstream traffic
    private long sumFlow;       // total traffic

    // Deserialization creates the object via reflection, which needs a no-arg constructor
    public FlowBean() {
    }

    // Convenience constructor for initializing the data
    public FlowBean(String phoneNumber, long upFlow, long downFlow) {
        this.phoneNumber = phoneNumber;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public String getPhoneNumber() {
        return phoneNumber;
    }

    public void setPhoneNumber(String phoneNumber) {
        this.phoneNumber = phoneNumber;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    // Output format: up \t down \t sum
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    // Deserialize the fields from the stream; they must be read
    // in exactly the same order as write() writes them
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNumber = in.readUTF();
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    // Serialize the fields to the stream
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNumber);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // Descending order by total traffic; ties are broken by phone number so that
    // sgn(a.compareTo(b)) == -sgn(b.compareTo(a)) holds, as Comparable requires
    @Override
    public int compareTo(FlowBean o) {
        int bySum = Long.compare(o.sumFlow, sumFlow);
        return bySum != 0 ? bySum : phoneNumber.compareTo(o.phoneNumber);
    }
}
```
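To see why readFields() must read the fields in exactly the order write() wrote them, here is a minimal JDK-only sketch that mirrors FlowBean's serialization with java.io's DataOutput/DataInput streams instead of Hadoop's interfaces. The class FlowRecord and its toBytes/fromBytes helpers are hypothetical stand-ins, not part of Hadoop; the sample values are taken from the job output in step 4.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical JDK-only stand-in for FlowBean: same fields, same write/readFields order
class FlowRecord {
    String phoneNumber;
    long upFlow;
    long downFlow;
    long sumFlow;

    // No-arg constructor: the object is created empty, then filled by readFields()
    FlowRecord() {
    }

    FlowRecord(String phoneNumber, long upFlow, long downFlow) {
        this.phoneNumber = phoneNumber;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    // Same order as FlowBean.write(): one UTF string, then three longs
    void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNumber);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // Reads back in exactly the order write() used
    void readFields(DataInput in) throws IOException {
        phoneNumber = in.readUTF();
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    // Serializes a record into a byte array
    static byte[] toBytes(FlowRecord r) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try {
            r.write(new DataOutputStream(bytes));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Deserializes into a fresh object created with the no-arg constructor
    static FlowRecord fromBytes(byte[] data) {
        FlowRecord r = new FlowRecord();
        try {
            r.readFields(new DataInputStream(new ByteArrayInputStream(data)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return r;
    }
}
```

For the record 13502468823 / 102 / 7335 the round trip restores all four fields, and the serialized form is 37 bytes: 2 length bytes plus 11 characters for writeUTF, and 8 bytes for each of the three longs. Swapping two readLong() calls would still run without error but silently assign the wrong values, which is why the order matters.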
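2: Steps for the traffic partitioning job:
2.1: Aggregate the traffic in the raw traffic logs and write the results for users from different provinces to different files;
2.2: Two mechanisms need to be customized:
2.2.1: Change the partitioning logic by writing a custom Partitioner;
2.2.2: Set a custom number of concurrent reduce tasks.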
```java
package com.areapartition;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Traffic partitioning job.
 * 1: Aggregate the raw traffic logs and write the results for users from
 *    different provinces to different files.
 * 2: Two mechanisms are customized:
 *    2.1: the partitioning logic (a custom Partitioner)
 *    2.2: the number of concurrent reduce tasks
 */
public class FlowSumArea {

    public static class FlowSumAreaMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // One line of input
            String line = value.toString();
            // Split it into tab-separated fields
            String[] fields = StringUtils.split(line, "\t");

            // Pick out the fields we need
            String phoneNumber = fields[1];
            long upFlow = Long.parseLong(fields[7]);
            long downFlow = Long.parseLong(fields[8]);

            // Emit phone number -> traffic bean
            context.write(new Text(phoneNumber), new FlowBean(phoneNumber, upFlow, downFlow));
        }
    }

    public static class FlowSumAreaReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {
            // Sum the traffic of all records for this phone number
            long upFlowSum = 0;
            long downFlowSum = 0;
            for (FlowBean fb : values) {
                upFlowSum += fb.getUpFlow();
                downFlowSum += fb.getDownFlow();
            }

            // Emit phone number -> aggregated traffic
            context.write(key, new FlowBean(key.toString(), upFlowSum, downFlowSum));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Create the configuration and a job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Locate the jar that contains the job's classes
        job.setJarByClass(FlowSumArea.class);
        // Mapper and reducer used by this job
        job.setMapperClass(FlowSumAreaMapper.class);
        job.setReducerClass(FlowSumAreaReducer.class);

        // Plug in the custom partitioning (grouping) logic
        job.setPartitionerClass(AreaPartitioner.class);

        // Key/value types of the mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // Key/value types of the reducer output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Number of reduce tasks; should equal the number of partitions
        // AreaPartitioner can return (0 to 6, i.e. 7)
        job.setNumReduceTasks(7);

        // Input path. FileInputFormat is the base class of all file-based InputFormats:
        // it keeps the job's input files and computes their splits, while subclasses
        // such as TextInputFormat turn the splits into records.
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // Output path for the results
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job to the cluster and wait; exit with 0 on success, 1 otherwise
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
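The following JDK-only sketch replays what FlowSumAreaMapper and FlowSumAreaReducer compute, without a cluster: pick the phone number (field 1) and the two traffic fields (7 and 8) from each line, group by phone number in sorted key order, and sum. The class FlowSumSimulation is hypothetical, and its splitTabs helper is an assumption that imitates commons-lang StringUtils.split(line, "\t"), which treats runs of separators as one and never returns empty tokens.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Hadoop-free simulation of the FlowSumArea map and reduce logic
class FlowSumSimulation {

    // Imitates StringUtils.split(line, "\t"): consecutive tabs act as one separator,
    // so an empty column is skipped instead of producing an empty token
    static String[] splitTabs(String line) {
        List<String> tokens = new ArrayList<>();
        for (String s : line.split("\t")) {
            if (!s.isEmpty()) {
                tokens.add(s);
            }
        }
        return tokens.toArray(new String[0]);
    }

    // "map": phone = field 1, up = field 7, down = field 8;
    // "shuffle + reduce": group by phone (TreeMap keeps keys sorted) and sum
    static Map<String, long[]> sumByPhone(List<String> lines) {
        Map<String, long[]> totals = new TreeMap<>();
        for (String line : lines) {
            String[] fields = splitTabs(line);
            long[] t = totals.computeIfAbsent(fields[1], k -> new long[2]);
            t[0] += Long.parseLong(fields[7]);
            t[1] += Long.parseLong(fields[8]);
        }
        return totals;
    }

    // Same layout as FlowBean.toString(): up \t down \t sum
    static String format(long[] t) {
        return t[0] + "\t" + t[1] + "\t" + (t[0] + t[1]);
    }
}
```

Two made-up lines for the same phone number collapse into one output record whose up and down values are the sums, which mirrors Reduce input records=22 versus Reduce output records=21 in the run in step 4. Because empty columns are dropped by this kind of split, the field indices 7 and 8 are only correct if the raw log has no empty columns before them.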
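3: The custom partitioner takes the phone number from the key, looks it up in a phone-number attribution (prefix-to-province) dictionary, and returns a different partition number for each province:
3.1: Partitioner is the base class of all partitioners; a custom partitioner must extend it as well.
3.2: HashPartitioner is MapReduce's default partitioner. It picks the target reducer as reducer = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks.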
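The formula in 3.2 can be checked without a cluster. This JDK-only sketch (the class name HashPartitionDemo is hypothetical) applies the same expression HashPartitioner uses; the mask with Integer.MAX_VALUE clears the sign bit, so a negative hashCode() still maps into [0, numReduceTasks).

```java
// Hypothetical demo of the default hash partitioning formula
class HashPartitionDemo {
    // partition = (hashCode with sign bit cleared) mod number of reduce tasks
    static int partitionFor(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
```

For example, an Integer key of -5 has hashCode() -5: plain -5 % 7 would be -5, while the masked version gives 4. Math.abs would not be a safe substitute, because Math.abs(Integer.MIN_VALUE) is still negative; the mask turns it into 0. Since the result depends only on the hash, phone numbers from the same province are scattered over arbitrary reducers, which is what AreaPartitioner is written to avoid.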
```java
package com.areapartition;

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.mapreduce.Partitioner;

public class AreaPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE> {

    // Phone-number attribution dictionary (hard-coded here): prefix -> partition number
    private static final Map<String, Integer> areaMap = new HashMap<String, Integer>();

    static {
        areaMap.put("135", 0);
        areaMap.put("136", 1);
        areaMap.put("137", 2);
        areaMap.put("138", 3);
        areaMap.put("139", 4);
        areaMap.put("841", 5);
    }

    @Override
    public int getPartition(KEY key, VALUE value, int numPartitions) {
        // Take the phone number from the key, look up its prefix in the dictionary
        // and return that province's partition; unknown prefixes go to partition 6
        String phone = key.toString();
        Integer areaCode = phone.length() >= 3 ? areaMap.get(phone.substring(0, 3)) : null;
        return areaCode == null ? 6 : areaCode;
    }
}
```
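A Hadoop-free copy of AreaPartitioner's lookup makes it easy to predict which part-r-0000N file a phone number lands in. AreaLookup is a hypothetical helper that mirrors getPartition(); the phone numbers used to exercise it are taken from the job output in step 4, and they match the files they appear in there.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical mirror of AreaPartitioner.getPartition() without Hadoop types
class AreaLookup {
    private static final Map<String, Integer> AREA = new HashMap<>();

    static {
        AREA.put("135", 0);
        AREA.put("136", 1);
        AREA.put("137", 2);
        AREA.put("138", 3);
        AREA.put("139", 4);
        AREA.put("841", 5);
    }

    // Prefix lookup; unknown or too-short phone numbers fall into partition 6
    static int partitionFor(String phone) {
        Integer code = phone.length() >= 3 ? AREA.get(phone.substring(0, 3)) : null;
        return code == null ? 6 : code;
    }
}
```

Because unknown prefixes fall into partition 6, getPartition() can return the values 0 to 6, so the job needs 7 reduce tasks; this is why main() calls job.setNumReduceTasks(7).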
4: Upload the packaged jar to the virtual machine.
Then start the cluster with start-dfs.sh and start-yarn.sh.
Then run the job as follows:
```
[root@master hadoop]# hadoop jar flowarea.jar com.areapartition.FlowSumArea /flow/data /flow/areaoutput4
17/09/25 15:36:38 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.0.55:8032
17/09/25 15:36:38 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/09/25 15:36:38 INFO input.FileInputFormat: Total input paths to process : 1
17/09/25 15:36:38 INFO mapreduce.JobSubmitter: number of splits:1
17/09/25 15:36:38 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1506324201206_0004
17/09/25 15:36:38 INFO impl.YarnClientImpl: Submitted application application_1506324201206_0004
17/09/25 15:36:38 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1506324201206_0004/
17/09/25 15:36:38 INFO mapreduce.Job: Running job: job_1506324201206_0004
17/09/25 15:36:43 INFO mapreduce.Job: Job job_1506324201206_0004 running in uber mode : false
17/09/25 15:36:43 INFO mapreduce.Job:  map 0% reduce 0%
17/09/25 15:36:48 INFO mapreduce.Job:  map 100% reduce 0%
17/09/25 15:36:56 INFO mapreduce.Job:  map 100% reduce 14%
17/09/25 15:37:04 INFO mapreduce.Job:  map 100% reduce 29%
17/09/25 15:37:08 INFO mapreduce.Job:  map 100% reduce 43%
17/09/25 15:37:10 INFO mapreduce.Job:  map 100% reduce 71%
17/09/25 15:37:11 INFO mapreduce.Job:  map 100% reduce 86%
17/09/25 15:37:12 INFO mapreduce.Job:  map 100% reduce 100%
17/09/25 15:37:12 INFO mapreduce.Job: Job job_1506324201206_0004 completed successfully
17/09/25 15:37:12 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=1158
        FILE: Number of bytes written=746635
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=2322
        HDFS: Number of bytes written=526
        HDFS: Number of read operations=24
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=14
    Job Counters
        Launched map tasks=1
        Launched reduce tasks=7
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=2781
        Total time spent by all reduces in occupied slots (ms)=98540
        Total time spent by all map tasks (ms)=2781
        Total time spent by all reduce tasks (ms)=98540
        Total vcore-seconds taken by all map tasks=2781
        Total vcore-seconds taken by all reduce tasks=98540
        Total megabyte-seconds taken by all map tasks=2847744
        Total megabyte-seconds taken by all reduce tasks=100904960
    Map-Reduce Framework
        Map input records=22
        Map output records=22
        Map output bytes=1072
        Map output materialized bytes=1158
        Input split bytes=93
        Combine input records=0
        Combine output records=0
        Reduce input groups=21
        Reduce shuffle bytes=1158
        Reduce input records=22
        Reduce output records=21
        Spilled Records=44
        Shuffled Maps =7
        Failed Shuffles=0
        Merged Map outputs=7
        GC time elapsed (ms)=1751
        CPU time spent (ms)=4130
        Physical memory (bytes) snapshot=570224640
        Virtual memory (bytes) snapshot=2914865152
        Total committed heap usage (bytes)=234950656
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=2229
    File Output Format Counters
        Bytes Written=526
[root@master hadoop]# hadoop fs -ls /flow/
Found 10 items
drwxr-xr-x   - root supergroup          0 2017-09-25 15:25 /flow/areaoutput
drwxr-xr-x   - root supergroup          0 2017-09-25 15:34 /flow/areaoutput2
drwxr-xr-x   - root supergroup          0 2017-09-25 15:35 /flow/areaoutput3
drwxr-xr-x   - root supergroup          0 2017-09-25 15:37 /flow/areaoutput4
-rw-r--r--   1 root supergroup       2229 2017-09-20 10:00 /flow/data
drwxr-xr-x   - root supergroup          0 2017-09-20 09:35 /flow/output
drwxr-xr-x   - root supergroup          0 2017-09-20 09:47 /flow/output2
drwxr-xr-x   - root supergroup          0 2017-09-20 10:01 /flow/output3
drwxr-xr-x   - root supergroup          0 2017-09-20 10:21 /flow/output4
drwxr-xr-x   - root supergroup          0 2017-09-21 19:32 /flow/sortoutput
[root@master hadoop]# hadoop fs -ls /flow/areaoutput4
Found 8 items
-rw-r--r--   1 root supergroup          0 2017-09-25 15:37 /flow/areaoutput4/_SUCCESS
-rw-r--r--   1 root supergroup         77 2017-09-25 15:36 /flow/areaoutput4/part-r-00000
-rw-r--r--   1 root supergroup         49 2017-09-25 15:37 /flow/areaoutput4/part-r-00001
-rw-r--r--   1 root supergroup        104 2017-09-25 15:37 /flow/areaoutput4/part-r-00002
-rw-r--r--   1 root supergroup         22 2017-09-25 15:37 /flow/areaoutput4/part-r-00003
-rw-r--r--   1 root supergroup        102 2017-09-25 15:37 /flow/areaoutput4/part-r-00004
-rw-r--r--   1 root supergroup         24 2017-09-25 15:37 /flow/areaoutput4/part-r-00005
-rw-r--r--   1 root supergroup        148 2017-09-25 15:37 /flow/areaoutput4/part-r-00006
[root@master hadoop]# hadoop fs -cat /flow/areaoutput4/part-r-00000
13502468823	102	7335	7437
13560436666	954	200	1154
13560439658	5892	400	6292
[root@master hadoop]# hadoop fs -cat /flow/areaoutput4/part-r-00001
13602846565	12	1938	1950
13660577991	9	6960	6969
[root@master hadoop]# hadoop fs -cat /flow/areaoutput4/part-r-00002
13719199419	0	200	200
13726230503	2481	24681	27162
13726238888	2481	24681	27162
13760778710	120	200	320
[root@master hadoop]# hadoop fs -cat /flow/areaoutput4/part-r-00003
13826544101	0	200	200
[root@master hadoop]# hadoop fs -cat /flow/areaoutput4/part-r-00004
13922314466	3008	3720	6728
13925057413	63	11058	11121
13926251106	0	200	200
13926435656	1512	200	1712
[root@master hadoop]# hadoop fs -cat /flow/areaoutput4/part-r-00005
84138413	4116	1432	5548
[root@master hadoop]# hadoop fs -cat /flow/areaoutput4/part-r-00006
13480253104	180	200	380
15013685858	27	3659	3686
15920133257	20	3156	3176
15989002119	3	1938	1941
18211575961	12	1527	1539
18320173382	18	9531	9549
```
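5: Make several copies of the test data, as shown below, to see map tasks run in parallel:
5.1: The number of concurrent map tasks is determined by the number of input splits: one map task is started per split.
5.2: A split is a logical concept: it is a range of byte offsets within a file, not a physical copy of the data.
5.3: The split size should be chosen according to the size of the files being processed; by default it equals the HDFS block size.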
```
[root@master hadoop]# hadoop fs -mkdir /flow/data/
[root@master hadoop]# hadoop fs -put HTTP_20130313143750.dat /flow/data/
[root@master hadoop]# hadoop fs -cp /flow/data/HTTP_20130313143750.dat /flow/data/HTTP_20130313143750.dat.2
[root@master hadoop]# hadoop fs -cp /flow/data/HTTP_20130313143750.dat /flow/data/HTTP_20130313143750.dat.3
[root@master hadoop]# hadoop fs -cp /flow/data/HTTP_20130313143750.dat /flow/data/HTTP_20130313143750.dat.4
[root@master hadoop]# hadoop fs -ls /flow/data/
Found 4 items
-rw-r--r--   1 root supergroup       2229 2017-09-25 16:36 /flow/data/HTTP_20130313143750.dat
-rw-r--r--   1 root supergroup       2229 2017-09-25 16:36 /flow/data/HTTP_20130313143750.dat.2
-rw-r--r--   1 root supergroup       2229 2017-09-25 16:37 /flow/data/HTTP_20130313143750.dat.3
-rw-r--r--   1 root supergroup       2229 2017-09-25 16:37 /flow/data/HTTP_20130313143750.dat.4
[root@master hadoop]#
```
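The split rules in 5.1 to 5.3 can be sketched as follows. This is a simplified model written after the logic of Hadoop's new-API FileInputFormat (split size = max(minSize, min(maxSize, blockSize)), with a 1.1 slop factor so that a small tail is folded into the last split). It assumes splittable, uncompressed, non-empty files and a 128 MB block size (the Hadoop 2.x default); SplitPlanner and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of how file input is cut into splits (one map task per split)
class SplitPlanner {
    // Keep cutting while the remainder is more than 1.1x the split size
    static final double SPLIT_SLOP = 1.1;

    // With default minSize (1) and maxSize (Long.MAX_VALUE) this is the block size
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Lengths of the splits for one non-empty file
    static List<Long> splitLengths(long fileLength, long splitSize) {
        List<Long> splits = new ArrayList<>();
        long remaining = fileLength;
        while ((double) remaining / splitSize > SPLIT_SLOP) {
            splits.add(splitSize);
            remaining -= splitSize;
        }
        if (remaining != 0) {
            splits.add(remaining); // the tail, up to 1.1x splitSize
        }
        return splits;
    }

    // Splits never span files, so each file contributes its own splits
    static int mapTasksFor(long[] fileLengths, long splitSize) {
        int tasks = 0;
        for (long len : fileLengths) {
            tasks += splitLengths(len, splitSize).size();
        }
        return tasks;
    }
}
```

Each of the four 2229-byte copies is far smaller than a block, so a job over /flow/data would get 4 splits and therefore 4 map tasks, compared with the single split ("number of splits:1") reported for one file in step 4. A 300 MB file would be cut into 128 MB + 128 MB + 44 MB, while a 140 MB file stays a single split because 140/128 is below 1.1.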
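6: Programming with combiners
6.1: Each map task may produce a large amount of output. A combiner merges that output on the map side first, reducing the amount of data transferred to the reducers.
6.2: At its most basic, a combiner merges the values of each key locally; it behaves like a local reduce.
6.3: Without a combiner, all aggregation happens in the reduce phase, which is comparatively inefficient. With a combiner, each map task aggregates its own output locally before it is shuffled, which speeds the job up.
6.4: Note: the combiner's output is the reducer's input. Because the combiner is pluggable (the framework may run it zero, one or several times), adding a combiner must never change the final result. A combiner should therefore only be used when the reduce function's input and output key/value types are identical and applying it early does not affect the final result, e.g. summing or taking a maximum.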
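Rule 6.4 can be checked with a small Hadoop-free experiment. The sketch below (CombinerCheck is a hypothetical name) treats each inner list as one map task's output for a single key, runs a "combiner" on each list and then the "reducer" on the partial results, and compares that with reducing all values at once. Summation gives the same answer either way; averaging does not.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical check of whether a reduce function is safe to use as a combiner
class CombinerCheck {
    static long sum(List<Long> values) {
        long total = 0;
        for (long v : values) {
            total += v;
        }
        return total;
    }

    static double mean(List<Long> values) {
        return (double) sum(values) / values.size();
    }

    // All map outputs concatenated, as the reducer sees them without a combiner
    static List<Long> flatten(List<List<Long>> mapOutputs) {
        List<Long> all = new ArrayList<>();
        for (List<Long> out : mapOutputs) {
            all.addAll(out);
        }
        return all;
    }

    static long sumWithoutCombiner(List<List<Long>> mapOutputs) {
        return sum(flatten(mapOutputs));
    }

    static long sumWithCombiner(List<List<Long>> mapOutputs) {
        List<Long> partials = new ArrayList<>();
        for (List<Long> out : mapOutputs) {
            partials.add(sum(out)); // combiner: per-map partial sum
        }
        return sum(partials);       // reducer: sum of partial sums
    }

    static double meanWithoutCombiner(List<List<Long>> mapOutputs) {
        return mean(flatten(mapOutputs));
    }

    static double meanWithCombiner(List<List<Long>> mapOutputs) {
        double total = 0;
        for (List<Long> out : mapOutputs) {
            total += mean(out);           // combiner: per-map mean
        }
        return total / mapOutputs.size(); // reducer: mean of the means (wrong)
    }
}
```

With map outputs [1, 2, 3] and [4], both sum paths return 10, but the true mean is 2.5 while the mean of the per-map means is 3.0. FlowSumAreaReducer only adds numbers and both consumes and produces Text/FlowBean pairs, so it appears to meet this rule and could presumably be registered with job.setCombinerClass(FlowSumAreaReducer.class); the run in step 4 used no combiner (Combine input records=0).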
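7: The shuffle mechanism:
7.1: Each map task has a circular in-memory buffer that holds its output. The buffer is 100 MB by default (io.sort.mb; mapreduce.task.io.sort.mb in Hadoop 2.x). Once it fills to the threshold of 0.8 (io.sort.spill.percent; mapreduce.map.sort.spill.percent in Hadoop 2.x), i.e. 80 MB with the defaults, a background thread spills its contents to a new spill file in the configured local directory (mapred.local.dir).
7.2: Before writing to disk, the records are partitioned and sorted (by partition, then by key). If a combiner is configured, it runs on the sorted data.
7.3: After the last record has been written, all spill files are merged into a single output file that is partitioned and sorted.
7.4: Reducers fetch their partitions of the map output files over HTTP.
7.5: In classic MapReduce (MRv1) the TaskTracker runs the reduce task for each partition; under YARN, as in the run above, reduce tasks run in containers on the NodeManagers. In the copy phase the map outputs are copied into the reducer's memory or onto its disk; a reducer starts copying a map task's output as soon as that map task finishes.
7.6: In the sort (merge) phase the map outputs are merged, after which the reduce phase runs.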
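The spill-and-merge steps in 7.1 to 7.3 can be modeled in a few lines. This is a toy sketch under explicit assumptions: capacity is counted in records rather than bytes, spilling happens synchronously instead of in a background thread, no combiner runs, and the final step simply re-sorts the runs where Hadoop merges already-sorted spill files. SpillModel and Rec are hypothetical names; with Hadoop's defaults the threshold would be 0.8 * 100 MB = 80 MB.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical toy model of the map-side buffer, spill and merge
class SpillModel {
    // One map output record: its reduce partition and its key
    static final class Rec {
        final int partition;
        final String key;

        Rec(int partition, String key) {
            this.partition = partition;
            this.key = key;
        }

        @Override
        public String toString() {
            return partition + ":" + key;
        }
    }

    // Spills are sorted by partition first, then by key within a partition
    static final Comparator<Rec> ORDER =
            Comparator.comparingInt((Rec r) -> r.partition).thenComparing(r -> r.key);

    final int threshold;                                // records that trigger a spill
    final List<Rec> buffer = new ArrayList<>();         // stand-in for the ring buffer
    final List<List<Rec>> spills = new ArrayList<>();   // stand-in for spill files

    SpillModel(int capacity, double spillPercent) {
        this.threshold = (int) (capacity * spillPercent);
    }

    // Add one record; spill once the buffer reaches the threshold
    void collect(Rec r) {
        buffer.add(r);
        if (buffer.size() >= threshold) {
            spill();
        }
    }

    // Sort the buffered records and write them out as one spill "file"
    private void spill() {
        List<Rec> run = new ArrayList<>(buffer);
        run.sort(ORDER);
        spills.add(run);
        buffer.clear();
    }

    // After the last record: spill the rest, then merge everything into one
    // partitioned, sorted output
    List<Rec> flushAndMerge() {
        if (!buffer.isEmpty()) {
            spill();
        }
        List<Rec> merged = new ArrayList<>();
        for (List<Rec> run : spills) {
            merged.addAll(run);
        }
        merged.sort(ORDER);
        return merged;
    }
}
```

With a capacity of 5 records and a spill percent of 0.8 the threshold is 4 records, so feeding 10 records produces two spills during collection plus one for the remaining 2 records, and the merged output lists all partition-0 records in key order before all partition-1 records, which is the layout reducers then fetch by partition.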
Original article: http://www.cnblogs.com/biehongli/p/7591518.html