Importing Hive Statistical Analysis Results into MySQL Tables (Part 3): Using a Hive UDF or GenericUDF
I. The UDF approach
The UDF approach is straightforward to implement: extend the UDF class and provide an evaluate method.
1. Write the implementation class
```java
package com.gxnzx.hive.udf;

import org.apache.hadoop.hive.ql.exec.UDF;

import com.gxnzx.hive.util.DBSqlHelper;

public class AnalyzeStatistics extends UDF {

    public String evaluate(String clxxbh, String hphm) {
        // jtxx2 is the target table in the MySQL database
        String sql = "insert into jtxx2 values(?,?)";
        // Insert the record into the database
        if (DBSqlHelper.addBatch(sql, clxxbh, hphm)) {
            return clxxbh + " SUCCESS " + hphm;
        } else {
            return clxxbh + " FAILED " + hphm;
        }
    }
}
```
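The article never shows the definition of the target table jtxx2. Judging from the two columns that appear in the mysql> output further down (cllxbh and hphm), a plausible MySQL DDL might look like this; the column types, lengths, and the primary key are assumptions, not taken from the original:

```sql
-- Hypothetical DDL for the target table; types, lengths, and the
-- primary key are inferred from the article's query output.
CREATE TABLE jtxx2 (
    cllxbh VARCHAR(64) NOT NULL,  -- vehicle record ID (clxxbh in the Hive table)
    hphm   VARCHAR(32),           -- license plate number
    PRIMARY KEY (cllxbh)
) DEFAULT CHARSET = utf8;
```

A unique key is a good idea here in any case, since retried mappers can re-insert the same rows (a point the GenericUDF's javadoc below makes explicitly).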
2. The database helper method

```java
public static boolean addBatch(String sql, String clxxbh, String hphm) {
    boolean flag = false;
    try {
        // Open a database connection
        conn = DBSqlHelper.getConn();
        ps = (PreparedStatement) conn.prepareStatement(sql);
        ps.setString(1, clxxbh);
        ps.setString(2, hphm);
        System.out.println(ps.toString());
        ps.execute();
        flag = true;
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        try {
            ps.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
    return flag;
}
```
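addBatch refers to conn, ps, and DBSqlHelper.getConn(), none of which are shown in the article. A minimal sketch of what the helper class might look like, reusing the connection string and credentials that appear in the analyzedboutput test later in this article (everything else is an assumption):

```java
package com.gxnzx.hive.util;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Minimal sketch of the unshown helper class. The URL and credentials
// are copied from the GenericUDF test below; adjust for your setup.
public class DBSqlHelper {
    private static final String URL = "jdbc:mysql://192.168.2.133:3306/transport";
    private static final String USER = "hive";
    private static final String PASSWORD = "hive";

    static Connection conn;
    static PreparedStatement ps;

    public static Connection getConn() throws SQLException {
        // Reuse the connection if one is already open
        if (conn == null || conn.isClosed()) {
            conn = DriverManager.getConnection(URL, USER, PASSWORD);
        }
        return conn;
    }

    // addBatch(String, String, String) is shown in step 2 above.
}
```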
3. Use Eclipse to package the project as a jar and add it to the Hive classpath

```
hive> add jar hiveudf2.jar;
```

4. Add the MySQL JDBC driver jar to the Hive classpath
```
hive> add jar /home/hadoopUser/cloud/hive/apache-hive-0.13.1-bin/lib/mysql-connector-java-5.1.18-bin.jar;
```

5. Create a temporary Hive function
```
hive> create temporary function analyze as 'com.gxnzx.hive.udf.AnalyzeStatistics';
```

6. Test
```
hive> select analyze(clxxbh,hphm) from transjtxx_hbase limit 10;
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_1428394594787_0034, Tracking URL = http://secondmgt:8088/proxy/application_1428394594787_0034/
Kill Command = /home/hadoopUser/cloud/hadoop/programs/hadoop-2.2.0/bin/hadoop job -kill job_1428394594787_0034
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
2015-04-23 10:15:34,355 Stage-1 map = 0%, reduce = 0%
2015-04-23 10:15:51,032 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 7.14 sec
MapReduce Total cumulative CPU time: 7 seconds 140 msec
Ended Job = job_1428394594787_0034
MapReduce Jobs Launched:
Job 0: Map: 1   Cumulative CPU: 7.14 sec   HDFS Read: 256 HDFS Write: 532 SUCCESS
Total MapReduce CPU Time Spent: 7 seconds 140 msec
OK
32100017000000000220140317000015 SUCCESS 鲁Q58182
32100017000000000220140317000016 SUCCESS 鲁QV4662
32100017000000000220140317000019 SUCCESS 苏LL8128
32100017000000000220140317000020 SUCCESS 苏CAH367
32100017000000000220140317000023 SUCCESS 鲁Q7899W
32100017000000000220140317000029 SUCCESS 苏HN3819
32100017000000000220140317000038 SUCCESS 鲁C01576
32100017000000000220140317000044 SUCCESS 苏DT9178
32100017000000000220140317000049 SUCCESS 苏LZ1112
32100017000000000220140317000052 SUCCESS 苏K9795警
Time taken: 35.815 seconds, Fetched: 10 row(s)
```

7. Check the data in the MySQL table
```
mysql> select * from jtxx2;
+----------------------------------+-------------+
| cllxbh                           | hphm        |
+----------------------------------+-------------+
| 32100017000000000220140317000015 | 鲁Q58182    |
| 32100017000000000220140317000016 | 鲁QV4662    |
| 32100017000000000220140317000019 | 苏LL8128    |
| 32100017000000000220140317000020 | 苏CAH367    |
| 32100017000000000220140317000023 | 鲁Q7899W    |
| 32100017000000000220140317000029 | 苏HN3819    |
| 32100017000000000220140317000038 | 鲁C01576    |
| 32100017000000000220140317000044 | 苏DT9178    |
| 32100017000000000220140317000049 | 苏LZ1112    |
| 32100017000000000220140317000052 | 苏K9795警   |
+----------------------------------+-------------+
10 rows in set (0.00 sec)
```

II. The GenericUDF approach
The GenericUDF approach is more involved to implement; I adapted the code below from others' work (it closely follows the GenericUDFDBOutput example that ships in Hive's contrib module). A GenericUDF implements three lifecycle methods: initialize() runs once per query to validate argument types and declare the return type, evaluate() runs once per input row, and getDisplayString() supplies the text shown in EXPLAIN plans. A bare skeleton follows for orientation; the full listing comes after it.
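Here is that skeleton (a sketch, not from the original article; the class name is illustrative):

```java
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.IntWritable;

// Bare skeleton showing the three methods every GenericUDF must implement.
public class SkeletonUDF extends GenericUDF {
    private final IntWritable result = new IntWritable(0);

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments)
            throws UDFArgumentException {
        // Runs once per query: validate the argument types here and
        // return an ObjectInspector describing the result type.
        return PrimitiveObjectInspectorFactory.writableIntObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        // Runs once per input row: extract the argument values from the
        // DeferredObjects and compute the result.
        return result;
    }

    @Override
    public String getDisplayString(String[] children) {
        // Shown in EXPLAIN plans and error messages.
        return "skeleton(...)";
    }
}
```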
1. Write the UDF class
```java
package com.gxnzx.hive.main;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.io.IntWritable;

/**
 * AnalyzeGenericUDFDBOutput is designed to output data directly from Hive to a
 * JDBC datastore. This UDF is useful for exporting small to medium summaries
 * that have a unique key.
 *
 * Due to the nature of hadoop, individual mappers, reducers or entire jobs can
 * fail. If a failure occurs, a mapper or reducer may be retried. This UDF has
 * no way of detecting failures or rolling back a transaction. Consequently,
 * you should only use this to export to a table with a unique key. The unique
 * key should safeguard against duplicate data.
 *
 * To use this UDF, follow four steps. First, package the UDF into a jar file.
 * Second, use hive's add jar feature to add the UDF jar to the current class
 * path. Third, use add jar again to add the JDBC driver jar to the class path.
 * Fourth, use hive's create temporary function feature to create a temporary
 * function bound to the UDF class.
 *
 * Example for MySQL:
 *   hive> add jar udf.jar
 *   hive> add jar mysql-connector-java-5.1.18-bin.jar
 *   hive> create temporary function analyzedboutput as 'com.gxnzx.hive.main.AnalyzeGenericUDFDBOutput'
 */
@Description(name = "analyzedboutput", value = "_FUNC_(jdbcstring,username,password,preparedstatement,[arguments])"
        + " - sends data to a jdbc driver", extended = "argument 0 is the JDBC connection string\n"
        + "argument 1 is the database user name\n"
        + "argument 2 is the database user's password\n"
        + "argument 3 is an SQL query to be used in the PreparedStatement\n"
        + "argument (4-n) The remaining arguments must be primitive and are "
        + "passed to the PreparedStatement object\n")
@UDFType(deterministic = false)
public class AnalyzeGenericUDFDBOutput extends GenericUDF {
    private static final Log LOG = LogFactory
            .getLog(AnalyzeGenericUDFDBOutput.class.getName());

    private transient ObjectInspector[] argumentOI;
    private transient Connection connection = null;
    private String url;
    private String user;
    private String pass;
    private final IntWritable result = new IntWritable(-1);

    /**
     * @param arguments
     *            argument 0 is the JDBC connection string, argument 1 is the
     *            user name, argument 2 is the password, argument 3 is an SQL
     *            query to be used in the PreparedStatement; the remaining
     *            arguments (4-n) must be primitive and are passed to the
     *            PreparedStatement object
     */
    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments)
            throws UDFArgumentTypeException {
        argumentOI = arguments;

        // this should be connection url, username, password, query, column1[,columnN]*
        for (int i = 0; i < 4; i++) {
            if (arguments[i].getCategory() == ObjectInspector.Category.PRIMITIVE) {
                PrimitiveObjectInspector poi = ((PrimitiveObjectInspector) arguments[i]);

                if (!(poi.getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING)) {
                    throw new UDFArgumentTypeException(i,
                            "The argument of function should be \""
                                    + Constants.STRING_TYPE_NAME + "\", but \""
                                    + arguments[i].getTypeName() + "\" is found");
                }
            }
        }
        for (int i = 4; i < arguments.length; i++) {
            if (arguments[i].getCategory() != ObjectInspector.Category.PRIMITIVE) {
                throw new UDFArgumentTypeException(i,
                        "The argument of function should be primitive"
                                + ", but \"" + arguments[i].getTypeName()
                                + "\" is found");
            }
        }
        return PrimitiveObjectInspectorFactory.writableIntObjectInspector;
    }

    /**
     * @return 0 on success, 1 if the statement failed, 2 if the connection
     *         could not be established
     */
    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        url = ((StringObjectInspector) argumentOI[0])
                .getPrimitiveJavaObject(arguments[0].get());
        user = ((StringObjectInspector) argumentOI[1])
                .getPrimitiveJavaObject(arguments[1].get());
        pass = ((StringObjectInspector) argumentOI[2])
                .getPrimitiveJavaObject(arguments[2].get());

        try {
            connection = DriverManager.getConnection(url, user, pass);
        } catch (SQLException ex) {
            LOG.error("Driver loading or connection issue", ex);
            result.set(2);
        }

        if (connection != null) {
            try {
                PreparedStatement ps = connection
                        .prepareStatement(((StringObjectInspector) argumentOI[3])
                                .getPrimitiveJavaObject(arguments[3].get()));
                for (int i = 4; i < arguments.length; ++i) {
                    PrimitiveObjectInspector poi = ((PrimitiveObjectInspector) argumentOI[i]);
                    ps.setObject(i - 3,
                            poi.getPrimitiveJavaObject(arguments[i].get()));
                }
                ps.execute();
                ps.close();
                result.set(0);
            } catch (SQLException e) {
                LOG.error("Underlying SQL exception", e);
                result.set(1);
            } finally {
                try {
                    connection.close();
                } catch (Exception ex) {
                    LOG.error("Underlying SQL exception during close", ex);
                }
            }
        }

        return result;
    }

    @Override
    public String getDisplayString(String[] children) {
        StringBuilder sb = new StringBuilder();
        sb.append("dboutput(");
        if (children.length > 0) {
            sb.append(children[0]);
            for (int i = 1; i < children.length; i++) {
                sb.append(",");
                sb.append(children[i]);
            }
        }
        sb.append(")");
        return sb.toString();
    }
}
```
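One caveat about the listing above: evaluate() opens and closes a fresh JDBC connection for every input row, which is expensive when exporting many rows. A hedged sketch of two methods one might add to the class to reuse a single connection per task; both are illustrative additions, not part of the original code, and they require an extra import of java.io.IOException:

```java
// Illustrative addition: open the connection lazily on first use and
// keep it for the lifetime of the task instead of reconnecting per row.
private Connection getConnectionLazily() throws SQLException {
    if (connection == null || connection.isClosed()) {
        connection = DriverManager.getConnection(url, user, pass);
    }
    return connection;
}

// GenericUDF implements Closeable; overriding close() lets the task
// release the shared connection once all rows have been processed.
@Override
public void close() throws IOException {
    try {
        if (connection != null) {
            connection.close();
        }
    } catch (SQLException e) {
        LOG.error("Error closing the shared connection", e);
    }
}
```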
2. Package the program into a jar and add it to the Hive class path

```
hive> add jar hiveGenericUdf.jar;
```

3. Add the MySQL JDBC driver jar
```
hive> add jar /home/hadoopUser/cloud/hive/apache-hive-0.13.1-bin/lib/mysql-connector-java-5.1.18-bin.jar;
```

4. Create the temporary function
```
hive> create temporary function analyzedboutput as 'com.gxnzx.hive.main.AnalyzeGenericUDFDBOutput';
```

5. Test
```
hive> select analyzedboutput('jdbc:mysql://192.168.2.133:3306/transport','hive','hive','insert into jtxx2 values(?,?)',clxxbh,hphm) from transjtxx_hbase limit 5;
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_1428394594787_0043, Tracking URL = http://secondmgt:8088/proxy/application_1428394594787_0043/
Kill Command = /home/hadoopUser/cloud/hadoop/programs/hadoop-2.2.0/bin/hadoop job -kill job_1428394594787_0043
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
2015-04-23 22:01:46,205 Stage-1 map = 0%, reduce = 0%
2015-04-23 22:02:01,985 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 9.37 sec
MapReduce Total cumulative CPU time: 9 seconds 370 msec
Ended Job = job_1428394594787_0043
MapReduce Jobs Launched:
Job 0: Map: 1   Cumulative CPU: 9.37 sec   HDFS Read: 256 HDFS Write: 10 SUCCESS
Total MapReduce CPU Time Spent: 9 seconds 370 msec
OK
0
0
0
0
0
Time taken: 32.118 seconds, Fetched: 5 row(s)
```

The six arguments to analyzedboutput are the MySQL JDBC connection string, the MySQL user name, the password, the SQL insert statement, and the two query fields clxxbh and hphm from the Hive table. The five 0s in the output are the function's return values: evaluate() returns 0 once per row on a successful insert.
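Because the arguments after the SQL string are bound positionally into the PreparedStatement, the same function generalizes to wider tables without any code changes. A sketch of what a call with a hypothetical three-column table jtxx3 and column clsd (both made up for illustration) might look like:

```
hive> select analyzedboutput('jdbc:mysql://192.168.2.133:3306/transport','hive','hive','insert into jtxx3 values(?,?,?)',clxxbh,hphm,clsd) from transjtxx_hbase limit 5;
```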
6. Check the data in the MySQL table
```
mysql> select * from jtxx2;
Empty set (0.00 sec)

mysql> select * from jtxx2;
+----------------------------------+-----------+
| cllxbh                           | hphm      |
+----------------------------------+-----------+
| 32100017000000000220140317000015 | 鲁Q58182  |
| 32100017000000000220140317000016 | 鲁QV4662  |
| 32100017000000000220140317000019 | 苏LL8128  |
| 32100017000000000220140317000020 | 苏CAH367  |
| 32100017000000000220140317000023 | 鲁Q7899W  |
+----------------------------------+-----------+
5 rows in set (0.00 sec)
```
Tags: hive mysql udf genericudf
Source: http://blog.csdn.net/niityzu/article/details/45227409