【python-找不到Pyspark模块】教程文章相关的互联网学习教程文章

python-如何使用来自另一个数据框的新值更新pyspark数据框?【代码】

我有两个Spark数据框: 数据框A:|col_1 | col_2 | ... | col_n | |val_1 | val_2 | ... | val_n |和数据框B:|col_1 | col_2 | ... | col_m | |val_1 | val_2 | ... | val_m |数据框B可以包含来自数据框A的重复行,更新行和新行.我想在spark中编写操作,在其中可以创建一个新数据框,其中包含数据框A的行以及数据框B的更新行和新行. 我从创建仅包含不可更新列的哈希列开始.这是唯一的ID.因此,假设col1和col2可以更改值(可以更新),但是...

python – 在pyspark中指定多个列数据类型更改为不同的数据类型【代码】

我有一个DataFrame(df),它包含50多列和不同类型的数据类型,例如df3.printSchema()CtpJobId: string (nullable = true)|-- TransformJobStateId: string (nullable = true)|-- LastError: string (nullable = true)|-- PriorityDate: string (nullable = true)|-- QueuedTime: string (nullable = true)|-- AccurateAsOf: string (nullable = true)|-- SentToDevice: string (nullable = true)|-- StartedAtDevice: string (nullabl...

python – PySpark,通过JSON文件导入模式【代码】

tbschema.json看起来像这样:[{"TICKET":"integer","TRANFERRED":"string","ACCOUNT":"STRING"}]我使用以下代码加载它>>> df2 = sqlContext.jsonFile("tbschema.json") >>> f2.schema StructType(List(StructField(ACCOUNT,StringType,true),StructField(TICKET,StringType,true),StructField(TRANFERRED,StringType,true))) >>> df2.printSchema() root|-- ACCOUNT: string (nullable = true)|-- TICKET: string (nullable = true...

python – PySpark – 将列表作为参数传递给UDF【代码】

我需要将列表传递给UDF,列表将确定距离的分数/类别.就目前而言,我很难将所有距离编码为第4分.a= spark.createDataFrame([("A", 20), ("B", 30), ("D", 80)],["Letter", "distances"])from pyspark.sql.functions import udf def cate(label, feature_list):if feature_list == 0:return label[4] label_list = ["Great", "Good", "OK", "Please Move", "Dead"] udf_score=udf(cate, StringType()) a.withColumn("category", udf_sc...

python – pyspark错误:AttributeError:’SparkSession’对象没有属性’parallelize’【代码】

我在Jupyter笔记本上使用pyspark.以下是Spark设置的方式:import findspark findspark.init(spark_home='/home/edamame/spark/spark-2.0.0-bin-spark-2.0.0-bin-hadoop2.6-hive', python_path='python2.7')import pysparkfrom pyspark.sql import *sc = pyspark.sql.SparkSession.builder.master("yarn-client").config("spark.executor.memory", "2g").config('spark.driver.memory', '1g').config('spark.driver.cores', '4').en...

python – PySpark:使用过滤函数后取一列的平均值【代码】

我使用以下代码来获得薪水大于某个阈值的人的平均年龄.dataframe.filter(df['salary'] > 100000).agg({"avg": "age"})列的年龄是数字(浮点数),但我仍然收到此错误.py4j.protocol.Py4JJavaError: An error occurred while calling o86.agg. : scala.MatchError: age (of class java.lang.String)您是否知道在不使用groupBy函数和SQL查询的情况下获得avg等的任何其他方法.解决方法:聚合函数应该是值,列名称是键:dataframe.filter(d...

python – PySpark用其他列中的值替换列中的null【代码】

我想用一个相邻列中的值替换一列中的空值,例如,如果我有A|B 0,1 2,null 3,null 4,2我希望它是:A|B 0,1 2,2 3,3 4,2试过df.na.fill(df.A,"B")但是没有用,它说值应该是float,int,long,string或dict 有任何想法吗?解决方法:最后找到了另一种选择:df.withColumn("B",coalesce(df.B,df.A))

python – PySpark DataFrame上的Sum运算在type为fine时给出TypeError【代码】

我在PySpark中有这样的DataFrame(这是take(3)的结果,数据帧非常大):sc = SparkContext() df = [Row(owner=u'u1', a_d=0.1), Row(owner=u'u2', a_d=0.0), Row(owner=u'u1', a_d=0.3)]同一所有者将拥有更多行.我需要做的是在分组之后将每个所有者的字段a_d的值相加为b = df.groupBy('owner').agg(sum('a_d').alias('a_d_sum'))但这会引发错误TypeError: unsupported operand type(s) for +: ‘int’ and ‘str’但是,架构包含双精度...

python – PySpark:StructField(…,…,False)总是返回`nullable = true`而不是`nullable = false`【代码】

我是PySpark的新手,面临一个奇怪的问题.我正在尝试在加载CSV数据集时将某些列设置为不可为空.我可以用一个非常小的数据集(test.csv)重现我的情况:col1,col2,col3 11,12,13 21,22,23 31,32,33 41,42,43 51,,53在第5行第2列有一个空值,我不想在我的DF中获得该行.我将所有字段设置为非可空(nullable = false)但我得到一个模式,其中所有三列都具有nullable = true.即使我将所有三列都设置为不可为空,也会发生这种情况!我正在运行最新...

python – PySpark:TypeError:condition应该是string或Column【代码】

我试图过滤基于如下的RDD:spark_df = sc.createDataFrame(pandas_df) spark_df.filter(lambda r: str(r['target']).startswith('good')) spark_df.take(5)但是得到了以下错误:TypeErrorTraceback (most recent call last) <ipython-input-8-86cfb363dd8b> in <module>()1 spark_df = sc.createDataFrame(pandas_df) ----> 2 spark_df.filter(lambda r: str(r['target']).startswith('good'))3 spark_df.take(5)/usr/local/spark-...

python – Pyspark从日期到字符串更改列的类型【代码】

我有以下数据帧:corr_temp_df [('vacationdate', 'date'),('valueE', 'string'),('valueD', 'string'),('valueC', 'string'),('valueB', 'string'),('valueA', 'string')]现在我想将列vacationdate的数据类型更改为String,这样数据帧也会采用这种新类型并覆盖所有条目的数据类型数据.例如.写完后:corr_temp_df.dtypes应该覆盖vacationdate的数据类型. 我已经使用了诸如cast,StringType或astype之类的函数,但我没有成功.你知道怎么...

python – pyspark解析固定宽度的文本文件【代码】

试图解析固定宽度的文本文件. 我的文本文件如下所示,我需要一个行id,日期,字符串和整数:00101292017you1234 00201302017 me5678我可以使用sc.textFile(path)将文本文件读取到RDD.我可以使用解析的RDD和模式createDataFrame.这是在这两个步骤之间的解析.解决方法:Spark的substr功能可以处理固定宽度的列,例如:df = spark.read.text("/tmp/sample.txt") df.select(df.value.substr(1,3).alias('id'),df.value.substr(4,8).alias('d...

python – PySpark.将Dataframe传递给pandas_udf并返回一个系列【代码】

我正在使用PySpark的新pandas_udf装饰器,我试图让它将多列作为输入并返回一个系列作为输入,但是,我得到一个TypeError:无效的参数 示例代码@pandas_udf(df.schema, PandasUDFType.SCALAR) def fun_function(df_in):df_in.loc[df_in['a'] < 0] = 0.0return (df_in['a'] - df_in['b']) / df_in['c']解决方法:A SCALAR udf期望pandas系列作为输入而不是数据帧.对于您的情况,没有必要使用udf.剪切后列a,b,c的直接计算应该起作用:impor...

python – Pyspark RDD ReduceByKey多功能【代码】

我有一个名为DF的PySpark DataFrame,带有(K,V)对.我想用ReduceByKey应用多个函数.例如,我有以下三个简单的功能:def sumFunc(a,b): return a+bdef maxFunc(a,b): return max(a,b)def minFunc(a,b): return min(a,b)当我只应用一个函数时,例如,以下三个函数:DF.reduceByKey(sumFunc) #works DF.reduceByKey(maxFunc) #works DF.reduceByKey(minFunc) #works但是,当我应用多个功能时,它不起作用,例如,以下操作不起作用.DF.reduce...

python – PySpark使用dict创建新列【代码】

使用Spark 1.6,我有一个Spark DataFrame列(名为let,比如col1),其值为A,B,C,DS,DNS,E,F,G和H,我想用值创建一个新列(比如col2)从下面的词典中,我该如何映射? (所以f.i.’A’需要映射到’S’等……)dict = {'A': 'S', 'B': 'S', 'C': 'S', 'DS': 'S', 'DNS': 'S', 'E': 'NS', 'F': 'NS', 'G': 'NS', 'H': 'NS'}解决方法:使用UDF的低效解决方案(版本无关):from pyspark.sql.types import StringType from pyspark.sql.functions impo...