spark_sql_DataFromMysql_InferringSchema_SparkSqlSchema_SparkSqlToMysql_SparkStreaming_Flume_Poll
This post collects five small Spark example programs: reading a MySQL table with Spark SQL (DataFromMysql), inferring a DataFrame schema from a case class (InferringSchema), declaring a schema explicitly (SparkSqlSchema), writing query results back to MySQL (SparkSqlToMysql), and a Spark Streaming word count that pulls data from Flume in poll mode (SparkStreaming_Flume_Poll).
DataFromMysql: read a MySQL table into a DataFrame with spark.read.jdbc.

```scala
package com.spark_sql

import java.util.Properties

import org.apache.spark.sql.{DataFrame, SparkSession}

object DataFromMysql {
  def main(args: Array[String]): Unit = {
    // todo 1: create the SparkSession
    val spark: SparkSession = SparkSession.builder().appName("DataFromMysql").master("local[2]").getOrCreate()
    // todo 2: create a Properties object holding the MySQL user name and password
    val properties: Properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "123")
    // todo 3: read the data from MySQL
    val mysqlDF: DataFrame = spark.read.jdbc("jdbc:mysql://localhost:3306/spark?serverTimezone=UTC", "student", properties)
    // todo 4: show the contents of the table
    mysqlDF.show()
    spark.stop()
  }
}
```
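A note on the read above: it assumes the MySQL JDBC driver (mysql-connector-java) is on the application classpath. The same query can also be written with Spark's generic `format("jdbc")` reader, which keeps everything in options instead of a Properties object. A minimal sketch, with the URL, table, and credentials copied from the listing:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object DataFromMysqlOptions {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().appName("DataFromMysqlOptions").master("local[2]").getOrCreate()
    // equivalent to spark.read.jdbc(url, table, properties), with every setting passed as an option
    val mysqlDF: DataFrame = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/spark?serverTimezone=UTC")
      .option("dbtable", "student")
      .option("user", "root")
      .option("password", "123")
      .load()
    mysqlDF.show()
    spark.stop()
  }
}
```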
InferringSchema: infer the DataFrame schema from a case class, then run DSL- and SQL-style queries.

```scala
package com.spark_sql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object InferringSchema {
  def main(args: Array[String]): Unit = {
    // todo 1: build the SparkSession, setting the appName and the master address
    val spark: SparkSession = SparkSession.builder().appName("InferringSchema").master("local[2]").getOrCreate()
    // todo 2: get the SparkContext from the SparkSession
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN") // set the log output level
    // todo 3: load the data
    val dataRDD: RDD[String] = sc.textFile("D:\\IDEA_Maven\\day08\\src\\main\\resources\\Person.txt")
    // todo 4: split each line
    val lineArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(" "))
    // todo 5: map the RDD onto the Person case class
    val personRDD: RDD[Person] = lineArrayRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
    // todo 6: create the DataFrame; the implicit conversions must be imported first
    import spark.implicits._
    val personDF: DataFrame = personRDD.toDF()

    // todo ------------------- DSL-style operations: start --------------
    // 1. show the DataFrame's data (20 rows by default)
    personDF.show()
    // 2. print the DataFrame's schema
    personDF.printSchema()
    // 3. print the number of rows
    println(personDF.count())
    // 4. print all column names
    personDF.columns.foreach(println)
    // 5. take the first row
    println(personDF.head())
    // 6. show all values of the name column
    personDF.select("name").show()
    // 7. filter the rows whose age is greater than 30
    personDF.filter($"age" > 30).show()
    // 8. count the rows whose age is greater than 30
    println(personDF.filter($"age" > 30).count())
    // 9. group by age and count the rows in each group
    personDF.groupBy("age").count().show()
    // todo ------------------- DSL-style operations: end ----------------

    // todo ------------------- SQL-style operations: start --------------
    // todo: register the DataFrame as a temporary view
    personDF.createOrReplaceTempView("t_person")
    // todo: run SQL statements against the view
    spark.sql("select * from t_person").show()
    spark.sql("select * from t_person where name='zhangsan'").show()
    spark.sql("select * from t_person order by age desc").show()
    // todo ------------------- SQL-style operations: end ----------------

    sc.stop()
  }
}

case class Person(id: Int, name: String, age: Int)
```
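Schema inference needs nothing beyond the case class: the same toDF call works on an in-memory sequence, which makes the inference easy to check without Person.txt. A minimal sketch under that assumption; the rows are made up for illustration, and Person is the case class defined at the bottom of the listing above (same package):

```scala
import org.apache.spark.sql.SparkSession

object InferringSchemaInline {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("InferringSchemaInline").master("local[2]").getOrCreate()
    import spark.implicits._
    // toDF() infers the column names and types from the Person fields: id Int, name String, age Int
    val personDF = Seq(Person(1, "zhangsan", 32), Person(2, "lisi", 28)).toDF()
    personDF.printSchema() // id: integer, name: string, age: integer
    spark.stop()
  }
}
```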
SparkSqlSchema: build the schema explicitly with StructType and Row.

```scala
package com.spark_sql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object SparkSqlSchema {
  def main(args: Array[String]): Unit = {
    // todo 1: create the SparkSession, setting the appName and master
    val spark: SparkSession = SparkSession.builder().appName("SparkSqlSchema").master("local[2]").getOrCreate()
    // todo 2: get the SparkContext
    val sc: SparkContext = spark.sparkContext
    // todo 3: load the data
    val dataRDD: RDD[String] = sc.textFile("D:\\IDEA_Maven\\day08\\src\\main\\resources\\Person.txt")
    // todo 4: split each line
    val dataArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(" "))
    // todo 5: load the fields into Row objects
    val personRDD: RDD[Row] = dataArrayRDD.map(x => Row(x(0).toInt, x(1), x(2).toInt))
    // todo 6: define the schema explicitly
    val schema: StructType = StructType(Seq(
      StructField("id", IntegerType, false),
      StructField("name", StringType, false),
      StructField("age", IntegerType, false)
    ))

    // todo 7: create the DataFrame from the Row RDD and the schema
    val personDF: DataFrame = spark.createDataFrame(personRDD, schema)

    // todo 8: show the DataFrame's contents via the DSL
    personDF.show()

    // todo 9: register the DataFrame as a temporary view
    personDF.createOrReplaceTempView("t_person")

    // todo 10: run SQL statements
    spark.sql("select * from t_person").show()
    spark.sql("select count(*) from t_person").show()

    sc.stop()
  }
}
```
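If the explicit StructField list feels verbose, the same shape can also be written as a DDL string via StructType.fromDDL (available in Spark 2.2 and later). A minimal sketch; note that the parsed fields come back nullable, unlike the explicit `false` flags above:

```scala
import org.apache.spark.sql.types.StructType

object SchemaFromDdl {
  // the same three columns as the hand-built schema above, expressed as a DDL string
  val schema: StructType = StructType.fromDDL("id INT, name STRING, age INT")

  def main(args: Array[String]): Unit = println(schema.treeString)
}
```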
SparkSqlToMysql: write a query result back to MySQL with DataFrame.write.jdbc.

```scala
package com.spark_sql

import java.util.Properties

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object SparkSqlToMysql {
  def main(args: Array[String]): Unit = {
    // todo 1: create the SparkSession
    val spark: SparkSession = SparkSession.builder().appName("SparkSqlToMysql").master("local[2]").getOrCreate()
    // todo 2: read the data
    val data: RDD[String] = spark.sparkContext.textFile("D:\\IDEA_Maven\\day08\\src\\main\\resources\\Person.txt")
    // todo 3: split each line
    val arrRDD: RDD[Array[String]] = data.map(_.split(" "))
    // todo 4: map the RDD onto the student01 case class
    val studentRDD: RDD[student01] = arrRDD.map(x => student01(x(0).toInt, x(1), x(2).toInt))
    // todo: import the implicit conversions
    import spark.implicits._
    // todo 5: convert the RDD to a DataFrame
    val studentDF: DataFrame = studentRDD.toDF()
    // todo 6: register the DataFrame as a temporary view
    studentDF.createOrReplaceTempView("student")
    // todo 7: query the student view, ordering by age in descending order
    val resultDF: DataFrame = spark.sql("select * from student order by age desc")

    // todo 8: save the result to a MySQL table
    // todo: create a Properties object holding the MySQL user name and password
    val prop = new Properties()
    prop.setProperty("user", "root")
    prop.setProperty("password", "123")

    resultDF.write.jdbc("jdbc:mysql://localhost:3306/spark?serverTimezone=UTC", "student01", prop)

    // todo: a save mode can be set when writing to MySQL: Overwrite replaces the table, Append adds rows,
    // Ignore silently skips the write, and ErrorIfExists (the default) fails if the table already exists
    // resultDF.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.200.150:3306/spark", "student", prop)
    spark.stop()
  }
}

// todo: the student01 case class
case class student01(id: Int, name: String, age: Int)
```
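The commented-out last write shows where the save mode goes. A minimal sketch of the Append variant, reusing resultDF and prop from the listing above; since ErrorIfExists is the default, the unadorned write earlier fails once student01 already exists:

```scala
import org.apache.spark.sql.SaveMode

// Append keeps the existing student01 table and inserts the new rows;
// Overwrite would drop and recreate it, Ignore would silently skip the write
resultDF.write
  .mode(SaveMode.Append)
  .jdbc("jdbc:mysql://localhost:3306/spark?serverTimezone=UTC", "student01", prop)
```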
SparkStreaming_Flume_Poll: a stateful word count that pulls events from Flume in poll mode.

```scala
package com.SparkStreaming_Flume_Poll

import java.net.InetSocketAddress

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object SparkStreaming_Flume_Poll {

  // newValues: all the 1s collected for the same word in the current batch's (word, 1) pairs
  // runningCount: the accumulated historical count for the same key
  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = runningCount.getOrElse(0) + newValues.sum
    Some(newCount)
  }

  def main(args: Array[String]): Unit = {
    // configure the SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreaming_Flume_Poll").setMaster("local[2]")
    // build the SparkContext
    val sc: SparkContext = new SparkContext(sparkConf)
    // set the log level
    sc.setLogLevel("WARN")
    // build the StreamingContext with the batch interval
    val scc: StreamingContext = new StreamingContext(sc, Seconds(5))
    // set the checkpoint directory (required by updateStateByKey)
    scc.checkpoint("./")
    // set the Flume address; several agents can be listed
    val address = Seq(new InetSocketAddress("192.168.107.144", 8888))
    // pull the data from Flume
    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(scc, address, StorageLevel.MEMORY_AND_DISK)

    // the payload lives in the event body; convert it to a String
    val lineStream: DStream[String] = flumeStream.map(x => new String(x.event.getBody.array()))
    // count the words, keeping state across batches
    val result: DStream[(String, Int)] = lineStream.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(updateFunction)

    result.print()
    scc.start()
    scc.awaitTermination()
  }
}
```
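Poll mode only works when the Flume agent terminates in Spark's custom sink, which buffers events until createPollingStream fetches them; per the Spark Streaming + Flume integration guide, the spark-streaming-flume-sink jar (plus its Scala dependencies) must sit on the Flume agent's classpath. A minimal sketch of the matching sink block of the agent configuration; the agent/sink/channel names (a1, k1, c1) are placeholders, while the hostname and port mirror the InetSocketAddress above:

```properties
# Spark sink: holds events in the channel until the streaming job polls them
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = 192.168.107.144
a1.sinks.k1.port = 8888
a1.sinks.k1.channel = c1
```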
Original post: https://www.cnblogs.com/xjqi/p/12817140.html