saprk sql 整合 hbase 通过phoenix 关系映射 jdbc
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了saprk sql 整合 hbase 通过phoenix 关系映射 jdbc,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含3252字,纯文字阅读大概需要5分钟。
内容图文
![saprk sql 整合 hbase 通过phoenix 关系映射 jdbc](/upload/InfoBanner/zyjiaocheng/918/cb3209ace9d4402b8448d8dd471ff530.jpg)
首先说明遇到的坑
要在phoenxi中建表 , 只有在Phoenix中建表才鞥映射到hbase ,因为spark是通过Phoenix的jdbc
插入数据的 不是吧数据放进hbase而是把数据放进Phoenix
所以在hbase中建表是找不到的
我是通过sparksql插入的
sparksql是datafranme
所以使用Phoenix比较有优势
Phoenix建表
CREATE TABLE record (hospitalid CHAR(80) PRIMARY KEY ,hcount BIGINT,havgcost DOUBLE,havgreimburse DOUBLE,havgreproportion DOUBLE,havgday DOUBLE,havgfinproportion DOUBLE)
代码
val insertTable: String = "record"
val DB_PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver"
val DB_PHOENIX_URL = "jdbc:phoenix:master"
spark.sql(" SELECT \nhospitalid \n,SUM( CASE flag WHEN 1 THEN 1 ELSE 0 END ) hcount \n,SUM(allcost)/COUNT(l.recordid) havgcost\n,SUM(recost)/SUM( CASE flag WHEN 1 THEN 1 ELSE 0 END ) havgreimburse \n,SUM(recost/allcost)/COUNT(l.recordid) havgreproportion \n, SUM(datediff(starttime,endtime))/COUNT(l.recordid) havgday\n, SUM(CASE isrecovery WHEN 1 THEN 1 ELSE 0 END)/COUNT(l.recordid) havgfinproportion\n FROM \nrecord l , reimburse r\nWHERE\nl.recordid=r.recordid\nAND\nflag = 1\n\nGROUP BY\n hospitalid ")
.write.format("org.apache.phoenix.spark")//要以什么格式插入
.mode(SaveMode.Overwrite)//插入模式
.option("driver", DB_PHOENIX_DRIVER)//Phoenix驱动
.option("table", insertTable)//表名
.option("zkUrl", DB_PHOENIX_URL).save()//zookeeper的地址 url
整了半天才研究出来
还有一种方法是通过makerdd这种方法较为困难
需要把datafram转换为makerdd
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("INSERTHBase").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val conf = HBaseConfiguration.create()
//设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置
conf.set("hbase.zookeeper.quorum","master")
//设置zookeeper连接端口,默认2181
conf.set("hbase.zookeeper.property.clientPort", "2181")
val tablename = "test1"
//初始化jobconf,TableOutputFormat必须是org.apache.hadoop.hbase.mapred包下的!
val jobConf = new JobConf(conf)
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)
val indataRDD = sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16"))
val rdd = indataRDD.map(_.split(',')).map{arr=>{
/*一个Put对象就是一行记录,在构造方法中指定主键
* 所有插入的数据必须用org.apache.hadoop.hbase.util.Bytes.toBytes方法转换
* Put.add方法接收三个参数:列族,列名,数据
*/
val put = new Put(Bytes.toBytes(arr(0).toInt))
put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))
//转化成RDD[(ImmutableBytesWritable,Put)]类型才能调用saveAsHadoopDataset
(new ImmutableBytesWritable, put)
}}
rdd.saveAsHadoopDataset(jobConf)
sc.stop()
}
踩坑不少
附上诗词 简直感人
妾发初覆额,折花门前剧。
郎骑竹马来,绕床弄青梅。
同居长干里,两小无嫌猜,
十四为君妇,羞颜未尝开。
低头向暗壁,千唤不一回。
十五始展眉,愿同尘与灰。
常存抱柱信,岂上望夫台。
十六君远行,瞿塘滟滪堆。
五月不可触,猿声天上哀。
门前迟行迹,一一生绿苔。
苔深不能扫,落叶秋风早。
八月蝴蝶来,双飞西园草。
感此伤妾心,坐愁红颜老。
早晚下三巴,预将书报家。
相迎不道远,直至长风沙。
内容总结
以上是互联网集市为您收集整理的saprk sql 整合 hbase 通过phoenix 关系映射 jdbc全部内容,希望文章能够帮你解决saprk sql 整合 hbase 通过phoenix 关系映射 jdbc所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。