【python – PySpark用其他列中的值替换列中的null】教程文章相关的互联网学习教程文章

python – 在PySpark中使用Apache Spark数据帧删除重音的最佳方法是什么?【代码】

我需要删除西班牙语中的重音和来自不同数据集的其他语言的重音. 我已经使用此post中提供的代码执行了一个功能,删除了特殊的重音符号.问题是函数很慢,因为它使用UDF.我只是想知道我是否可以提高函数的性能以在更短的时间内获得结果,因为这对小型数据帧有好处,但对大型数据帧则不行. 提前致谢. 在这里代码,您将能够按照它呈现的方式运行它:# Importing sql types from pyspark.sql.types import StringType, IntegerType, StructTyp...

python – 如何在pySpark数据帧中添加行ID [复制]【代码】

参见英文答案 > Primary keys with Apache Spark 3个我有一个csv文件;我在pyspark中转换为DataFrame(df);经过一番改造;我想在df中添加一列;这应该是简单的行id(从0或1开始到N). 我在rdd中转换了df并使用“zipwithindex”.我将生成的rdd转换回df.这种方法有效,但它产生了250k的任务,并且需要花费大量的时间来执行.我想知道是否还有其他方法可以减少运行时间. 以下是我的代码片段;我正在处理的cs...

python – 如何在PySpark中处理数据之前在所有Spark工作程序上运行函数?【代码】

