【python – PySpark用其他列中的值替换列中的null】教程文章相关的互联网学习教程文章

python – 用户定义的函数打破了pyspark数据帧【代码】

我的火花版是1.3,我正在使用pyspark. 我有一个名为df的大型数据框.from pyspark import SQLContext sqlContext = SQLContext(sc) df = sqlContext.parquetFile("events.parquet")然后,我选择数据帧的几列,并尝试计算行数.这很好用.df3 = df.select("start", "end", "mrt") print(type(df3)) print(df3.count())然后我应用用户定义的函数将其中一个列从字符串转换为数字,这也可以正常工作from pyspark.sql.functions import UserDef...

python – Pyspark:如何在数据帧中复制行n次?【代码】

我有一个像这样的数据帧,如果列n大于1,我想复制该行n次:A B n 1 2 1 2 9 1 3 8 2 4 1 1 5 3 3 并像这样转变:A B n 1 2 1 2 9 1 3 8 2 3 8 2 4 1 1 5 3 3 5 3 3 5 3 3 我想我应该使用爆炸,但我不明白它是如何工作的……谢谢解决方法:explode函数为给定数组或映射中的每个元素返回一个新行. 利用此函数的一种方法是使用udf为每行创...

python – 将数据帧保存到pyspark中本地驱动器上的JSON文件【代码】

我有一个数据帧,我试图使用pyspark 1.4保存为JSON文件,但它似乎没有工作.当我给它指向目录的路径时,它返回一个错误,表明它已经存在.我基于documentation的假设是它会在你给它的路径中保存一个json文件.df.write.json("C:\Users\username")指定一个带有名称的目录不会产生任何文件,并给出错误“java.io.IOException:Mkdirs无法创建文件:/ C:Users / username / test / _temporary / …. etc.创建名称test的目录,其中包含几个带有...

在AWS EMR v4.0.0上使用Pyspark配置Ipython / Jupyter笔记本【代码】

我正在尝试使用Apache Spark 1.4.0的IPython笔记本.我按照下面的2教程设置了我的配置 Installing Ipython notebook with pyspark 1.4 on AWS 和 Configuring IPython notebook support for Pyspark fisnish配置后,以下是相关文件中的几个代码: 1.ipython_notebook_config.pyc=get_config() c.NotebookApp.ip = '*' c.NotebookApp.open_browser =False c.NotebookApp.port = 81932.00-pyspark-setup.pyimport os import sys spark_...

python – 访问PySpark中的count列【代码】

code:mydf = testDF.groupBy(testDF.word).count() mydf.show()output:+-----------+-----+ | word|count| +-----------+-----+ | she| 2208| | mothers| 93| | poet| 59| | moving| 18| | active| 6| | foot| 169|我想根据字数按降序排序这个数据框.code:countDF = mydf.orderBy(mydf.count.desc()) countDF.show()Error:AttributeError: 'function' object has no attribute 'des...

python – 如何合并pyspark和pandas数据帧【代码】

我有一个非常大的pyspark数据帧和一个较小的pandas数据帧,我读如下:df1 = spark.read.csv("/user/me/data1/") df2 = pd.read_csv("data2.csv")两个数据帧都包括标记为“A”和“B”的列.我想创建另一个pyspark数据帧,只包含来自df1的那些行,其中“A”和“B”列中的条目出现在df2中具有相同名称的那些列中.即使用df2的列“A”和“B”过滤df1.Normally I think this would be a join (implemented with merge) buthow do you join a ...

python – 使用pyspark脚本从bigquery加载表到spark集群【代码】

我有一个在bigquery中加载的数据表,我想通过pyspark .py文件在我的spark集群中导入它. 我在Dataproc + BigQuery examples – any available?中看到有一种方法可以使用scala在spark集群中加载一个bigquery表,但有没有办法在pyspark脚本中执行它?解决方法:这来自@MattJ在this question.这是一个连接到Spark中的BigQuery并执行字数统计的示例.import json import pyspark sc = pyspark.SparkContext()hadoopConf=sc._jsc.hadoopConfi...

python – 将GraphFrames ShortestPath Map转换为PySpark中的DataFrame行【代码】

