Hadoop上路-03_Hadoop JavaAPI
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了Hadoop上路-03_Hadoop JavaAPI,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含26677字,纯文字阅读大概需要39分钟。
内容图文
一。Eclipse 安装
1.下载解压
下载:http://www.eclipse.org/downloads/
解压:SHELL$ sudo tar -zxvf eclipse.tar.gz
2.快捷方式
右键 Ubuntu 桌面,创建启动器
3.创建一个 JavaProject
4.添加必需的 jar
全部 jar 都可以在 %Hadoop安装目录%/share/hadoop 目录中找到。
二。基本操作
这里仅限 FileSystem 中的方法,其数量繁多,具体查看 API。
1.遍历目录和文件 listStatus
package hadooptest; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.junit.Test; public class HdfsTest { private static FileSystem hdfs; @Test public void test() throws Exception { // 1.创建配置器 Configuration conf = new Configuration(); conf.set("fs.default.name", "hdfs://192.168.1.240:9000"); conf.set("mapred.jop.tracker", "192.168.1.240:9001"); // 2.创建文件系统 hdfs = FileSystem.get(conf); // 3.遍历HDFS上的文件和目录 FileStatus[] fs = hdfs.listStatus(new Path("hdfs:/")); if (fs.length > 0) { for (FileStatus f : fs) { showDir(f); } } } privatestaticvoid showDir(FileStatus fs) throws Exception { Path path = fs.getPath(); System.out.println(path); // 如果是目录 //if (fs.isDir()) { //已过期if (fs.isDirectory()) { FileStatus[] f = hdfs.listStatus(path); if (f.length > 0) { for (FileStatus file : f) { showDir(file); } } } } }
2.遍历文件 listFiles
@Test public void test() throws Exception { // 1.配置器 Configuration conf = new Configuration(); conf.set("fs.default.name", "hdfs://192.168.1.240:9000"); conf.set("mapred.jop.tracker", "192.168.1.240:9001"); // 2.文件系统 hdfs = FileSystem.get(conf); // 3.遍历HDFS上的文件 RemoteIterator<LocatedFileStatus> fs = hdfs.listFiles(new Path("hdfs:/"), true); while(fs.hasNext()){ System.out.println(fs.next()); } }
3.判断存在 exists
@Test public void test() throws Exception { // 1.创建配置器 Configuration conf = new Configuration(); conf.set("fs.default.name", "hdfs://192.168.1.240:9000"); conf.set("mapred.jop.tracker", "192.168.1.240:9001"); //2.创建文件系统 FileSystem hdfs = FileSystem.get(conf); //3.创建可供hadoop使用的文件系统路径 Path file = new Path("hdfs:/test.txt"); // 4.判断文件是否存在(文件目标路径) System.out.println("文件存在:" + hdfs.exists(file)); }
4.判断目录/文件 isDirectory/isFile
package hadooptest; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.junit.Test; public class HdfsTest { private static FileSystem hdfs; @Test public void test() throws Exception { // 1.配置器 Configuration conf = new Configuration(); conf.set("fs.default.name", "hdfs://192.168.1.240:9000"); conf.set("mapred.jop.tracker", "192.168.1.240:9001"); // 2.文件系统 hdfs = FileSystem.get(conf); // 3.遍历HDFS上目前拥有的文件和目录 FileStatus[] fs = hdfs.listStatus(new Path("hdfs:/")); if (fs.length > 0) { for (FileStatus f : fs) { showDir(f); } } else{ System.out.println("没什么好遍历的..."); } } privatestaticvoid showDir(FileStatus fs) throws Exception { Path path = fs.getPath(); // 如果是目录if (fs.isDirectory()) { System.out.println("目录:" + path); FileStatus[] f = hdfs.listStatus(path); if (f.length > 0) { for (FileStatus file : f) { showDir(file); } } } else { System.out.println("文件:" + path); } } }
5.最后修改时间 getModificationTime
package hadooptest;

import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;

/**
 * Walks HDFS from the root and prints the last-modification time of every entry.
 */
public class HdfsTest2 {

    /** File system handle shared by the test method and the recursive helper. */
    private static FileSystem hdfs;

    /**
     * Lists the root and prints modification time and path of everything below it.
     *
     * @throws Exception if the file system cannot be reached or listed
     */
    @Test
    public void test() throws Exception {
        // 1. Build the configuration.
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.job.tracker", "192.168.1.240:9001");
        // 2. Obtain the file system named by the configured URI.
        hdfs = FileSystem.get(conf);
        // 3. Entries currently stored directly under the root.
        for (FileStatus f : hdfs.listStatus(new Path("hdfs:/"))) {
            showDir(f);
        }
    }

    /**
     * Prints the modification time and path of {@code fs}, recursing into directories.
     *
     * @param fs status of the entry to print
     * @throws Exception if a directory cannot be listed
     */
    private static void showDir(FileStatus fs) throws Exception {
        Path path = fs.getPath();
        // The modification time is a millisecond timestamp, hence the Date conversion.
        long time = fs.getModificationTime();
        System.out.println("HDFS文件的最后修改时间:" + new Date(time));
        System.out.println(path);
        if (fs.isDirectory()) {
            for (FileStatus file : hdfs.listStatus(path)) {
                showDir(file);
            }
        }
    }
}
6.文件备份状态 getFileBlockLocations
package hadooptest; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.junit.Test; public class HdfsTest2 { @Test public void test() throws Exception { //1.配置器 Configuration conf = new Configuration(); conf.set("fs.default.name", "hdfs://192.168.1.240:9000"); conf.set("mapred.jop.tracker", "192.168.1.240:9001"); //2.文件系统 FileSystem fs = FileSystem.get(conf); //3.已存在的,必须是文件 Path path = new Path("hdfs:/vigiles/dir/test3.txt"); //4.文件状态 FileStatus status = fs.getFileStatus(path); //5.文件块 //BlockLocation[] blockLocations = fs.getFileBlockLocations(status, 0, status.getLen()); //方法1,传入文件的FileStatus BlockLocation[] blockLocations = fs.getFileBlockLocations(path, 0, status.getLen()); //方法2,传入文件的Path int blockLen = blockLocations.length; System.err.println("块数量:"+blockLen); //如果文件不够大,就不会分块,即得到1for (int i = 0; i < blockLen; i++) { //得到块文件大小long sizes = blockLocations[i].getLength(); System.err.println("块大小:"+sizes); //按照备份数量得到全部主机名 String[] hosts = blockLocations[i].getHosts(); for (String host : hosts) { System.err.println("主机名:"+host); } //按照备份数量得到全部主机名 String[] names = blockLocations[i].getNames(); for (String name : names) { System.err.println("IP:"+ name); } } } }
7.读取文件 open
package hadooptest;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;

/**
 * Reads a whole HDFS file into memory with {@code FileSystem.open} and prints it.
 */
public class HdfsTest2 {

    /**
     * Reads {@code /vigiles/dir/test3.txt} and prints its content decoded as UTF-8.
     *
     * @throws Exception if the file is missing, too large to buffer, or cannot be read
     */
    @Test
    public void test() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        conf.set("mapred.job.tracker", "192.168.1.240:9001");

        FileSystem fs = FileSystem.get(conf);
        try {
            Path path = new Path("hdfs:/vigiles/dir/test3.txt");
            FileStatus stat = fs.getFileStatus(path);

            // A Java array cannot hold more than Integer.MAX_VALUE bytes.
            long length = stat.getLen();
            if (length > Integer.MAX_VALUE) {
                throw new IllegalArgumentException("file too large to buffer: " + length + " bytes");
            }
            byte[] buffer = new byte[(int) length];

            FSDataInputStream is = fs.open(path);
            try {
                // Positioned read starting at offset 0 that fills the whole buffer.
                is.readFully(0, buffer);
            } finally {
                is.close();
            }

            // Decode explicitly instead of relying on the platform default charset.
            System.out.println(new String(buffer, "UTF-8"));
        } finally {
            fs.close();
        }
    }
}
8.复制上传文件 copyFromLocalFile
@Test public void test() throws Exception { // 1.创建配置器 Configuration conf = new Configuration(); conf.set("fs.default.name", "hdfs://192.168.1.240:9000"); conf.set("mapred.jop.tracker", "192.168.1.240:9001"); //2.创建文件系统 FileSystem hdfs = FileSystem.get(conf); //3.创建可供hadoop使用的文件系统路径 Path src = new Path("file:/home/hadoop/桌面/copy_test.txt"); //本地目录/文件 Path dst = new Path("hdfs:/"); //目标目录/文件 // 4.拷贝本地文件上传(本地文件,目标路径) hdfs.copyFromLocalFile(src, dst); System.out.println("文件上传成功至:" + conf.get("fs.default.name")); // 5.列出HDFS上的文件 FileStatus[] fs = hdfs.listStatus(dst); for (FileStatus f : fs) { System.out.println(f.getPath()); } Path path = new Path("hdfs:/copy_test.txt"); FSDataInputStream is = hdfs.open(path); FileStatus stat = hdfs.getFileStatus(path); byte[] buffer = newbyte[Integer.parseInt(String.valueOf(stat.getLen()))]; is.readFully(0, buffer); is.close(); hdfs.close(); System.out.println("文件内容:" + new String(buffer)); }
另:移动上传 moveFromLocalFile,和 copyFromLocalFile 类似,但其操作后源文件将不存在。
9.复制下载文件 copyToLocalFile
@Test public void test() throws Exception { Configuration conf = new Configuration(); conf.set("fs.default.name", "hdfs://192.168.1.240:9000"); conf.set("mapred.jop.tracker", "192.168.1.240:9001"); FileSystem hdfs = FileSystem.get(conf); //创建HDFS源路径和本地目标路径 Path src = new Path("hdfs:/copy_test.txt"); //目标目录/文件 Path dst = new Path("file:/home/hadoop/桌面/new.txt"); //本地目录/文件 //拷贝本地文件上传(本地文件,目标路径) hdfs.copyToLocalFile(src, dst); }
另:moveToLocalFile,其操作后源文件将不存在。
10.创建目录 mkdirs
@Test public void test() throws Exception { Configuration conf = new Configuration(); conf.set("fs.default.name", "hdfs://192.168.1.240:9000"); conf.set("mapred.jop.tracker", "192.168.1.240:9001"); FileSystem hdfs = FileSystem.get(conf); //创建目录 hdfs.mkdirs(new Path("hdfs:/eminem")); }
11.创建目录/文件 create
@Test public void test() throws Exception { Configuration conf = new Configuration(); conf.set("fs.default.name", "hdfs://192.168.1.240:9000"); conf.set("mapred.jop.tracker", "192.168.1.240:9001"); FileSystem hdfs = FileSystem.get(conf); // 使用HDFS数据输出流(写)对象 在HDSF上根目录创建一个文件夹,其内再创建文件 FSDataOutputStream out = hdfs.create(new Path("hdfs:/vigiles/eminem.txt")); // 在文件中写入一行数据,必须使用UTF-8 out.write("痞子阿姆,Hello !".getBytes("UTF-8")); out = hdfs.create(new Path("/vigiles/alizee.txt")); out.write("艾莉婕,Hello !".getBytes("UTF-8")); out.close(); FSDataInputStream is = hdfs.open(new Path("hdfs:/vigiles/alizee.txt")); FileStatus stat = hdfs.getFileStatus(new Path("hdfs:/vigiles/alizee.txt")); byte[] buffer = newbyte[Integer.parseInt(String.valueOf(stat.getLen()))]; is.readFully(0, buffer); is.close(); hdfs.close(); System.out.println(new String(buffer)); }
12.创建空文件 createNewFile
@Test public void test() throws Exception { Configuration conf = new Configuration(); conf.set("fs.default.name", "hdfs://192.168.1.240:9000"); conf.set("mapred.jop.tracker", "192.168.1.240:9001"); FileSystem hdfs = FileSystem.get(conf); //创建空文件 hdfs.createNewFile(new Path("hdfs:/newfile.txt")); }
13.写入文件 append
@Test public void test() throws Exception { Configuration conf = new Configuration(); conf.set("fs.default.name", "hdfs://192.168.1.240:9000"); conf.set("mapred.jop.tracker", "192.168.1.240:9001"); FileSystem hdfs = FileSystem.get(conf); //创建空文件 FSDataOutputStream out = hdfs.append(new Path("hdfs:/newfile.txt")); out.write("使用append方法写入文件\n".getBytes("UTF-8")); out.close(); out = hdfs.append(new Path("/newfile.txt")); out.write("再次写入!!!\n".getBytes("UTF-8")); out.close(); }
14.重命名文件 rename
@Test public void test() throws Exception { Configuration conf = new Configuration(); conf.set("fs.default.name", "hdfs://192.168.1.240:9000"); conf.set("mapred.jop.tracker", "192.168.1.240:9001"); FileSystem fs = FileSystem.get(conf); //重命名:fs.rename(源文件,新文件)boolean rename = fs.rename(new Path("/copy_test.txt"), new Path("/copy.txt")); System.out.println(rename); }
15.删除文件 delete
@Test public void test() throws Exception { Configuration conf = new Configuration(); conf.set("fs.default.name", "hdfs://192.168.1.240:9000"); conf.set("mapred.jop.tracker", "192.168.1.240:9001"); FileSystem fs = FileSystem.get(conf); //判断删除(路径,true。false=非空时不删除,抛RemoteException、IOException异常)boolean delete = fs.delete(new Path("hdfs:/test.txt"), true); System.out.println("执行删除:"+delete); //FileSystem关闭时执行boolean exit = fs.deleteOnExit(new Path("/out.txt")); System.out.println("执行删除:"+exit); fs.close(); }
三。MapReduce常用算法
1.计数
1)数据准备
2)代码
package hadooptest;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Word count: emits every distinct whitespace-separated token of the input together with
 * the number of times it occurs.
 */
public class WordCount {

    /**
     * Map phase, which runs before the reduce phase.
     * Type parameters are {@code Mapper<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>}.
     */
    public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {

        /** The count contributed by a single occurrence of a word. */
        private static final IntWritable one = new IntWritable(1);

        /** Reusable holder for the word currently being emitted. */
        private Text word = new Text();

        /**
         * Splits one input line into words and emits {@code (word, 1)} for each of them.
         *
         * @param Key1    key of the input record; not used
         * @param Value1  text of the input line
         * @param context sink for the emitted pairs
         */
        @Override
        public void map(Object Key1, Text Value1, Context context)
                throws IOException, InterruptedException {
            // StringTokenizer splits on whitespace by default.
            StringTokenizer itr = new StringTokenizer(Value1.toString());
            while (itr.hasMoreTokens()) {
                this.word.set(itr.nextToken());
                context.write(this.word, one);
            }
        }
    }

    /*
     * Between the mapper and the reducer the shuffle groups the emitted values by key,
     * so the reducer receives (k2, [v2, v2, ...]).
     */

    /**
     * Reduce phase, which runs after the map phase.
     * Type parameters are {@code Reducer<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>}.
     */
    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        /** Reusable holder for the total being emitted. */
        private IntWritable result = new IntWritable();

        /**
         * Adds up the counts collected for one word and emits {@code (word, total)}.
         *
         * @param Key2    the word
         * @param Values2 the counts emitted for that word, i.e. [1, 1, 1, ...]
         * @param context sink for the emitted pair
         */
        @Override
        public void reduce(Text Key2, Iterable<IntWritable> Values2, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : Values2) {
                sum += val.get();
            }
            this.result.set(sum);
            context.write(Key2, this.result);
        }
    }

    /**
     * Configures and runs the job; the process exits with 0 on success and 1 on failure.
     *
     * @param args not used
     * @throws Exception if the job cannot be set up or submitted
     */
    public static void main(String[] args) throws Exception {
        // Configuration.
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");
        // Job; the factory method replaces the deprecated Job constructor.
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        // Mapper and reducer.
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        // Output types; these must match what is passed to Context.write().
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Input must already exist; output must not exist yet.
        FileInputFormat.addInputPath(job, new Path("hdfs:/input"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs:/output"));
        // Run and wait.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
3)结果
2.排序
1)数据准备
2)代码
package hadooptest;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Sorts an input of one number per line by relying on the ordering of map output keys.
 *
 * <p>Keys of type Text come out in dictionary order; keys of type LongWritable come out in
 * numeric order, which is why each line is converted to a LongWritable key here.
 */
public class TestSort {

    /**
     * Mapper, {@code Mapper<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>}: turns each line into a
     * numeric key.
     */
    public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {

        /**
         * Emits the number on the line as k2 with a null v2; k1 is discarded.
         *
         * @param k1      key of the input record; not used
         * @param v1      text of the input line, expected to hold one integer
         * @param context sink for the emitted pair
         */
        @Override
        public void map(LongWritable k1, Text v1, Context context)
                throws IOException, InterruptedException {
            String line = v1.toString().trim();
            // A blank line holds no number; parsing it would abort the whole task.
            if (line.isEmpty()) {
                return;
            }
            // The same number may occur on several lines, so k2 may repeat.
            context.write(new LongWritable(Long.parseLong(line)), NullWritable.get());
        }
    }

    /* Before reduce() runs, the shuffle groups the values by key: (k2, [v2, v2, ...]). */

    /**
     * Reducer, {@code Reducer<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT>}: writes the keys back out.
     */
    public static class MyReducer
            extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {

        /**
         * Emits the key once per collected value, so a number that occurred several times in
         * the input also occurs that many times in the sorted output.
         *
         * @param k2      the number
         * @param v2      one null placeholder per occurrence of the number
         * @param context sink for the emitted pairs
         */
        @Override
        protected void reduce(LongWritable k2, Iterable<NullWritable> v2, Context context)
                throws IOException, InterruptedException {
            // Writing k2 only once here would silently drop duplicates from the result.
            for (NullWritable ignored : v2) {
                context.write(k2, NullWritable.get());
            }
        }
    }

    /**
     * Configures and runs the job; the process exits with 0 on success and 1 on failure.
     *
     * @param args not used
     * @throws Exception if the job cannot be set up or submitted
     */
    public static void main(String[] args) throws Exception {
        // Configuration.
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.240:9000");

        // Job; the factory method replaces the deprecated Job constructor.
        Job job = Job.getInstance(conf, "Test Sort");
        job.setJarByClass(TestSort.class);

        // Mapper and reducer.
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        // Output types; these must match what is passed to Context.write().
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(NullWritable.class);

        // Input and output paths.
        FileInputFormat.setInputPaths(job, new Path("/input/"));
        FileOutputFormat.setOutputPath(job, new Path("/out"));

        // Run and wait.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
3)结果
3.去重
1 /* 2 * 内部类:映射器 Mapper<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT> 3 */ 4 public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> { 5/**** 6 * 重写map方法 7 ****/ 8publicvoid map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { 9//因为我们读入的数据就是一行一个数字,直接使用 10//这个数字有几个都无所谓,只有知道有这么一个数字即可,所以输出的v2为null11 context.write(new LongWritable(Long.parseLong(v1.toString())), NullWritable.get()); 12 } 13 } 1415/** 在此方法执行前,有个shuffle过程,会根据k2将对应的v2归并为v2[...] **/1617/*18 * 内部类:拆分器 Reducer<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT> 19*/20publicstaticclass MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> { 21/**** 22 * 重写reduce方法 23 ****/24protectedvoid reduce(LongWritable k2, Iterable<NullWritable> v2, Context context) throws IOException, InterruptedException { 25//此时,k3(即眼前的k2)如果发生重复,根据默认算法会发生覆盖,即最终仅保存一个k3,达到去重到效果,而v3是null无所谓26 context.write(k2, NullWritable.get()); 2728 } 29 }
4.过滤
1 /* 2 * 内部类:映射器 Mapper<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT> 3 */ 4 public static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> { 5 String tmp = "8238"; 6 7/** 8 * 重写map方法。k1:行首字符索引,v1:这一行文本 9 **/10protectedvoid map(LongWritable k1, Text v1, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException ,InterruptedException { 11 System.out.println(v1+", "+tmp); 12//如果行文本是指定值,过滤之13if(v1.toString().equals(tmp)){ 14 System.out.println("有了"); 15//保存(按照泛型限制,k2是Text,v2是Nullritable)16 context.write(v1, NullWritable.get()); 17 } 18 } 19 } 2021/*22 * 内部类:拆分器 Reducer<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT> 23*/24publicstaticclass MyReducer extends Reducer<Text, NullWritable, Text, NullWritable> { 25/**26 * 重写reduce方法 27 **/28protectedvoid reduce(Text k2, Iterable<NullWritable> v2, Reducer<Text, NullWritable, Text, NullWritable>.Context context) throws IOException ,InterruptedException { 29 context.write(k2, NullWritable.get()); 30 } 31 }
如果报错:
Error: java.io.IOException: Type
mismatch in key from map: expected org.apache.hadoop.io.LongWritable, received
org.apache.hadoop.io.Text
一定要检查main方法里:
// 设置输出类型,和Context上下文对象write的参数类型一致 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class);
5.TopN
1)数值最大
1 // map(泛型定义了输入和输出类型) 2 public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> { 3 4// 首先创建一个临时变量,保存一个可存储的最小值:Long.MIN_VALUE=-9223372036854775808 5long temp = Long.MIN_VALUE; 6 7// 找出最大值。这个map不断迭代v1,最终保存最大值 8protectedvoid map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { 910// 将文本转数值11long val = Long.parseLong(v1.toString()); 12// 如果v1比临时变量大,则保存v1的值13if (temp < val) { 14 temp = val; 15 } 16 } 1718/** ---此方法在全部的map任务结束后执行一次。这时仅输出临时变量到最大值--- **/19protectedvoid cleanup(Context context) throws IOException, InterruptedException { 20 context.write(new LongWritable(temp), NullWritable.get()); 21 System.out.println("文件读取完毕,保存最大值"); //输出两次,对应两个文本文件22 } 23 } 2425// reduce26publicstaticclass MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> { 27// 临时变量28 Long temp = Long.MIN_VALUE; 2930// 因为一个文件得到一个最大值,我们有两个txt文件会得到两个值。再次将这些值比对,得到最大的31protectedvoid reduce(LongWritable k2, Iterable<NullWritable> v2, Context context) throws IOException, InterruptedException { 3233long val = Long.parseLong(k2.toString()); 34// 如果k2比临时变量大,则保存k2的值35if (temp < val) { 36 temp = val; 37 } 38 } 3940/** !!!此方法在全部的reduce任务结束后执行一次。这时仅输出唯一最大值!!! **/41protectedvoid cleanup(Context context) throws IOException, InterruptedException { 42 context.write(new LongWritable(temp), NullWritable.get()); 43 } 44 }
2)数值前5大
1 // map 2 public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> { 3 4// 首先创建一个临时变量,保存一个可存储的最小值:Long.MIN_VALUE=-9223372036854775808 5long temp = Long.MIN_VALUE; 6// Top5存储空间,我们取前5个 7long[] tops; 8 9/** 这个方法在run中调用,在全部map之前执行一次 **/10protectedvoid setup(Context context) { 11// 初始化数组长度为512 tops = newlong[5]; 13 } 1415// 找出最大值16protectedvoid map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { 1718// 将文本转数值19finallong val = Long.parseLong(v1.toString()); 20// 保存在0索引21 tops[0] = val; 22// 排序后最大值在最后一个索引,这样从[5]到[0]依次减小。每执行一次map,最小的[0]都会赋予新值23 Arrays.sort(tops); 24 } 2526/** ---此方法在全部到map任务结束后执行一次。输出map后得到的前5个最大值--- **/27protectedvoid cleanup(Context context) throws IOException, InterruptedException { 28for (int i = 0; i < tops.length; i++) { 29 context.write(new LongWritable(tops[i]), NullWritable.get()); 30 } 31 } 32 } 3334// reduce35publicstaticclass MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> { 36 Long temp = Long.MIN_VALUE; 37long[] tops; 3839/** 次方法在run中调用,在全部map之前执行一次 **/40protectedvoid setup(Context context) { 41 tops = newlong[5]; 42 } 4344// 因为每个文件都得到5个值,再次将这些值比对,得到最大的45protectedvoid reduce(LongWritable k2, Iterable<NullWritable> v2, Context context) throws IOException, InterruptedException { 46long top = Long.parseLong(k2.toString()); 47 tops[0] = top; 48 Arrays.sort(tops); 49 } 5051/** ---此方法在全部到reduce任务结束后执行一次--- **/52protectedvoid cleanup(Context context) throws IOException, InterruptedException { 53for (int i = 0; i < tops.length; i++) { 54 context.write(new LongWritable(tops[i]), NullWritable.get()); 55 } 56 } 57 }
3)数量最大
public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> { privatestaticfinal IntWritable one = new IntWritable(1); private Text word = new Text(); publicvoid map(Object Key1, Text Value1, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { String[] strings = Value1.toString().split(" "); for (String str : strings) { this.word.set(str); context.write(this.word, one); } } } publicstaticclass MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> { //临时变量,保存最大数量的单词private String keyer; //注意这里不能用Hadoop的类型,如Text private IntWritable valer; //这里最好也是基本的java数据类型,如int //计数private Integer temp = Integer.MIN_VALUE; publicvoid reduce(Text Key2, Iterable<IntWritable> Values2, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { int sum = 0; //统计数量for (IntWritable val : Values2) { sum += val.get(); } //保存最大数量值if (sum > temp) { temp = sum; keyer = Key2.toString(); valer = new IntWritable(temp); } } //最终输出最大数量的单词protectedvoid cleanup(Context context) throws IOException, InterruptedException { context.write(new Text(keyer), valer); } }
6.单表关联
/* 父 子 子 孙 1 2 2 3 A B B C */ // map public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> { // 拆分原始数据protectedvoid map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { // 按制表符拆分记录。一行拆出两个角色 String[] splits = v1.toString().split(" "); //针对无意义的换行过滤if (splits.length > 1) { // 把“父”作为k2;“子“加下划线区分,作为v2 context.write(new Text(splits[0]), new Text("_" + splits[1])); // 把“子”作为k2;“父”辈作为v2。就是把原两个单词调换位置保存 context.write(new Text(splits[1]), new Text(splits[0])); } } /** * 父 _子 * 子 父 * * 子 _孙 * 孙 子 **/ } /** * k2 v2[...] * 子 [父,_孙] **/// reducepublicstaticclass MyReducer extends Reducer<Text, Text, Text, Text> { // 拆分k2v2[...]数据protectedvoid reduce(Text k2, Iterable<Text> v2, Context context) throws IOException, InterruptedException { String grandson = ""; // “孙” String grandfather = ""; // “父” // 从迭代中遍历v2[...]for (Text man : v2) { String p = man.toString(); System.out.println("得到:" + p); // 如果单词是以下划线开始的if (p.startsWith("_")) { grandson = p.substring(1); } // 如果单词没有下划线起始else { // 直接赋值给孙辈变量 grandfather = p; } } // 在得到有效数据的情况下if (grandson != "" && grandfather != "") { // 写出得到的结果。 context.write(new Text(grandson), new Text(grandfather)); } /** * k3=父,v3=孙 **/ } }
7.双表关联
// map public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> { // 拆分原始数据protectedvoid map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { // 拆分记录 String[] splited = v1.toString().split(" "); // 如果第一列是数字(使用正则判断),就是歌曲表。先读入那个文件由hadoop决定if (splited[0].matches("^[-+]?(([0-9]+)([.]([0-9]+))?|([.]([0-9]+))?)$")) { String id = splited[0]; String song = splited[1]; //v2加两条下划线作为前缀标识为歌曲 context.write(new Text(id), new Text("__" + song)); } // 否则就是歌手表else { String singer = splited[0]; String id = splited[1]; //v2-加两条横线作为前缀标识为歌手 context.write(new Text(id), new Text("--" + singer)); } /** * 1 __Eminem 1 --LoseYourself **/ } } // reducepublicstaticclass MyReducer extends Reducer<Text, Text, Text, Text> { // 拆分k2v2[...]数据protectedvoid reduce(Text k2, Iterable<Text> v2, Context context) throws IOException, InterruptedException { String song = ""; // 歌曲 String singer = ""; // 歌手/** * 1, [__Eminem, --LoseYourself] **/for (Text text : v2) { String tmp = text.toString(); if (tmp.startsWith("__")) { // 如果是__开头的是song song = tmp.substring(2); // 从索引2开始截取字符串 } if (tmp.startsWith("--")) { // 如果是--开头的是歌手 singer = tmp.substring(2); } } context.write(new Text(singer), new Text(song)); } /** * k3=Eminem,v3=LoseYourself * Eminem LoseYourself Alizee LaIslaBonita Michael YouAreNotAlone Manson FuckFrankie * **/ }
- end
原文:http://www.cnblogs.com/vigiles/p/3623621.html
内容总结
以上是互联网集市为您收集整理的Hadoop上路-03_Hadoop JavaAPI全部内容,希望文章能够帮你解决Hadoop上路-03_Hadoop JavaAPI所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。