【python-PySpark广播变量加入】教程文章相关的互联网学习教程文章

python-PySpark数字窗口分组依据【代码】

我希望能够按步长设置Spark组,而不是单个值.有什么火花类似于PySpark 2.x的用于数字(非日期)值的窗口函数? 类似于以下内容:sqlContext = SQLContext(sc) df = sqlContext.createDataFrame([10, 11, 12, 13], "integer").toDF("foo") res = df.groupBy(window("foo", step=2, start=10)).count()解决方法:您可以重用时间戳一并以秒为单位表示参数.翻滚:from pyspark.sql.functions import col, windowdf.withColumn("window",win...

python-PySpark:如何判断数据框的列类型【代码】

假设我们有一个称为df的数据框.我知道有使用df.dtypes的方法.但是我喜欢类似的东西 type(123)== int#注意int不是字符串 我想知道是否有类似的东西: type(df.select(< column_name>).collect()[0] [1])== IntegerType 基本上,我想知道从数据帧直接获取IntegerType,StringType之类的对象,然后对其进行判断的方法. 谢谢!解决方法:TL; DR使用外部数据类型(普通Python类型)测试值,使用内部数据类型(DataType子类)测试模式. 首先-您不应...

python-PySpark广播变量加入【代码】

我正在执行联接,我的数据跨100多个节点.因此,我有一小段要与另一个键/值对加入的键/值. 我的清单如下所示:[[1, 0], [2, 0], [3, 0], [4, 0], [5, 0], [6, 0], [7, 0], [8, 0], [9, 0], [10, 0], [11, 0], [16, 0], [18, 0], [19, 0], [20, 0], [21, 0], [22, 0], [23, 0], [24, 0], [25, 0], [26, 0], [27, 0], [28, 0], [29, 0], [36, 0], [37, 0], [38, 0], [39, 0], [40, 0], [41, 0], [42, 0], [44, 0], [46, 0]]我有广播变量...

python-找不到Pyspark模块【代码】

我正在尝试在纱线中执行一个简单的Pyspark作业.这是代码:from pyspark import SparkConf, SparkContextconf = (SparkConf().setMaster("yarn-client").setAppName("HDFS Filter").set("spark.executor.memory", "1g")) sc = SparkContext(conf = conf)inputFile = sc.textFile("hdfs://myserver:9000/1436304078054.json.gz").cache() matchTerm = "spark" numMatches = inputFile.filter(lambda line: matchTerm in line).count(...

python-如何在pyspark的RDD上访问元组中的单个元素?【代码】

可以说我有一个RDD [(u’Some1′,(u’ABC’,9989)), (u’Some2′,(u’XYZ’,235)), (u’Some3′,(u’BBB’,5379)), (u’Some4′,(u’ABC’,5379))] 我正在使用map一次获取一个元组,但是如何访问元组的各个元素,例如查看元组是否包含某些字符.实际上,我想过滤掉那些包含某些字符的字符.这里包含ABC的元组 我试图做这样的事情,但没有帮助def foo(line):if(line[1]=="ABC"):return (line)new_data = data.map(foo)我也是火花和Python的...

python-指定实木复合地板属性pyspark【代码】

如何在PySpark中指定镶木地板块大小和页面大小?我到处搜索,但是找不到函数调用或导入库的任何文档.解决方法:根据spark-user archivessc.hadoopConfiguration.setInt("dfs.blocksize", some_value) sc.hadoopConfiguration.setInt("parquet.block.size", some_value)所以在PySparksc._jsc.hadoopConfiguration().setInt("dfs.blocksize", some_value) sc._jsc.hadoopConfiguration().setInt("parquet.block.size", some_value)

python-从PySpark中的几列从groupby获取具有最大值的行【代码】

我有一个类似于的数据框from pyspark.sql.functions import avg, firstrdd = sc.parallelize( [ (0, "A", 223,"201603", "PORT"), (0, "A", 22,"201602", "PORT"), (0, "A", 22,"201603", "PORT"), (0, "C", 22,"201605", "PORT"), (0, "D", 422,"201601", "DOCK"), (0, "D", 422,"201602", "DOCK"), (0, "C", 422,"201602", "DOCK"), (1,"B", 3213,"201602", "DOCK"), (1,"A", 3213,"201602", "DOCK"), (1,"C", 3213,"20...

python-如何在不使用RDD API的情况下摆脱pyspark数据帧中的行包装器对象?【代码】

我针对临时视图发布以下SQL语句cloudantdata.createOrReplaceTempView("washingflat")sqlDF = spark.sql("SELECT temperature FROM washingflat") sqlDF.rdd.map(lambda row : row.temperature).collect()我只是对普通的(展开的)整数值感兴趣.到目前为止,我使用dataframe API进行的所有尝试始终返回包装了我感兴趣的值的行对象. 有没有一种方法可以在不使用RDD API的情况下获取标量内容?解决方法:所以给定一个输入DataFrameimport...

python-PySpark groupby和最大值选择【代码】

我有一个PySpark数据框name city datesatya Mumbai 13/10/2016satya Pune 02/11/2016satya Mumbai 22/11/2016satya Pune 29/11/2016satya Delhi 30/11/2016panda Delhi 29/11/2016brata BBSR 28/11/2016brata Goa 30/10/2016brata Goa 30/10/2016我需要为每个名称找出最喜欢的CITY,逻辑是“如果在“名称”“城市”对上具有最大城市出现次数的城市,则将城市作为fav_city.如果发现多个相同的事...

python-将PySpark数据框列类型转换为字符串并替换方括号【代码】

我需要将PySpark df列类型从数组转换为字符串,还要删除方括号.这是数据框的架构.需要处理的列是CurrencyCode和TicketAmount>>> plan_queryDF.printSchema()root|-- event_type: string (nullable = true)|-- publishedDate: string (nullable = true)|-- plannedCustomerChoiceID: string (nullable = true)|-- assortedCustomerChoiceID: string (nullable = true)|-- CurrencyCode: array (nullable = true)| |-- element: st...

python-PySpark安装错误【代码】

我已按照包括this、this、this和this在内的各种博客文章中的说明在笔记本电脑上安装pyspark.但是,当我尝试从终端或jupyter笔记本电脑使用pyspark时,我一直收到以下错误. 我已经安装了所有必要的软件,如问题底部所示. 我已将以下内容添加到我的.bashrc中function sjupyter_init() { #Set anaconda3 as python export PATH=~/anaconda3/bin:$PATH#Spark path (based on your computer) SPARK_HOME=/opt/spark export PATH=$SPARK_HOM...

python-PySpark中的高效列处理【代码】

我有一个数据列,其中的列数非常多(> 30000). 我根据这样的第一列用1和0填充它:for column in list_of_column_names:df = df.withColumn(column, when(array_contains(df['list_column'], column), 1).otherwise(0))但是,此过程需要很多时间.有办法更有效地做到这一点吗?告诉我列处理可以并行化. 编辑: 样本输入数据+----------------+-----+-----+-----+ | list_column | Foo | Bar | Baz | +----------------+-----+-----+-...

python-pyspark:系统找不到指定的路径【代码】

我刚刚使用conda安装了pyspark 2.2.0(在Windows 7 64bit,java v1.8上使用python v3.6)$conda install pyspark它已下载并似乎正确安装,没有错误.现在,当我在命令行上运行pyspark时,它只是告诉我“系统找不到指定的路径”.$pyspark The system cannot find the path specified. The system cannot find the path specified.我尝试在PATH环境变量中包含pyspark路径目录,但这似乎仍然无效,但是也许我输入的路径错误?任何人都可以请指教...

python-读取pySpark中的文件范围【代码】

我需要在pySpark中读取连续文件.以下对我有用.from pyspark.sql import SQLContext file = "events.parquet/exportDay=2015090[1-7]" df = sqlContext.read.load(file)我如何读取文件8-14?解决方法:使用花括号. file =“ events.parquet / exportDay = 201509 {08,09,10,11,12,13,14}” 这是一个关于堆栈溢出的类似问题:Pyspark select subset of files using regex glob.他们建议要么使用大括号,要么执行多次读取,然后合并对...

python-用同一列的平均值填充Pyspark数据框列的空值【代码】

有了这样的数据框rdd_2 = sc.parallelize([(0,10,223,"201601"), (0,10,83,"2016032"),(1,20,None,"201602"),(1,20,3003,"201601"), (1,20,None,"201603"), (2,40, 2321,"201601"), (2,30, 10,"201602"),(2,61, None,"201601")])df_data = sqlContext.createDataFrame(rdd_2, ["id", "type", "cost", "date"]) df_data.show()+---+----+----+-------+ | id|type|cost| date| +---+----+----+-------+ | 0| 10| 223| 201601| | ...