我正在使用YARN在集群中运行Spark Streaming任务.集群中的每个节点都运行多个spark worker.在流式传输开始之前,我想在群集中所有节点上的所有工作程序上执行“设置”功能. 流式传输任务将传入的邮件分类为垃圾邮件或非垃圾邮件,但在此之前,它需要将最新的预先训练的模型从HDFS下载到本地磁盘,如此伪代码示例:def fetch_models():if hadoop.version > local.version:hadoop.download()我在SO上看过以下示例:sc.parallelize().map(...

python – spark-submit和pyspark有什么区别?【代码】

如果我启动pyspark然后运行此命令:import my_script; spark = my_script.Sparker(sc); spark.collapse('./data/')一切都很好.但是,如果我尝试通过命令行和spark-submit执行相同的操作,则会收到错误消息:Command: /usr/local/spark/bin/spark-submit my_script.py collapse ./data/File "/usr/local/spark/python/pyspark/rdd.py", line 352, in funcreturn f(iterator)File "/usr/local/spark/python/pyspark/rdd.py", line 1576...

python – 与pyspark中的scala.util.Try相同的是什么?【代码】

我有一个糟糕的HTTPD access_log,只想跳过“糟糕”的行. 在scala中,这很简单:import scala.util.Tryval log = sc.textFile("access_log")log.map(_.split(' ')).map(a => Try(a(8))).filter(_.isSuccess).map(_.get).map(code => (code,1)).reduceByKey(_ + _).collect()对于python我通过使用“lambda”表示法明确定义函数来获得以下解决方案:log = sc.textFile("access_log")def wrapException(a):try:return a[8]except:return...

python – 在PySpark ML中创建自定义Transformer【代码】

我是Spark SQL DataFrames和ML的新手(PySpark).如何创建服装标记器,例如删除停用词并使用nltk中的某些库?我可以延长默认值吗? 谢谢.解决方法:Can I extend the default one?并不是的.默认Tokenizer是pyspark.ml.wrapper.JavaTransformer的子类,与pyspark.ml.feature中的其他transfromers和估算器一样,将实际处理委托给其Scala对应项.由于您想使用Python,您应该直接扩展pyspark.ml.pipeline.Transformer.import nltkfrom pyspark ...

python – 如何在调试模式下调用PySpark?【代码】

我使用Apache Spark 1.4设置了IntelliJ IDEA. 我希望能够将调试点添加到我的Spark Python脚本中,以便我可以轻松地调试它们. 我目前正在运行这一点Python来初始化spark过程proc = subprocess.Popen([SPARK_SUBMIT_PATH, scriptFile, inputFile], shell=SHELL_OUTPUT, stdout=subprocess.PIPE)if VERBOSE:print proc.stdout.read()print proc.stderr.read()当spark-submit最终调用myFirstSparkScript.py时,调试模式未被启用并且正常执...

在PySpark中的GroupedData上应用UDF(具有正常运行的python示例)【代码】

我有这个在pandas数据帧中本地运行的python代码:df_result = pd.DataFrame(df.groupby('A').apply(lambda x: myFunction(zip(x.B, x.C), x.name))我想在PySpark中运行它,但在处理pyspark.sql.group.GroupedData对象时遇到问题. 我尝试过以下方法:sparkDF.groupby('A').agg(myFunction(zip('B', 'C'), 'A')) 返回KeyError: 'A'我推测因为’A’不再是一列而我找不到x.name的等价物. 然后sparkDF.groupby('A').map(lambda row: Row(...

python – 在PySpark中爆炸【代码】

我想从包含单词列表的DataFrame转换为DataFrame,每个单词都在自己的行中. 如何在DataFrame中的列上进行爆炸? 下面是我的一些尝试示例,您可以在其中取消注释每个代码行并获取以下注释中列出的错误.我在Python 2.7中使用PySpark和Spark 1.6.1.from pyspark.sql.functions import split, explode DF = sqlContext.createDataFrame([('cat \n\n elephant rat \n rat cat', )], ['word']) print 'Dataset:' DF.show() print '\n\n Tryi...

python – 当列表值与Pyspark数据帧中的列值的子字符串匹配时,填充新列【代码】

我在Pyspark有一个数据框,如下所示df.show()+---+----------------------+ | id| con| +---+----------------------+ | 3| mac,mac pro| | 1| iphone5,iphone| | 1| android,android phone| | 1| windows,windows pc| | 1| spy camera,spy camera| | 2| camera,| | 3| cctv,cctv| | 2| apple iphone,iphone| | 3| ,spy camera| +---+------...

python – 有没有办法在PySpark中读取文本文件时控制分区数量【代码】

我在PySpark中使用以下命令读取文本文件rating_data_raw = sc.textFile("/<path_to_csv_file>.csv")有没有办法指定RDD rating_data_raw应分成的分区数?我想指定大量的分区以实现更高的并发性.解决方法:正如其他用户所说,您可以在读取文件时设置将创建的最小分区数,方法是在可选参数minPartitions of textFile中进行设置.rating_data_raw = sc.textFile("/<path_to_csv_file>.csv", minPartitions=128)另一种实现此目的的方法是使用...

python – pyspark reduce方法的歧义【代码】

def reduce(self, f): #1 parameterdef func(iterator): iterator = iter(iterator) try: initial = next(iterator) except StopIteration: return yield reduce(f, iterator, initial) #3 parameter vals = self.mapPartitions(func).collect() if vals: return reduce(f, vals) #2 parameter raise ValueErro...

python – 如何在“pyspark”中迭代特定结果的列表列表【代码】

我是PySpark的新手,我想知道如何做到这一点.任何帮助赞赏. 我有这个RDD例如:[[u'merit', u'release', u'appearance'], [u'www.bonsai.wbff.org'], [u'whitepages.com'], [u'the', u'childs', u'wonderland', u'company'], [u'lottery']]我试着:[[(u'merit',1), (u'release',1), (u'appearance',1)], [(u'www.bonsai.wbff.org',1)], [(u'whitepages.com',1)], [(u'the',1), (u'childs',1), (u'wonderland',1), (u'company',1)], [...

python – 是否有可能在Pyspark中继承DataFrame?【代码】

Pyspark的文档显示了从sqlContext,sqlContext.read()和各种其他方法构造的DataFrame. (见https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html) 是否有可能将Dataframe子类化并独立实例化它?我想为基本DataFrame类添加方法和功能.解决方法:这真的取决于你的目标. >从技术上讲,这是可能的. pyspark.sql.DataFrame只是一个普通的Python类.如果需要,您可以扩展它或猴子补丁.from pyspark.sql import DataFrameclass Dat...

python – 在Apache Spark中使用pyspark进行数据帧转置【代码】

我有一个具有以下结构的数据帧df:+-----+-----+-----+-------+ | s |col_1|col_2|col_...| +-----+-------------------+ | f1 | 0.0| 0.6| ... | | f2 | 0.6| 0.7| ... | | f3 | 0.5| 0.9| ... | | ...| ...| ...| ... |我想计算这个数据帧的转置,所以它看起来像+-------+-----+-----+-------+------+ | s | f1 | f2 | f3 | ...| +-------+-------------------+------+ |col_1 | 0.0| 0.6| ...

替换 - 相关标签