Converting a JDBC ResultSet Cursor to a Spark DataFrame: Data Type Mapping, Using Teradata as an Example
1. Add a Spark schema member and a DF (DataFrame) member to ResultSet
```scala
/* Note: `spark` (SparkSession) and `sc` (SparkContext) are assumed to be in
   scope (e.g. in spark-shell) and are not defined here; define them yourself
   otherwise. Maps Teradata column types to Spark SQL data types. */
import java.sql.{ResultSet, ResultSetMetaData}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row}

object addDataframeMember {

  trait ResultSetMetaDataToSchema {
    def columnCount: Int
    def schema: StructType
  }

  implicit def wrapResultSetMetaData(rsmd: ResultSetMetaData) = {
    new ResultSetMetaDataToSchema {
      def columnCount = rsmd.getColumnCount

      def schema = {
        // Convert a Teradata type name to the corresponding Spark SQL DataType
        def tdConvert(tdTypeStr: String, precision: Int = 0, scale: Int = 0, className: String = ""): DataType = {
          tdTypeStr match {
            case "BYTEINT"        => IntegerType
            case "SMALLINT"       => ShortType
            case "INTEGER"        => IntegerType
            case "BIGINT"         => LongType
            case "FLOAT"          => DoubleType
            case "CHAR"           => CharType(precision)
            case "DECIMAL"        => DecimalType(precision, scale)
            case "VARCHAR"        => StringType
            case "BYTE"           => ByteType
            case "VARBYTE"        => ByteType
            case "DATE"           => DateType
            case "TIME"           => TimestampType
            case "TIMESTAMP"      => TimestampType
            case "CLOB"           => StringType
            case "BLOB"           => BinaryType
            case "Structured UDT" => ObjectType(Class.forName(className))
            // Guard against types not covered above instead of a bare MatchError
            case other            => throw new IllegalArgumentException(s"Unsupported Teradata type: $other")
          }
        }

        def col2StructField(rsmd: ResultSetMetaData, i: Int): StructField =
          StructField(
            rsmd.getColumnName(i),
            tdConvert(rsmd.getColumnTypeName(i), rsmd.getPrecision(i), rsmd.getScale(i), rsmd.getColumnClassName(i)),
            rsmd.isNullable(i) match {
              case 1 => true
              case _ => false
            }
          ).withComment(rsmd.getColumnLabel(i))

        def rsmd2Schema(rsmd: ResultSetMetaData): StructType =
          (1 to columnCount)
            .map(col2StructField(rsmd, _))
            .foldLeft(new StructType)((s: StructType, f: StructField) => s.add(f))

        rsmd2Schema(rsmd)
      }
    }
  }

  trait ResultSetToDF {
    def schema: StructType
    def DF: DataFrame
  }

  implicit def wrapResultSet(rs: ResultSet) = {
    def rsmd = rs.getMetaData

    // Drain the cursor, applying `retrieve` to each row
    def toList[T](retrieve: ResultSet => T): List[T] =
      Iterator.continually((rs.next(), rs)).takeWhile(_._1).map(_._2).map(retrieve).toList

    // Turn the current row of the ResultSet into a Spark Row
    def rsContent2Row(rs: ResultSet): Row =
      Row.fromSeq(Array.tabulate[Object](rsmd.columnCount)(i => rs.getObject(i + 1)).toSeq)

    new ResultSetToDF {
      def schema = rsmd.schema
      def DF = spark.createDataFrame(sc.parallelize(toList(rsContent2Row)), schema)
    }
  }
}
```
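Note that `DF` drains the entire cursor into driver memory via `toList` before calling `sc.parallelize`, so this approach only suits small result sets. For large tables, a sketch using Spark's built-in JDBC data source may be preferable (the URL, table name, and credentials below are placeholders, not values from this article):

```scala
// A hedged alternative for large tables: let Spark read over JDBC directly,
// so rows never have to be collected on the driver first.
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:teradata://host/database=mydb")   // placeholder URL
  .option("driver", "com.ncr.teradata.TeraDriver")
  .option("dbtable", "mytable")                          // placeholder table
  .option("user", "user")
  .option("password", "password")
  .load()
```

The trade-off is that the built-in source derives the schema from the JDBC driver's own type reporting, whereas the hand-rolled mapping above lets you control the Teradata-to-Spark type correspondence explicitly.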
2. Open a JDBC connection and obtain a result-set cursor as usual
```scala
import java.sql.{Connection, DriverManager}

/* Open a Teradata connection */
val (dialect, host, user, passwd, database, charset) =
  ("teradata", "ip", "user", "password", "database", "ASCII")

val tdConf = collection.immutable.Map(
  "driver"   -> "com.ncr.teradata.TeraDriver",
  "uri"      -> s"jdbc:$dialect://$host/CLIENT_CHARSET=EUC_CN,TMODE=TERA,COLUMN_NAME=ON,CHARSET=$charset,database=$database",
  "username" -> user,
  "password" -> passwd
)

def getTeraConn: Connection = {
  Class.forName(tdConf("driver"))
  DriverManager.getConnection(tdConf("uri"), tdConf("username"), tdConf("password"))
}

val sql = "SELECT TOP 10 * FROM xxx"
val conn = getTeraConn
val stmt = conn.createStatement()
val rs = stmt.executeQuery(sql)
```
3. Import the implicit conversion and call the members
```scala
import addDataframeMember.wrapResultSet

rs.DF.show()
```
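Since `DF` materializes the whole cursor eagerly, the JDBC resources can be released as soon as the DataFrame has been built. A minimal cleanup sketch, reusing the `rs`, `stmt`, and `conn` names from step 2:

```scala
// Build the DataFrame first (this consumes the cursor), then free JDBC resources.
val df = rs.DF
rs.close()
stmt.close()
conn.close()

df.show()
```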
Original source: https://www.cnblogs.com/shld/p/11803503.html