Hadoop Series (2): Top N
Part 1: Workflow Analysis
Top N in Brief
Most Top N queries against a relational database fall into one of four patterns:
1. A plain min or max returns the smallest or largest value (top 1).
2. Add a GROUP BY to get the maximum or minimum within each group (per-group top 1).
3. For a top 10, ORDER BY the column and take the first 10 rows.
4. For a per-group top 10, use a window function to generate a rank column and keep the rows whose rank is < 11.
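As a point of comparison before moving to MapReduce, the per-group max pattern (case 2) fits in a few lines of plain Java; the class name and sample records below are illustrative, not part of the original post:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupTopOne {
    // Per-group top 1 (case 2): the GROUP BY + max pattern, applied to
    // "date temperature" pairs like the sample data used later in this post.
    static Map<String, Integer> maxPerDate(List<String[]> records) {
        return records.stream()
                .collect(Collectors.toMap(
                        r -> r[0],                   // group key: the date
                        r -> Integer.parseInt(r[1]), // value: the temperature
                        Math::max));                 // keep the larger value on key collision
    }

    public static void main(String[] args) {
        List<String[]> records = List.of(
                new String[]{"20200401", "9"},
                new String[]{"20200401", "1"},
                new String[]{"20200403", "7"});
        System.out.println(maxPerDate(records).get("20200401")); // 9
    }
}
```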
Correspondingly, we may need to implement the same four patterns in MapReduce:
1. Emit a single constant key and take the max or min of the values. (An optimization is to pre-aggregate part of the data in the map phase, which otherwise invites data skew; this is essentially a Combiner. I haven't written a hello world for it yet, so I'll try it out first.)
2. Use the GROUP BY column as the key and take the max/min of the values. (Optimization: compute each group's max/min already in the map phase.)
3. Emit a single constant key and keep the top 10 values.
4. Use the GROUP BY column as the key and keep the top 10 values per group.
We can implement all four. One thing to keep in mind: map() and reduce() are not called just once.
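The map-side pre-aggregation mentioned in point 1 is exactly what Hadoop's Combiner does (wired up with job.setCombinerClass when the reduce logic is associative). The idea can be sketched without Hadoop; the split data and class name below are hypothetical:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class CombinerSketch {
    // Map-side pre-aggregation: instead of shuffling every (key, temperature)
    // pair to the reducer, each "map task" forwards only its local maximum.
    // This shrinks shuffle traffic and softens the skew caused by the single
    // constant key in the top-1 pattern.
    static int localMax(List<Integer> splitValues) {
        return Collections.max(splitValues);
    }

    public static void main(String[] args) {
        // Two hypothetical input splits handled by two map tasks.
        List<Integer> split1 = List.of(1, 3, 4, 5);
        List<Integer> split2 = List.of(6, 7, 8, 9);
        // Combiner stage: one partial result per split.
        List<Integer> partials = new ArrayList<>();
        partials.add(localMax(split1));
        partials.add(localMax(split2));
        // Reduce stage: merge the partials into the global top 1.
        int globalMax = Collections.max(partials);
        System.out.println(globalMax); // 9
    }
}
```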
The Code
Data:
2020040112 1
2020040113 3
2020040114 4
2020040115 5
2020040116 6
2020040117 7
2020040118 8
2020040119 9
2020040312 1
2020040313 3
2020040314 4
2020040315 5
2020040316 6
2020040317 7
2020040318 8
2020040319 9
2020040412 1
2020040413 3
2020040414 4
2020040415 5
2020040416 6
2020040417 7
2020040418 8
2020040419 9
Code 1: output the highest and lowest temperatures:
package org.example;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Start from the extremes so the first record always updates both values;
    // initializing min to 0 would never be beaten by the all-positive temperatures here.
    private int max = Integer.MIN_VALUE;
    private int min = Integer.MAX_VALUE;

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] line = value.toString().split(" ");
        int temperature = Integer.parseInt(line[1]);
        if (temperature > max) {
            max = temperature;
        }
        if (temperature < min) {
            min = temperature;
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        context.write(new Text("min"), new IntWritable(min));
        context.write(new Text("max"), new IntWritable(max));
    }
}

class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private int max = Integer.MIN_VALUE;
    private int min = Integer.MAX_VALUE;

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        for (IntWritable value : values) {
            if (value.get() > max) {
                max = value.get();
            }
            if (value.get() < min) {
                min = value.get();
            }
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        context.write(new Text("min"), new IntWritable(min));
        context.write(new Text("max"), new IntWritable(max));
    }
}

public class WordcountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        FileSystem fs = FileSystem.get(conf);
        String outputPath = "/software/java/data/output/";
        if (fs.exists(new Path(outputPath))) fs.delete(new Path(outputPath), true);
        Job job = Job.getInstance(conf);
        job.setJarByClass(WordcountDriver.class);
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path("/software/java/data/input/"));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        // Submit the job configuration and the jar containing the job's classes to YARN.
        // job.submit();
        boolean res = job.waitForCompletion(true);
    }
}
Code 2: output the highest and lowest temperatures per group:
package org.example;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

class WordcountMapper extends Mapper<LongWritable, Text, Text, Text> {
    // Per-date "max:min" pairs accumulated across all map() calls of this task.
    private Map<String, String> minmaxMap = new HashMap<String, String>();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] line = value.toString().split(" ");
        // The key minus its last two characters (the hour) is the date group.
        String date = line[0].substring(0, line[0].length() - 2);
        int temperature = Integer.parseInt(line[1]);
        if (minmaxMap.containsKey(date)) {
            int max = Integer.parseInt(minmaxMap.get(date).split(":")[0]);
            int min = Integer.parseInt(minmaxMap.get(date).split(":")[1]);
            if (temperature > max) {
                minmaxMap.put(date, temperature + ":" + min);
            }
            if (temperature < min) {
                minmaxMap.put(date, max + ":" + temperature);
            }
        } else {
            minmaxMap.put(date, temperature + ":" + temperature);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        for (Map.Entry<String, String> dateTemperature : minmaxMap.entrySet()) {
            System.out.println("map" + dateTemperature.getKey() + "|" + dateTemperature.getValue());
            context.write(new Text(dateTemperature.getKey()), new Text(dateTemperature.getValue()));
        }
    }
}

class WordcountReducer extends Reducer<Text, Text, Text, Text> {
    private Map<String, String> minmaxMap = new HashMap<String, String>();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            String date = key.toString();
            if (minmaxMap.containsKey(date)) {
                int existMax = Integer.parseInt(minmaxMap.get(date).split(":")[0]);
                int existMin = Integer.parseInt(minmaxMap.get(date).split(":")[1]);
                int max = Integer.parseInt(value.toString().split(":")[0]);
                int min = Integer.parseInt(value.toString().split(":")[1]);
                int finalMax = existMax > max ? existMax : max;
                int finalMin = existMin < min ? existMin : min;
                minmaxMap.put(date, finalMax + ":" + finalMin);
            } else {
                minmaxMap.put(date, value.toString());
            }
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        for (Map.Entry<String, String> dateTemperature : minmaxMap.entrySet()) {
            System.out.println("reduce" + dateTemperature.getKey() + "|" + dateTemperature.getValue());
            context.write(new Text(dateTemperature.getKey()), new Text(dateTemperature.getValue()));
        }
    }
}

public class WordcountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        FileSystem fs = FileSystem.get(conf);
        String outputPath = "/software/java/data/output/";
        if (fs.exists(new Path(outputPath))) fs.delete(new Path(outputPath), true);
        Job job = Job.getInstance(conf);
        job.setJarByClass(WordcountDriver.class);
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path("/software/java/data/input/"));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        // Submit the job configuration and the jar containing the job's classes to YARN.
        // job.submit();
        boolean res = job.waitForCompletion(true);
    }
}
Code 3 and code 4 are left out because they are much the same.
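Still, the core of what code 3 and 4 would hold per key (a bounded sorted collection trimmed to N) can be sketched without Hadoop; the class and method names below are illustrative, not from the original post:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

public class TopNPerGroup {
    // Keep at most n values per key: a TreeSet bounded to size n mirrors what
    // a reducer (or an in-mapper combiner) would maintain per group. Note that
    // a TreeSet collapses duplicate values; a PriorityQueue would keep them.
    static void offer(Map<String, TreeSet<Integer>> topn, String key, int value, int n) {
        TreeSet<Integer> set = topn.computeIfAbsent(key, k -> new TreeSet<>());
        set.add(value);
        if (set.size() > n) {
            set.pollFirst(); // drop the current smallest, keeping the n largest
        }
    }

    public static void main(String[] args) {
        Map<String, TreeSet<Integer>> topn = new HashMap<>();
        int[] temps = {1, 3, 4, 5, 6, 7, 8, 9};
        for (int t : temps) {
            offer(topn, "20200401", t, 3);
        }
        System.out.println(topn.get("20200401")); // [7, 8, 9]
    }
}
```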
Original post: https://www.cnblogs.com/wuxiaolong4/p/12733518.html