大数据笔记(三十一)——SparkStreaming详细介绍
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了大数据笔记(三十一)——SparkStreaming详细介绍,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含7886字,纯文字阅读大概需要12分钟。
内容图文
![大数据笔记(三十一)——SparkStreaming详细介绍](/upload/InfoBanner/zyjiaocheng/1177/8f6badf63d754da199326c15fdd21c21.jpg)
Spark Streaming: Spark用于处理流式数据的模块,类似Storm
核心:DStream(离散流),就是一个RDD
============================================
一、Spark Streaming基础
1、什么是Spark Streaming?
(*)Spark Streaming makes it easy to build scalable fault-tolerant streaming applications.
(*)常见的流式处理框架
(1)Apache Storm
(2)Spark Streaming
(3)JStorm:阿里巴巴
(4)Flink:可以很好的管理内存
(*)离线计算和流式计算各自的特点
典型代表 数据的采集 数据源(结果)
离线计算: MR、Spark Core Sqoop 批量操作
流式计算: Storm等等 Flume(Kafka) 实时性
(*)典型的流式计算的框架:参考Hadoop的课件:P91
2、简介Spark Streaming内部结构
3、演示Demo:NetworkWordCount 处理的是流式数据
(*)工具:netcat
(*)文档:http://spark.apache.org/docs/latest/streaming-programming-guide.html#a-quick-example
(*)步骤:启动两个窗口
第一个窗口中:
bin/run-example streaming.NetworkWordCount bigdata11 9999
第二个窗口中:启动消息服务器(先启动)
nc -l -p 9999
注意:如果要演示成功,保证虚拟机的CPU的核数至少2以上
运行:
4、开发自己的NetworkWordCount程序
1 package main.scala.demo 2 3 import org.apache.spark.SparkConf 4 import org.apache.spark.storage.StorageLevel 5 import org.apache.spark.streaming.{Seconds, StreamingContext} 6 7 /** 8 * Created by YOGA on 2018/2/27. 9 */ 10 object MyNetworkWordCount { 11 def main(args: Array[String]) { 12 // 核心:通过StreamingContext对象,去创建一个DStream 13 // DStream从外部接收数据(使用的是Linux上的netcat工具) 14 15 // 创建一个SparkConf对象 16 // local[2]:相当于有两个工作线程,一个接收一个发送 17 val sparkconf = new SparkConf() 18 .setAppName("MyNetworkWordCount") 19 .setMaster("local[2]") 2021//创建StreamContext,表示每隔三秒采集一次数据22 val ssc = new StreamingContext(sparkconf,Seconds(3)) 2324//创建DStream,看成一个输入流 25//IP,端口,缓存到硬盘2627 val lines = ssc.socketTextStream("192.168.153.11",1234,StorageLevel.MEMORY_AND_DISK_SER) 2829//执行WordCount30 val words = lines.flatMap(_.split(" ")) 3132//使用transform完成同样的计数,相当于map操作 33//val wordPair = words.transform(x=>x.map(x=>(x,1))) 34//val wordCount = wordPair.reduceByKey(_+_)35 val wordCount = words.map((_,1)).reduceByKey(_+_) 3637/*38 * 参数一:执行运算 39 * 参数二:窗口的大小 40 * 参数三:创建滑动的距离 41 * 42 * 例子:每9秒钟,把过去30秒的数据进行wordcount 43 * 注意:第二个参数 第三个参数 必须是采样频率的整数倍 44 * */45//val wordCount = words.map((_,1)).reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),Seconds(30),Seconds(9)) 46//输出47 wordCount.print() 4849//启动StreamingContext50 ssc.start() 5152//等待计算完成53 ssc.awaitTermination() 54 } 5556 }
二、Spark Streaming进阶
bin/spark-shell --master spark://bigdata11:7077
1、类:StreamingContext(类似:Spark Context、SQLContext)
上下文对象
创建的方式:
(1)通过SparkConf来创建
val sparkconf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]") //创建StreamingContext,表示每隔3秒采集一次数据 val ssc = new StreamingContext(sparkconf,Seconds(3))
(2)通过SparkContext对象来创建
import org.apache.spark.streaming.{Seconds, StreamingContext} val ssc = new StreamingContext(sc,Seconds(3))
说明:
(1)setMaster("local[2]")
(2)当创建StreamingContext对象,内部会创建一个SparkContext对象
(3)当StreamingContext开始执行,不能添加新的任务
(4)同一个时刻上,JVM只能有一个活动的StreamingContext
2、DStream(离散流):把连续的数据流,变成不连续的离散流,表现形式就是RDD
简单来说:把连续的变成不连续的
操作:Transformation和Action
? (*)transform(func)
? 通过RDD-to-RDD函数作用于源DStream中的各个RDD,可以是任意的RDD操作,从而返回一个新的RDD
改写上面WordCount例子,屏蔽35行
// 使用transform完成同样的计数,相当于map操作 33 val wordPair = words.transform(x=>x.map(x=>(x,1))) 34 val wordCount = wordPair.reduceByKey(_+_)
(*)?updateStateByKey(func)
可以进行累加操作。方法:设置检查点,定义一个累加功能的函数
1 package main.scala.demo 2 3 import org.apache.spark.SparkConf 4 import org.apache.spark.storage.StorageLevel 5 import org.apache.spark.streaming.{Seconds, StreamingContext} 6 7 /** 8 * Created by YOGA on 2018/2/28. 9 */ 10 object MyTotalNetworkWordCount { 11 def main(args: Array[String]) { 12 val sparkconf = new SparkConf() 13 .setAppName("MyNetworkWordCount") 14 .setMaster("local[2]") 1516//创建StreamContext,表示每隔三秒采集一次数据17 val ssc = new StreamingContext(sparkconf,Seconds(3)) 1819//注意:如果累计,在执行计算的时候,需要保持之前的状态信息 20//设置检查点21 ssc.checkpoint("hdfs://192.168.153.11:9000/spark/checkpoint0228") 2223//创建DStream,看成一个输入流24 val lines = ssc.socketTextStream("192.168.153.11",1234,StorageLevel.MEMORY_AND_DISK_SER) 2526//执行WordCount27 val words = lines.flatMap(_.split(" ")) 2829//每个单词记一次数30 val pairs = words.map((_,1)) 3132//定义一个函数,进行累加 33//参数:1、当前的值 2、之前的值34 val addFunc = (currentValues:Seq[Int],preValues:Option[Int]) =>{ 35//得到当前的值36 val currentCount = currentValues.sum 3738//先得到之前的值39 val preCount = preValues.getOrElse(0) 4041//返回累加结果42 Some(currentCount + preCount) 43 } 4445//统计每个单词出现的频率:累计46 val totalCount = pairs.updateStateByKey(addFunc) 47 totalCount.print() 4849//启动任务50 ssc.start() 51 ssc.awaitTermination() 5253 } 54 }
3、窗口操作
例子:每9秒钟,把过去30秒的数据进行WordCount
注释上面的代码35行,放开下面一行代码
/* 38 * 参数一:执行运算 39 * 参数二:窗口的大小 40 * 参数三:创建滑动的距离 41 * 42 * 例子:每9秒钟,把过去30秒的数据进行wordcount 43 * 注意:第二个参数 第三个参数 必须是采样频率的整数倍,采样频率3s 44 * */ 45 val wordCount = words.map((_,1)).reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),Seconds(30),Seconds(9))
4、输入和输出
(1)输入:接收器接收外部数据源的数据
(*)基本数据源:文件流、RDD队列流、Socket流
(*)高级数据源:Kafka、Flume
文件流:监听一个目录,当目录下的文件发生变化的时候,将变化的数据读入DStream
package main.scala.demo import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} /** * Created by YOGA on 2018/2/28. */ object MyFileDStream { def main(args: Array[String]) { // 创建一个SparkConf对象 // local[2]:相当于有两个工作线程,一个接收一个发送 val sparkconf = new SparkConf() .setAppName("MyNetworkWordCount") .setMaster("local[2]") //创建StreamContext,表示每隔三秒采集一次数据 val ssc = new StreamingContext(sparkconf,Seconds(3)) //监听一个目录,当目录下的文件发生变化的时候,将变化的数据读入DStream val lines = ssc.textFileStream("D:\\temp\\aaa") lines.print() ssc.start() ssc.awaitTermination() } }
RDD队列流queueStream
:定义一个for循环,生成RDD放入队列
package main.scala.demo import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import scala.collection.mutable import scala.collection.mutable.Queue import org.apache.spark.rdd.RDD /** * Created by YOGA on 2018/2/28. */ object MyRDDQueueDStream { def main(args: Array[String]){ val sparkconf = new SparkConf() .setAppName("MyNetworkWordCount") .setMaster("local[2]") //创建StreamContext,表示每隔三秒采集一次数据 val ssc = new StreamingContext(sparkconf,Seconds(3)) //创建一个队列,把生成RDD放入队列 val rddQueue = new mutable.Queue[RDD[Int]]() //初始化for(i <- 1 to 3){ rddQueue += ssc.sparkContext.makeRDD(1 to 10) //让线程睡几秒 Thread.sleep(3000) } //创建一个RDD的DStream val inputStream = ssc.queueStream(rddQueue) //处理:乘以10 val result = inputStream.map(x=> (x,x*10)) result.print() ssc.start() ssc.awaitTermination() } }
运行:
(2)输出操作
5、集成DataFrame和SQL: 使用SparkSQL的方式处理流式数据
把RDD转换成DataFrame,并生成临时表,然后就可以进行SQL查询
1 package main.scala.demo 2 3 import org.apache.spark.SparkConf 4 import org.apache.spark.sql.SparkSession 5 import org.apache.spark.storage.StorageLevel 6 import org.apache.spark.streaming.{Seconds, StreamingContext} 7 8 /** 9 * Created by YOGA on 2018/2/28. 10 */ 11 object MyNetWorkWordCountBySQL { 12 def main(args: Array[String]) { 13 // 核心:通过StreamingContext对象,去创建一个DStream 14 // DStream从外部接收数据(使用的是Linux上的netcat工具) 15 16 // 创建一个SparkConf对象 17 // local[2]:相当于有两个工作线程,一个接收一个发送 18 val sparkconf = new SparkConf() 19 .setAppName("MyNetworkWordCount") 20 .setMaster("local[2]") 2122//创建StreamContext,表示每隔三秒采集一次数据23 val ssc = new StreamingContext(sparkconf,Seconds(3)) 2425//创建DStream,看成一个输入流26 val lines = ssc.socketTextStream("192.168.153.11",1234,StorageLevel.MEMORY_AND_DISK_SER) 2728//得到的所有单词29 val words = lines.flatMap(_.split(" ")) 30//val wordPair = words.transform(x=> x.map(x=>(x,1))) 31//val wordCount = wordPair.reduceByKey(_+_) 3233//使用sparkSQL处理Spark Streaming的数据34 words.foreachRDD(rdd =>{ 35//使用SparkSession来创建36 val spark = SparkSession.builder() 37 .config(rdd.sparkContext.getConf) 38 .getOrCreate() 3940//需要把RDD转成一个DataFrame41import spark.implicits._ 42 val wordCountDF = rdd.toDF("word") 4344//注册成一个表45 wordCountDF.createOrReplaceTempView("words") 4647//执行SQL48 val result = spark.sql("select * from words group by word") 49 result.show() 5051 Thread.sleep(5000) 52 }) 535455//启动StreamingContext56 ssc.start() 5758//等待计算完成59 ssc.awaitTermination() 60 } 61 }
原文:https://www.cnblogs.com/lingluo2017/p/8708600.html
内容总结
以上是互联网集市为您收集整理的大数据笔记(三十一)——SparkStreaming详细介绍全部内容,希望文章能够帮你解决大数据笔记(三十一)——SparkStreaming详细介绍所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。