【python – PySpark用其他列中的值替换列中的null】教程文章相关的互联网学习教程文章

Python循环函数问题与pyspark【代码】

我是相对较新的火花,当我尝试在导入pyspark函数后使用python的builtin round()函数时遇到了问题.这似乎与我如何导入pyspark函数有关,但我不确定区别是什么或为什么一种方式会导致问题而另一种方法不会. 预期行为:import pyspark.sql.functions print(round(3.14159265359,2)) >>> 3.14意外行为:from pyspark.sql.functions import * print(round(3.14159265359,2)) >>> ERRORAttributeError Tracebac...

在Python和PySpark中等效的R data.table滚动连接【代码】

有谁知道如何在PySpark中进行R data.table滚动连接? 借用Ben here的滚动连接的例子和很好的解释;sales<-data.table(saleID=c("S1","S2","S3","S4","S5"), saleDate=as.Date(c("2014-2-20","2014-5-1","2014-6-15","2014-7- 1","2014-12-31")))commercials<-data.table(commercialID=c("C1","C2","C3","C4"), commercialDate=as.Date(c("2014-1-1","2014-4-1","2014-7-1","2014-9-15")))setkey(sales,"saleDate") setkey(commercia...

python – 使用列表中的startswith过滤Pyspark【代码】

我有一个元素列表,可能会启动一些RDD记录的字符串.如果我有和元素列表是和否,它们应该匹配yes23和no3但不匹配35yes或41no.使用pyspark,我如何使用列表或元组中的任何元素的开头. DF的一个例子是:+-----+------+ |index| label| +-----+------+ | 1|yes342| | 2| 45yes| | 3| no123| | 4| 75no| +-----+------+当我尝试:Element_List = ['yes','no'] filter_DF = DF.where(DF.label.startswith(tuple(Element_List))...

python – Pyspark中的DarseVector转换为SparseVector【代码】

在PySpark 1.4.1中将SparseVector转换为DenseVector时出现意外错误:from pyspark.mllib.linalg import SparseVector, DenseVectorDenseVector(SparseVector(5, {4: 1.}))这在Ubuntu上正常运行,运行pyspark,返回:DenseVector([0.0, 0.0, 0.0, 0.0, 1.0])这导致RedHat出错,运行pyspark,返回:Traceback (most recent call last): File “”, line 1, inFile “/usr/lib/spark/python/pyspark/mllib/linalg.py”, line206, in ini...

python – Pyspark:TaskMemoryManager:无法分配页面:在错误分析中需要帮助【代码】

我在独立群集模式下运行spark作业时遇到这些错误. 我的火花工作旨在: >运行一些groupby,>数,>并加入以获得最终的df,然后加入df.toPandas().to_csv(). 输入数据集为524 Mb.我得到的错误:WARN TaskMemoryManager: Failed to allocate a page (33554432 bytes), try again.多次重复上述后,再次出现新错误 > WARN NettyRpcEnv:忽略失败:java.util.concurrent.TimeoutException:无法在10秒内收到任何回复> org.apache.spark.rpc.Rp...

python – 根据列值是否在另一列中,向PySpark DataFrame添加列【代码】

我有一个PySpark DataFrame,其结构由[('u1', 1, [1 ,2, 3]), ('u1', 4, [1, 2, 3])].toDF('user', 'item', 'fav_items')我需要添加一个带有1或0的列,具体取决于’item’是否在’fav_items’中. 所以我想要[('u1', 1, [1 ,2, 3], 1), ('u1', 4, [1, 2, 3], 0)]我如何查找第二列到第三列来决定值以及如何添加它?解决方法:以下代码执行所请求的任务.定义了一个用户定义的函数,它接收两列DataFrame作为参数.因此,对于每一行,搜索项目是...

python – PySpark:添加一个新列,其中包含从列创建的元组【代码】

这里我创建了一个dateframe,如下所示,df = spark.createDataFrame([('a',5,'R','X'),('b',7,'G','S'),('c',8,'G','S')], ["Id","V1","V2","V3"])看起来像+---+---+---+---+ | Id| V1| V2| V3| +---+---+---+---+ | a| 5| R| X| | b| 7| G| S| | c| 8| G| S| +---+---+---+---+我想添加一个由V1,V2,V3组成的元组列. 结果应该是这样的+---+---+---+---+-------+ | Id| V1| V2| V3|V_tuple| +---+---+---+---+-------+ | ...

python – PySpark Dataframe:逗号点【代码】

我在浮点数中使用逗号导入数据,我想知道如何将’逗号转换为点.我正在使用pyspark数据帧,所以我尝试了这个:commaToDot = udf(lambda x : str(x).replace(',', '.'), FloatType())myData.withColumn('area',commaToDot(myData.area))它绝对不起作用.那么我们可以直接用spark替换数据框中的数据,还是应该转换为numpy类型还是其他什么? 谢谢 !解决方法:我想你错过了from pyspark.sql.types import FloatType正如Pushkr建议udf with ...

python – 在pyspark中按行连接字符串【代码】

我有一个pyspark数据帧DOCTOR | PATIENT JOHN | SAM JOHN | PETER JOHN | ROBIN BEN | ROSE BEN | GRAY并需要按行连接患者姓名,以便我得到如下输出:DOCTOR | PATIENT JOHN | SAM, PETER, ROBIN BEN | ROSE, GRAY有人可以帮助我在pyspark中创建这个数据帧吗? 提前致谢.解决方法:我能想到的最简单的方法是使用collect_listdf.groupby("col1").agg(f.concat_ws(", ", f.collect_list(df.col2)))

从PySpark DataFrame中的Python列表中删除元素【代码】

我试图从Python列表中删除一个元素:+---------------+ | sources| +---------------+ | [62]| | [7, 32]| | [62]| | [18, 36, 62]| |[7, 31, 36, 62]| | [7, 32, 62]|我希望能够从上面列表中的每个列表中删除元素rm.我写了一个函数,可以为列表列表做到这一点:def asdf(df, rm):temp = dffor n in range(len(df)):temp[n] = [x for x in df[n] if x != rm]return(temp)删除rm = 1:a = [[...

python – 从执行程序记录PySpark【代码】

在执行程序上使用pyspark访问Spark的log4j记录器的正确方法是什么? 在驱动程序中这样做很容易,但我似乎无法理解如何访问执行程序上的日志记录功能,以便我可以在本地登录并让YARN收集本地日志. 有没有办法访问本地记录器? 标准的日志记录过程是不够的,因为我无法从执行程序访问spark上下文.解决方法:您不能在执行程序上使用本地log4j记录器.执行者生成的Python工作者jvms没有与java的“回调”连接,他们只接收命令.但是有一种方法可...

python – 在PySpark中的cogroup【代码】

教程建议如下:>>> x = sc.parallelize([("a", 1), ("b", 4)]) >>> y = sc.parallelize([("a", 2)]) >>> sorted(x.cogroup(y).collect()) [('a', ([1], [2])), ('b', ([4], []))]但是,在运行时,我得到以下输出:('a', (<pyspark.resultiterable.ResultIterable object at 0x1d8b190>, <pyspark.resultiterable.ResultIterable object at 0x1d8b150>)) ('b', (<pyspark.resultiterable.ResultIterable object at 0x1d8b210>, <pyspa...

python – PySpark马尔可夫模型的算法/编码帮助【代码】

我需要一些帮助让我的大脑围绕设计一个(高效)markov链在spark(通过python).我尽可能地写了它,但是我提出的代码没有扩展.基本上对于各种地图阶段,我编写了自定义函数,它们可以很好地处理几千个序列,但是当我们得到时在20,000(并且我有一些高达800k)的东西慢慢爬行. 对于那些不熟悉马尔科夫模型的人来说,这就是它的要点. 这是我的数据..此时我在RDD中得到了实际数据(没有标题).ID, SEQ 500, HNL, LNH, MLH, HML我们看一下元组中的序列...

python – 在pyspark中积累数据帧的最有效方法是什么?【代码】

我有一个数据帧(或者可能是任何RDD),在一个众所周知的架构中包含数百万行,如下所示:Key | FeatureA | FeatureB -------------------------- U1 | 0 | 1 U2 | 1 | 1我需要从磁盘加载十几个其他数据集,其中包含相同数量的键的不同功能.有些数据集最多可达十几列.想像:Key | FeatureC | FeatureD | FeatureE ------------------------------------- U1 | 0 | 0 | 1Key | F...

python – 使用窗口函数时出现pyspark错误(Spark 2.1.0报告问题列未找到)?【代码】

更新: 我创建了以下JIRA问题:https://issues.apache.org/jira/browse/SPARK-20086 状态:已修复! (周末!这太快了!) UPDATE2: 对于版本2.1.1,2.2.0,此问题在https://github.com/apache/spark/pull/17432中得到修复.所以我在http://people.apache.org/~pwendell/spark-nightly/的夜间版本中获得了更新的火花版本如果您使用< = 2.1.0,您可能仍会遇到此问题.原帖:使用pyspark窗口函数时出错.这是一些示例代码: imp...

替换 - 相关标签