python - Spark Cartesian product
I need to compare coordinates to compute distances. So I load the data with sc.textFile() and build the Cartesian product. The text file has about 2,000,000 rows, which means 2,000,000 x 2,000,000 coordinate pairs to compare.

I tested the code with about 2,000 coordinates, and it ran fine within a few seconds. With the large file, however, it seems to stall at some point, and I don't know why. The code looks like this:
```python
from math import asin, cos, sqrt
import numpy as np

def concat(x, y):
    # merge partial results into one flat list of (id, distance) tuples
    if isinstance(y, list) and isinstance(x, list):
        return x + y
    if isinstance(x, list) and isinstance(y, tuple):
        return x + [y]
    if isinstance(x, tuple) and isinstance(y, list):
        return [x] + y
    return [x, y]

def haversian_dist(pair):
    # pair = ((lat1, id1, lon1, ...), (lat2, id2, lon2, ...))
    lat1 = float(pair[0][0])
    lat2 = float(pair[1][0])
    lon1 = float(pair[0][2])
    lon2 = float(pair[1][2])
    p = 0.017453292519943295  # pi / 180
    a = 0.5 - cos((lat2 - lat1) * p) / 2 \
        + cos(lat1 * p) * cos(lat2 * p) * (1 - cos((lon2 - lon1) * p)) / 2
    print(pair[0][1])
    return (int(float(pair[0][1])), (int(float(pair[1][1])), 12742 * asin(sqrt(a))))

def sort_val(pair):
    # sort the collected (globalid, distance) tuples by distance
    dtype = [("globalid", int), ("distance", float)]
    a = np.array(pair[1], dtype=dtype)
    sorted_mins = np.sort(a, order="distance", kind="mergesort")
    return (pair[0], sorted_mins)

def calc_matrix(sc, path, rangeval, savepath, name):
    data = sc.textFile(path)
    data = data.map(lambda x: x.split(";"))
    data = data.repartition(100).cache()
    data.collect()
    matrix = data.cartesian(data)
    values = matrix.map(haversian_dist)
    values = values.reduceByKey(concat)
    values = values.map(sort_val)
    values = values.map(lambda x: (x[0], x[1][1:int(rangeval)].tolist()))
    values = values.map(lambda x: (x[0], [y[0] for y in x[1]]))
    dicti = values.collectAsMap()
    hp.save_pickle(dicti, savepath, name)
```
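To get a feel for the scale involved, here is some back-of-the-envelope arithmetic (my own illustrative numbers, not from the original post): the full Cartesian product grows quadratically, so going from the 2,000-row test to the 2,000,000-row file multiplies the work by a factor of a million.

```python
# Rough scale of the Cartesian product (illustrative arithmetic only).
n_small = 2_000          # the test run that finished in seconds
n_large = 2_000_000      # the full data set

pairs_small = n_small ** 2   # full Cartesian product: includes (x, x) and both orderings
pairs_large = n_large ** 2

print(pairs_small)           # 4 million pairs
print(pairs_large)           # 4 trillion pairs -- a factor of 10**6 more work

# only unordered pairs of distinct points are actually needed for distances
unique_large = n_large * (n_large - 1) // 2
print(unique_large)
```

This is why a run that takes seconds on the small sample can become intractable on the full file even before any Spark-specific issue comes into play.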
Even a file with only about 15,000 entries does not work. I know that cartesian leads to O(n^2) runtime, but shouldn't Spark be able to handle that? Or is something else wrong? The only starting point I have is this error message, but I don't know whether it is related to the actual problem:
```
16/08/06 22:21:12 WARN TaskSetManager: Lost task 15.0 in stage 1.0 (TID 16, hlb0004): java.net.SocketException: Datenübergabe unterbrochen (broken pipe)
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:440)
at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)
16/08/06 22:21:12 INFO TaskSetManager: Starting task 15.1 in stage 1.0 (TID 17, hlb0004, partition 15,PROCESS_LOCAL, 2408 bytes)
16/08/06 22:21:12 WARN TaskSetManager: Lost task 7.0 in stage 1.0 (TID 8, hlb0004): java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:209)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:139)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
```
Solution:
You call data.collect() in your code, which pulls the entire data set onto a single machine. Depending on that machine's memory, 2,000,000 rows of data may simply not fit.
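A quick way to see why collect() is risky here is to estimate the driver-side footprint. This is a rough sketch with a made-up row shape (three string fields, as split(";") would produce); real overheads vary with Python version and field lengths.

```python
import sys

# a hypothetical parsed row, as produced by line.split(";")
row = ["52.5200", "4242", "13.4050"]

# rough per-row footprint: the list object plus its string elements
# (ignores string interning, shared pointers, and allocator overhead)
row_bytes = sys.getsizeof(row) + sum(sys.getsizeof(f) for f in row)

n_rows = 2_000_000
total_gb = n_rows * row_bytes / 1024**3
print(round(total_gb, 2))  # fractions of a GB for the raw rows alone
```

Even under these optimistic assumptions the driver must hold hundreds of megabytes of Python objects at once, and the later collectAsMap() of all pairwise results is far larger still.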
Also, I tried to reduce the number of computations that have to be performed by using joins instead of cartesian. (Note that I just generated random numbers with numpy, so the format here may differ from the one you use; the main idea is the same, though.)
```python
import numpy as np
from numpy import arcsin, cos, sqrt

# suppose my data consists of latlong pairs;
# we will use the indices for pairing up values
data = sc.parallelize(np.random.rand(10, 2)).zipWithIndex()
data = data.map(lambda x: (x[1], x[0]))  # (val, idx) -> (idx, val)

# generate pairs (e.g. if I have 3 points with indices [0,1,2],
# I only have to compute distances for the pairs (0,1), (0,2) and (1,2))
idxs = range(data.count())
indices = sc.parallelize([(i, j) for i in idxs for j in idxs if i < j])

# haversian func (I took the liberty of editing some parts of it)
def haversian_dist(latlong1, latlong2):
    lat1, lon1 = latlong1
    lat2, lon2 = latlong2
    p = 0.017453292519943295  # pi / 180
    def hav(theta):
        return (1 - cos(p * theta)) / 2
    a = hav(lat2 - lat1) + cos(p * lat1) * cos(p * lat2) * hav(lon2 - lon1)
    return 12742 * arcsin(sqrt(a))

# (i, (j, latlong1)) -> (j, (i, latlong1))
joined1 = indices.join(data).map(lambda x: (x[1][0], (x[0], x[1][1])))
# (j, ((i, latlong1), latlong2)) -> ((i, j), (latlong1, latlong2))
joined2 = joined1.join(data).map(
    lambda x: ((x[1][0][0], x[0]), (x[1][0][1], x[1][1])))
haversianRDD = joined2.mapValues(lambda v: haversian_dist(v[0], v[1]))
```
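The haversine formula itself can be sanity-checked without Spark. The following is a standalone sketch using the same constants as above (12742 km is roughly Earth's mean diameter); the reference value of about 111.2 km per degree of longitude at the equator is an approximate textbook figure.

```python
from math import asin, cos, sqrt

def haversian_dist(latlong1, latlong2):
    # same formula as in the Spark code, using math instead of numpy
    lat1, lon1 = latlong1
    lat2, lon2 = latlong2
    p = 0.017453292519943295  # pi / 180
    def hav(theta):
        return (1 - cos(p * theta)) / 2
    a = hav(lat2 - lat1) + cos(p * lat1) * cos(p * lat2) * hav(lon2 - lon1)
    return 12742 * asin(sqrt(a))

# one degree of longitude at the equator is roughly 111.2 km
print(round(haversian_dist((0.0, 0.0), (0.0, 1.0)), 1))

# the distance from a point to itself is exactly zero
print(haversian_dist((52.52, 13.405), (52.52, 13.405)))
```

Running a couple of such checks locally before launching the cluster job is a cheap way to separate formula bugs from Spark configuration problems.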