python – Spark – 嵌套的RDD操作
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了python – Spark – 嵌套的RDD操作,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含2692字,纯文字阅读大概需要4分钟。
内容图文
![python – Spark – 嵌套的RDD操作](/upload/InfoBanner/zyjiaocheng/795/924ac4a320bb4488a8d922f63737dc41.jpg)
我有两个RDD说
rdd1 =
id | created | destroyed | price
1 | 1 | 2 | 10
2 | 1 | 5 | 11
3 | 2 | 3 | 11
4 | 3 | 4 | 12
5 | 3 | 5 | 11
rdd2 =
[1,2,3,4,5] # lets call these value as timestamps (ts)
rdd2基本上是使用range(intial_value,end_value,interval)生成的.这里的参数可以有所不同.大小可以与rdd1相同或不同.我的想法是使用过滤criertia基于rdd2的值从rdd1获取记录到rdd2(来自rdd1的记录可以在获取时重复,如输出中所示)
过滤标准rdd1.created< = ts< rdd1.destroyed) 预期产量:
ts | prices
1 | 10,11 # i.e. for ids 1,2 of rdd1
2 | 11,11 # ids 2,3
3 | 11,12,11 # ids 2,4,5
4 | 11,11 # ids 2,5
现在我想根据使用RDD2键的某些条件来过滤RDD1. (如上所述)并返回连接RDD2的密钥和RDD1的过滤结果的结果
所以我这样做:
rdd2.map(lambda x : somefilterfunction(x, rdd1))
def somefilterfunction(x, rdd1):
filtered_rdd1 = rdd1.filter(rdd1[1] <= x).filter(rdd1[2] > x)
prices = filtered_rdd1.map(lambda x : x[3])
res = prices.collect()
return (x, list(res))
我得到:
Exception: It appears that you are attempting to broadcast an RDD or
reference an RDD from an action or transformation. RDD transformations
and actions can only be invoked by the driver, not inside of other
transformations; for example, rdd1.map(lambda x: rdd2.values.count() *
x) is invalid because the values transformation and count action
cannot be performed inside of the rdd1.map transformation. For more
information, see SPARK-5063.
我尝试使用groupBy,但是因为rdd1的元素可以一次又一次地重复,与我理解的分组相比,它只会在某个特定的插槽中使用rdd1的每个元素一次.
唯一的方法是使用普通的for循环并进行过滤并最终加入所有内容.
有什么建议?
解决方法:
由于您使用常规范围,因此没有理由创建第二个RDD.您只需为每条记录生成特定范围内的值:
from __future__ import division # Required only for Python 2.x
from math import ceil
from itertools import takewhile
rdd1 = sc.parallelize([
(1, 1, 2, 10),
(2, 1, 5, 11),
(3, 2, 3, 11),
(4, 3, 4, 12),
(5, 3, 5, 11),
])
def generate(start, end, step):
def _generate(id, created, destroyed, price):
# Smallest ts >= created
start_for_record = int(ceil((created - start) / step) * step + start)
rng = takewhile(
lambda x: created <= x < destroyed,
xrange(start_for_record, end, step)) # In Python 3.x use range
for i in rng:
yield i, price
return _generate
result = rdd1.flatMap(lambda x: generate(1, 6, 1)(*x)).groupByKey()
结果:
result.mapValues(list).collect()
## [(1, [10, 11]), (2, [11, 11]), (3, [11, 12, 11]), (4, [11, 11])]
内容总结
以上是互联网集市为您收集整理的python – Spark – 嵌套的RDD操作全部内容,希望文章能够帮你解决python – Spark – 嵌套的RDD操作所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。