HBase - MapReduce - HBase 作为输入源的示例 | 那伊抹微笑
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了HBase - MapReduce - HBase 作为输入源的示例 | 那伊抹微笑,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含5153字,纯文字阅读大概需要8分钟。
内容图文
博文标题: HBase - MapReduce - HBase 作为输入源的示例 | 那伊抹微笑
个性签名: 世界上最遥远的距离不是天涯,也不是海角,而是我站在妳的面前,妳却感觉不到我的存在
技术方向: Flume+Kafka+Storm+Redis/Hbase+Hadoop+Hive+Mahout+Spark ... 云计算技术
转载声明: 可以转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明,谢谢合作!
qq交流群:214293307 (期待与你一起学习,共同进步)
示例代码片段(mapper 需要继承自 TableMapper):Configuration config = HBaseConfiguration.create(); ...
// Driver snippet: configure a MapReduce job that reads an HBase table as its input source.
// Assumes `config` (an HBaseConfiguration) and `tableName` are defined earlier — TODO confirm against the full example.
Job job = new Job(config, "ExampleRead");
job.setJarByClass(MyReadJob.class); // class that contains mapper
Scan scan = new Scan();
scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false); // don't set to true for MR jobs — block caching is wasted for one-pass scans
// set other scan attrs
...
// Wires the table, the Scan, and the mapper class into the job's input configuration.
TableMapReduceUtil.initTableMapperJob(
tableName, // input HBase table name
scan, // Scan instance to control CF and attribute selection
MyMapper.class, // mapper
null, // mapper output key
null, // mapper output value
job);
job.setOutputFormatClass(NullOutputFormat.class); // because we aren't emitting anything from mapper
boolean b = job.waitForCompletion(true); // blocks until the job finishes; true = verbose progress
if (!b) {
throw new IOException("error with job!");
}
// Mapper skeleton for the driver snippet above: receives one table row per map() call.
// NOTE(review): this snippet is truncated in the original article — the closing braces
// of map() and of MyMapper are missing; the method body is left as an exercise.
public class MyMapper extends TableMapper<Text, LongWritable> { public void map(ImmutableBytesWritable row, Result value, Context context) throws InterruptedException, IOException { // process data for the row from the Result instance.
package com.itdog8.cloud.hbase.mr.test; import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; /** * TestHBaseAsSourceMapReduceMainClass * * @author 那伊抹微笑 * @date 2015-07-30 18:00:21 * */ public class TestHBaseAsSourceMapReduceMainClass { private static final Log _log = LogFactory.getLog(TestHBaseAsSourceMapReduceMainClass.class); private static final String JOB_NAME = "TestHBaseAsSourceMapReduce"; private static String tmpPath = "/tmp/com/itdog8/yting/TestHBaseAsSourceMapReduce"; private static String hbaseInputTble = "itdog8:test_1"; public static class ExampleSourceMapper extends TableMapper<Text, Text> { private Text k = new Text(); private Text v = new Text(); @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); } @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { String rowkey = Bytes.toString(key.get()); // 这里的操作需要熟悉下 Result 的操作就行了,接下来就是业务逻辑了 try { // set value k.set("望咩望"); v.set("食屎啦你"); // context write to reducer context.write(k, v); } catch (Exception e) { e.printStackTrace(); } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { 
super.cleanup(context); } } public static void main(String[] args) throws Exception { // hbase configuration Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", "a234-198.hadoop.com,a234-197.hadoop.com,a234-196.hadoop.com"); conf.set("hbase.zookeeper.property.clientPort", "2181"); // batch and caching Scan scan = new Scan(); scan.setCaching(10000); scan.setCacheBlocks(false); scan.setMaxVersions(1); // set hadoop speculative execution to false conf.setBoolean("mapred.map.tasks.speculative.execution", false); conf.setBoolean("mapred.reduce.tasks.speculative.execution", false); // tmp index path tmpPath = args[0]; Path tmpIndexPath = new Path(tmpPath); FileSystem fs = FileSystem.get(conf); if(fs.exists(tmpIndexPath)) { // fs.delete(tmpIndexPath, true); // dangerous // _log.info("delete tmp index path : " + tmpIndexPath.getName()); _log.warn("The hdfs path ["+tmpPath+"] existed, please change a path."); return ; } // Job && conf Job job = new Job(conf, JOB_NAME); job.setJarByClass(TestHBaseAsSourceMapReduceMainClass.class); TableMapReduceUtil.initTableMapperJob(hbaseInputTble, scan, ExampleSourceMapper.class, Text.class, Text.class, job); // job.setReducerClass(MyReducer.class); // 自己的处理逻辑 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setOutputFormatClass(TextOutputFormat.class); FileOutputFormat.setOutputPath(job, tmpIndexPath); int success = job.waitForCompletion(true) ? 0 : 1; System.exit(success); } }
版权声明:本文为博主原创文章,未经博主允许不得转载。
原文:http://blog.csdn.net/u012185296/article/details/47279419
内容总结
以上是互联网集市为您收集整理的HBase - MapReduce - HBase 作为输入源的示例 | 那伊抹微笑全部内容,希望文章能够帮你解决HBase - MapReduce - HBase 作为输入源的示例 | 那伊抹微笑所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。