首页 / HADOOP / Hadoop辅助排序样例二
Hadoop辅助排序样例二
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了Hadoop辅助排序样例二,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含6562字,纯文字阅读大概需要10分钟。
内容图文
![Hadoop辅助排序样例二](/upload/InfoBanner/zyjiaocheng/1172/01f9e1f510c6482e9e4663011bbfc182.jpg)
求每年的最高温度
2. 样例数据
1995 10 1996 11 1995 16 1995 22 1996 26 1995 3 1996 7 1996 10 1996 20 1996 33 1995 21 1996 9 1995 31 1995 -13 1995 22 1997 -2 1997 28 1997 15 1995 8
将记录按年份分组并按温度降序排序,然后才将同一年份的所有记录送到一个 reducer 组,则各组的首条记录就是这一年的最高温度。实现此方案的要点是:
a. 定义包括自然键(年份)和自然值(温度)的组合键。
b. 根据组合键对记录进行排序。
c. 针对组合键进行分区和分组时均只考虑自然键。
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; /** * 组合键,此例中用于辅助排序,包括年份和温度。 */ public class IntPair implements WritableComparable<IntPair> { private IntWritable first; private IntWritable second; public IntPair() { this.first = new IntWritable(); this.second = new IntWritable(); //若注释掉上面两行,使用时会发生异常 java.lang.NullPointerException at IntPair.readFields } public IntPair(int first, int second) { set(new IntWritable(first), new IntWritable(second)); } public IntPair(IntWritable first, IntWritable second) { set(first, second); } public void set(IntWritable first, IntWritable second) { this.first = first; this.second = second; } public IntWritable getFirst() { return first; } public IntWritable getSecond() { return second; } public void write(DataOutput out) throws IOException { first.write(out); second.write(out); } public void readFields(DataInput in) throws IOException { first.readFields(in); second.readFields(in); } @Override public int hashCode() { return first.hashCode() * 163 + second.hashCode(); } @Override public boolean equals(Object obj) { if (obj instanceof IntPair) { IntPair ip = (IntPair) obj; return first.get() == ip.first.get() && second.get() == ip.second.get(); } return false; } @Override public String toString() { return first + "\t" + second; } public int compareTo(IntPair o) { int cmp = first.compareTo(o.first); if (cmp == 0) { cmp = second.compareTo(o.second); } return cmp; } }
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.IOException; public class MaxTemperatureUsingSecondarySort extends Configured implements Tool { static class MaxTemperatureMapper extends Mapper<LongWritable, Text, IntPair, NullWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] val = value.toString().split("\\t"); if (val.length == 2) { context.write(new IntPair(Integer.parseInt(val[0]), Integer.parseInt(val[1])), NullWritable.get()); } } } static class MaxTemperatureReducer extends Reducer<IntPair, NullWritable, IntPair, NullWritable> { @Override protected void reduce(IntPair key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { context.write(key, NullWritable.get()); //仅输出第一行 } } //仅根据 first 分区 public static class FirstPartitioner extends Partitioner<IntPair, NullWritable> { @Override public int getPartition(IntPair key, NullWritable value, int numPartitions) { return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions; } } //仅根据 first 分组 public static class GroupComparator extends WritableComparator { private static final IntWritable.Comparator INT_COMPARATOR = new IntWritable.Comparator(); protected GroupComparator() { super(IntPair.class, true); } @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { try { int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1); int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2); return INT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2); } catch (IOException e) { throw new IllegalArgumentException(e); } } @Override public int compare(WritableComparable a, WritableComparable b) { if (a instanceof IntPair && b instanceof IntPair) { return ((IntPair) a).getFirst().compareTo(((IntPair) b).getFirst()); } return super.compare(a, b); } } //根据组合键排序 public static class KeyComparator extends WritableComparator { protected KeyComparator() { super(IntPair.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { if (a instanceof IntPair && b instanceof IntPair) { IntPair ip1 = (IntPair) a; IntPair ip2 = (IntPair) b; int cmp = ip1.getFirst().compareTo(ip2.getFirst()); //升序(年份) if (cmp != 0) { return cmp; } return -ip1.getSecond().compareTo(ip2.getSecond()); //降序(温度) } return super.compare(a, b); } } public int run(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Parameter number is wrong, please enter two parameters:<input> <output>"); System.exit(-1); } Path inputPath = new Path(otherArgs[0]); Path outputPath = new Path(otherArgs[1]); //conf.set("fs.defaultFS", "hdfs://vmnode.zhch:9000"); Job job = Job.getInstance(conf, "MaxTemperatureUsingSecondarySort"); //job.setJar("F:/workspace/AssistRanking2/target/AssistRanking2-1.0-SNAPSHOT.jar"); job.setMapperClass(MaxTemperatureMapper.class); job.setPartitionerClass(FirstPartitioner.class); job.setSortComparatorClass(KeyComparator.class); //默认根据 Key 的 compareTo 函数排序 job.setGroupingComparatorClass(GroupComparator.class); job.setReducerClass(MaxTemperatureReducer.class); job.setMapOutputKeyClass(IntPair.class); job.setOutputKeyClass(IntPair.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.addInputPath(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new MaxTemperatureUsingSecondarySort(), args); System.exit(exitCode); } }
![技术分享](/upload/getfiles/default/2022/11/12/20221112110227170.jpg)
注:本例源自 《Hadoop权威指南》第三版 8.2.4
原文:http://my.oschina.net/zc741520/blog/528448
内容总结
以上是互联网集市为您收集整理的Hadoop辅助排序样例二全部内容,希望文章能够帮你解决Hadoop辅助排序样例二所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。