spark实现大数据join操作的两个算法,map-side join和reduce-side join
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了spark实现大数据join操作的两个算法,map-side join和reduce-side join,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含3587字,纯文字阅读大概需要6分钟。
内容图文
![spark实现大数据join操作的两个算法,map-side join和reduce-side join](/upload/InfoBanner/zyjiaocheng/850/c836b1cc8b434d0f9cbd426c5f1881ef.jpg)
Map-Side Join
Map-side Join使用场景是一个大表和一个小表的连接操作,其中,“小表”是指文件足够小,可以加载到内存中。该算法可以将join算子执行在Map端,无需经历shuffle和reduce等阶段,因此效率非常高。
在Hadoop MapReduce中, map-side join是借助DistributedCache实现的。DistributedCache可以帮我们将小文件分发到各个节点的Task工作目录下,这样,我们只需在程序中将文件加载到内存中(比如保存到Map数据结构中),然后借助Mapper的迭代机制,遍历另一个大表中的每一条记录,并查找是否在小表中,如果在则输出,否则跳过。
在Apache Spark中,同样存在类似于DistributedCache的功能,称为“广播变量”(Broadcast variable)。其实现原理与DistributedCache非常类似,但提供了更多的数据/文件广播算法,包括高效的P2P算法,该算法在节点数目非常多的场景下,效率远远好于DistributedCache这种基于HDFS共享存储的方式。使用MapReduce DistributedCache时,用户需要显示地使用File API编写程序从本地读取小表数据,而Spark则不用,它借助Scala语言强大的函数闭包特性,可以隐藏数据/文件广播过程,让用户编写程序更加简单。
适用于一个数据集小,另一个数据集大的情况
package spark.examples.join
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._
object SparkMapsideJoin {
def main(args: Array[String]) {
val conf = new SparkConf()
conf.setAppName("SparkMapsideJoin")
conf.setMaster("local[3]")
conf.set("spark.shuffle.manager", "sort");
val sc = new SparkContext(conf)
//val table1 = sc.textFile(args(1))
//val table2 = sc.textFile(args(2))
val table1 = sc.parallelize(List("k1,v11", "k2,v12", "k3,v13"))
val table2 = sc.parallelize(List("k1,v21", "k4,v24", "k3,v23"))
// table1 is smaller, so broadcast it as a map<String, String>
val pairs = table1.map { x =>
val pos = x.indexOf(',')
(x.substring(0, pos), x.substring(pos + 1))
}.collectAsMap
val broadCastMap = sc.broadcast(pairs) //save table1 as map, and broadcast it
// table2 join table1 in map side
val result = table2.map { x =>
val pos = x.indexOf(',')
(x.substring(0, pos), x.substring(pos + 1))
}.mapPartitions({ iter =>
val m = broadCastMap.value
for {
(key, value) <- iter
if (m.contains(key))
} yield (key, (value, m.get(key).getOrElse("")))
})
val output = "d:/wordcount-" + System.currentTimeMillis() ;
result.saveAsTextFile(output) //save result to local file or HDFS
}
}
Reduce Side Join
当两个文件/目录中的数据非常大,难以将某一个存放到内存中时,Reduce-side Join是一种解决思路。该算法需要通过Map和Reduce两个阶段完成,在Map阶段,将key相同的记录划分给同一个Reduce Task(需标记每条记录的来源,便于在Reduce阶段合并),在Reduce阶段,对key相同的进行合并。
Spark提供了Join算子,可以直接通过该算子实现reduce-side join,但要求RDD中的记录必须是pair,即RDD[KEY, VALUE],
适用于两个join表数据量都很大的情况
package spark.examples.join
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._
object SparkReducesideJoin {
def main(args: Array[String]) {
val conf = new SparkConf()
conf.setAppName("SparkMapsideJoin")
conf.setMaster("local[3]")
conf.set("spark.shuffle.manager", "sort");
val sc = new SparkContext(conf)
val table1 = sc.parallelize(List("k1,v11", "k2,v12", "k3,v13"))
val table2 = sc.parallelize(List("k1,v21", "k4,v24", "k3,v23"))
//table1 and table 2 are both very large
val pairs1 = table1.map { x =>
val pos = x.indexOf(',')
(x.substring(0, pos), x.substring(pos + 1))
}
val pairs2 = table2.map { x =>
val pos = x.indexOf(',')
(x.substring(0, pos), x.substring(pos + 1))
}
val result = pairs1.join(pairs2)
val output = "d:/wordcount-" + System.currentTimeMillis();
result.saveAsTextFile(output) //save result to local file or HDFS
}
}
内容总结
以上是互联网集市为您收集整理的spark实现大数据join操作的两个算法,map-side join和reduce-side join全部内容,希望文章能够帮你解决spark实现大数据join操作的两个算法,map-side join和reduce-side join所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。