java – spark中的cache()是否会改变RDD的状态或创建一个新的状态?
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了java – spark中的cache()是否会改变RDD的状态或创建一个新的状态?,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含2256字,纯文字阅读大概需要4分钟。
内容图文
这个问题是我之前有一个问题的后续问题What happens if I cache the same RDD twice in Spark.
在RDD上调用cache()时,RDD的状态是否发生了变化(并且返回的RDD只是为了易于使用)或者创建了一个新的RDD包装现有的RDD?
以下代码中会发生什么:
// Init
JavaRDD<String> a = ... // some initialise and calculation functions.
JavaRDD<String> b = a.cache();
JavaRDD<String> c = b.cache();
// Case 1, will 'a' be calculated twice in this case
// because it's before the cache layer:
a.saveAsTextFile(somePath);
a.saveAsTextFile(somePath);
// Case 2, will the data of the calculation of 'a'
// be cached in the memory twice in this case
// (once as 'b' and once as 'c'):
c.saveAsTextFile(somePath);
解决方法:
When calling cache() on a RDD, does the state of the RDD changed (and
the returned RDD is just this for ease of use) or a new RDD is created
the wrapped the existing one
/**
* Mark this RDD for persisting using the specified level.
*
* @param newLevel the target storage level
* @param allowOverride whether to override any existing level with the new one
*/
private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
// TODO: Handle changes of StorageLevel
if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
throw new UnsupportedOperationException(
"Cannot change storage level of an RDD after it was already assigned a level")
}
// If this is the first time this RDD is marked for persisting, register it
// with the SparkContext for cleanups and accounting. Do this only once.
if (storageLevel == StorageLevel.NONE) {
sc.cleaner.foreach(_.registerRDDForCleanup(this))
sc.persistRDD(this)
}
storageLevel = newLevel
this
}
缓存不会对所述RDD造成任何副作用.如果它已标记为持久性,则不会发生任何事情.如果不是,唯一的副作用是将其注册到SparkContext,其中副作用不在RDD本身,而是在上下文中.
编辑:
看看JavaRDD.cache,似乎底层调用将导致另一个JavaRDD的分配:
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): JavaRDD[T] = wrapRDD(rdd.cache())
wrapRDD调用JavaRDD.fromRDD的地方:
object JavaRDD {
implicit def fromRDD[T: ClassTag](rdd: RDD[T]): JavaRDD[T] = new JavaRDD[T](rdd)
implicit def toRDD[T](rdd: JavaRDD[T]): RDD[T] = rdd.rdd
}
这将导致新的JavaRDD的分配.也就是说,RDD [T]的内部实例将保持不变.
内容总结
以上是互联网集市为您收集整理的java – spark中的cache()是否会改变RDD的状态或创建一个新的状态?全部内容,希望文章能够帮你解决java – spark中的cache()是否会改变RDD的状态或创建一个新的状态?所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。