流量汇总(自定义 jar 包,在 Hadoop 集群上进行统计、排序、分组)之统计
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了流量汇总(自定义jar包,在hadoop集群上 统计,排序,分组)之统计,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含10180字,纯文字阅读大概需要15分钟。
内容图文
小知识点:
package cn.itcast.hadoop.mr.flowsort;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import cn.itcast.hadoop.mr.flowsum.FlowBean;
publicclassSortMR{
publicstaticclassSortMapperextendsMapper<LongWritable,Text,FlowBean,NullWritable>{
//拿到一行数据,切分出各字段,封装为一个flowbean,作为key输出
@Override
protectedvoid map(LongWritable key,Text value,Context context)
throwsIOException,InterruptedException{
String line = value.toString();
String[] fields =StringUtils.split(line,"\t");
String phoneNB = fields[0];
long u_flow =Long.parseLong(fields[1]);
long d_flow =Long.parseLong(fields[2]);
context.write(newFlowBean(phoneNB, u_flow, d_flow),NullWritable.get());
}
}
publicstaticclassSortReducerextendsReducer<FlowBean,NullWritable,Text,FlowBean>{
@Override
protectedvoid reduce(FlowBean key,Iterable<NullWritable> values,Context context)
throwsIOException,InterruptedException{
String phoneNB = key.getPhoneNB();
context.write(newText(phoneNB), key);
}
}
publicstaticvoid main(String[] args)throwsException{
Configuration conf =newConfiguration();
Job job =Job.getInstance(conf);
// main方法所在的类,此处表示自身的类
job.setJarByClass(SortMR.class);
//会代表map,reduce的output,如果不一样可以申明mapoutput类型,像下面的一样
job.setMapperClass(SortMapper.class);
job.setReducerClass(SortReducer.class);
// mapoutput类型
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//这两个参数正好是 hadoop jar 。。 最后两个参数
FileInputFormat.setInputPaths(job,newPath(args[0]));
FileOutputFormat.setOutputPath(job,newPath(args[1]));
//标准输出
System.exit(job.waitForCompletion(true)?0:1);
}
}
package cn.itcast.hadoop.mr.areapartition;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.metrics2.impl.ConfigBuilder;
import cn.itcast.hadoop.mr.flowsum.FlowBean;
/**
* 对流量原始日志进行流量统计,将不同省份的用户统计结果输出到不同文件
* 需要自定义改造两个机制
* 1,改造分区的逻辑,自定义一个partitioneer
* 2,自定义reduer task的并发任务数
*/
publicclassFlowSumArea{
publicstaticclassFlowSumAreaMapperextendsMapper<LongWritable,Text,Text,FlowBean>{
@Override
protectedvoid map(LongWritable key,Text value,Context context)
throwsIOException,InterruptedException{
//拿一行数据
String line = value.toString();
//切分成各个字段
String[] fields =StringUtils.split(line,"\t");
//拿到我们的字段
String phoneNB = fields[1];
long u_flow =Long.parseLong(fields[7]);
long d_flow =Long.parseLong(fields[8]);
//封装数据为kv并输出
context.write(newText(phoneNB),newFlowBean(phoneNB,u_flow,d_flow));
}
}
publicstaticclassFlowSumAreaReducerextendsReducer<Text,FlowBean,Text,FlowBean>{
@Override
protectedvoid reduce(Text key,Iterable<FlowBean> values,Context context)
throwsIOException,InterruptedException{
long up_flow_counter =0;
long d_flow_counter =0;
for(FlowBean bean : values){
up_flow_counter +=bean.getUp_flow();
d_flow_counter += bean.getD_flow();
}
context.write(key,newFlowBean(key.toString(),up_flow_counter,d_flow_counter));
}
}
publicstaticvoid main(String[] args)throwsIOException,ClassNotFoundException,InterruptedException{
Configuration conf =newConfiguration();
Job job =Job.getInstance(conf);
job.setJarByClass(FlowSumArea.class);
//job.setMapperClass(FlowSumAreaMapper.class);
job.setMapperClass(FlowSumAreaMapper.class);
job.setReducerClass(FlowSumAreaReducer.class);
//设置我们自定义的分组逻辑定义
job.setPartitionerClass(AreaPartitioner.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//设置reduce的任务并发数,应该跟分组的数量保持一致
job.setNumReduceTasks(6);
//进程数如果大了,后面的文件为空,小了会出现错误,为1则没有分组
FileInputFormat.setInputPaths(job,newPath(args[0]));
FileOutputFormat.setOutputPath(job,newPath(args[1]));
System.exit(job.waitForCompletion(true)?0:1);
}
}
package cn.itcast.hadoop.mr.areapartition;
import java.util.HashMap;
import org.apache.hadoop.mapreduce.Partitioner;
publicclassAreaPartitioner<KEY, VALUE>extendsPartitioner<KEY, VALUE>{
privatestaticHashMap<String,Integer> areaMap =newHashMap<>();
static{
areaMap.put("135",0);
areaMap.put("136",1);
areaMap.put("137",2);
areaMap.put("138",3);
areaMap.put("139",4);
}
@Override
publicint getPartition(KEY key, VALUE value,int numPartitions){
//从key中拿到手机号,查询手机归属地字典,不同省份返回不同的组号
int areaCoder = areaMap.get(key.toString().substring(0,3))==null?5:areaMap.get(key.toString().substring(0,3));
return areaCoder;
}
}
附件列表
原文:http://www.cnblogs.com/xiaoxiao5ya/p/c23cd7c85104ae4bc5875c798d81fb2e.html
内容总结
以上是互联网集市为您收集整理的流量汇总(自定义jar包,在hadoop集群上 统计,排序,分组)之统计全部内容,希望文章能够帮你解决流量汇总(自定义jar包,在hadoop集群上 统计,排序,分组)之统计所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。