我试图找到最有效的方法从GraphFrames函数shortestPaths获取Map输出,并将每个顶点的距离映射平铺为新DataFrame中的各个行.通过将距离列拉入字典然后从那里转换为pandas数据帧然后转换回Spark数据帧,我已经能够非常笨拙地做到这一点,但我知道必须有更好的方法.from graphframes import *v = sqlContext.createDataFrame([("a", "Alice", 34),("b", "Bob", 36),("c", "Charlie", 30), ], ["id", "name", "age"])# Create an Edge Dat...

python – 什么导致’unicode’对象在pyspark中没有属性’toordinal’?【代码】

我得到了这个错误,但我没有导致它.我的python代码在pyspark中运行.堆栈跟踪很长,我只展示其中的一些.所有的堆栈跟踪都没有显示我的代码,所以我不知道在哪里寻找.导致此错误的原因是什么?/usr/hdp/2.4.2.0-258/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)306 raise Py4JJavaError(307 "An error occurred while calli...

python – PySpark中具有多个列的日期算法【代码】

我正在尝试使用PySpark数据框中的多个列进行一些中等复杂的日期算术.基本上,我有一个名为number的列,表示我需要过滤的created_at时间戳之后的周数.在PostgreSQL中你可以乘以interval based on the value in a column,但我似乎无法使用SQL API或Python API弄清楚如何在PySpark中执行此操作.这里的任何帮助将不胜感激!import datetime from pyspark.sql import SQLContext from pyspark.sql import Row from pyspark import SparkCo...

python – pyspark将两个rdd合并在一起【代码】

我有两个rdd,它们都是groupby的结果,看起来像:[(u'1', [u'0']), (u'3', [u'1']), (u'2', [u'0']), (u'4', [u'1'])]和[(u'1', [u'3', u'4']), (u'0', [u'1', u'2'])]如何合并这两个并获得以下内容:[(u'1', [u'0',u'3', u'4']]), (u'3', [u'1']), (u'2', [u'0']), (u'4', [u'1']),(u'0', [u'1', u'2'])]我尝试了join命令,但是这并没有给我我想要的结果.任何帮助深表感谢.解决方法:我解决了它:rdd2.union(rdd1).reduceByKey(lambda...

python – ARRAY_CONTAINS在pyspark中的多个值【代码】

我正在使用pyspark.sql.dataframe.DataFrame.我想基于多个变量而不是单个变量{val}来过滤堆栈的行.我正在使用Python 2 Jupyter笔记本.目前,我做了以下事情:stack = hiveContext.sql("""SELECT * FROM db.tableWHERE col_1 != '' """)stack.show() +---+-------+-------+---------+ | id| col_1 | . . . | list | +---+-------+-------+---------+ | 1 | 524 | . . . |[1, 2] | | 2 | 765 | . . . |[2, 3] | . . . | 9 ...

python – Pyspark Dataframe Imputations – 根据指定条件用Column Mean替换未知和缺失值【代码】

给定Spark数据帧,我想基于该列的非缺失值和非未知值计算列平均值.然后我想采取这个意思并用它来代替列的缺失&未知的价值. 例如,假设我正在使用: >名为df的数据帧,其中每个记录代表一个个体,所有列都是整数或数字>名为年龄的列(每个记录的年龄)>名为missing_age的列(如果该个人没有年龄,则等于1,否则为0)>名为unknown_age的列(如果该个人的年龄未知,则等于1,否则为0) 然后我可以计算出这个均值,如下所示.calc_mean = df.where((col...

python spark pyspark——回归预测习题整理【代码】【图】

特征量选区:age,enducation,race,sex。目标值:income。 from pyspark.mllib.linalg import Vectors,Vector from pyspark import SparkContext from pyspark.ml.regression import LinearRegression from pyspark.ml.feature import VectorAssembler from pyspark.python.pyspark.shell import spark from pyspark.ml.feature import StringIndexer from pyspark.sql.types import * from pyspark.sql.functions import *#1.读...

集群上如何跑pyspark程序--Running Spark Python Applications【代码】

Running Spark Python Applications Accessing Spark with Java and Scala offers many advantages: platform independence by running inside the JVM, self-contained packaging of code and its dependencies into JAR files, and higher performance because Spark itself runs in the JVM. You lose these advantages when using the Spark Python API. Managing dependencies and making them available for Python jobs on ...

替换 - 相关标签