python – 重命名spark数据帧中的嵌套字段
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了python – 重命名spark数据帧中的嵌套字段,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含3520字,纯文字阅读大概需要6分钟。
内容图文
![python – 重命名spark数据帧中的嵌套字段](/upload/InfoBanner/zyjiaocheng/714/b86bf58109574f0d80e25fffed6c7ff5.jpg)
在Spark中有一个dataframe df:
|-- array_field: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a: string (nullable = true)
| | |-- b: long (nullable = true)
| | |-- c: long (nullable = true)
如何将字段array_field.a重命名为array_field.a_renamed?
[更新]:
.withColumnRenamed()不适用于嵌套字段,所以我尝试了这个hacky和不安全的方法:
# First alter the schema:
schema = df.schema
schema['array_field'].dataType.elementType['a'].name = 'a_renamed'
ind = schema['array_field'].dataType.elementType.names.index('a')
schema['array_field'].dataType.elementType.names[ind] = 'a_renamed'
# Then set dataframe's schema with altered schema
df._schema = schema
我知道设置私有属性不是一个好习惯,但我不知道为df设置架构的其他方法
我认为我在正确的轨道上但是df.printSchema()仍然显示了array_field.a的旧名称,尽管df.schema == schema为True
解决方法:
Python
无法修改单个嵌套字段.您必须重新创建整个结构.在这种特殊情况下,最简单的解决方案是使用演员表.
首先是一堆进口:
from collections import namedtuple
from pyspark.sql.functions import col
from pyspark.sql.types import (
ArrayType, LongType, StringType, StructField, StructType)
和示例数据:
Record = namedtuple("Record", ["a", "b", "c"])
df = sc.parallelize([([Record("foo", 1, 3)], )]).toDF(["array_field"])
让我们确认架构与您的情况相同:
df.printSchema()
root
|-- array_field: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a: string (nullable = true)
| | |-- b: long (nullable = true)
| | |-- c: long (nullable = true)
您可以将新模式定义为字符串:
str_schema = "array<struct<a_renamed:string,b:bigint,c:bigint>>"
df.select(col("array_field").cast(str_schema)).printSchema()
root
|-- array_field: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a_renamed: string (nullable = true)
| | |-- b: long (nullable = true)
| | |-- c: long (nullable = true)
或数据类型:
struct_schema = ArrayType(StructType([
StructField("a_renamed", StringType()),
StructField("b", LongType()),
StructField("c", LongType())
]))
df.select(col("array_field").cast(struct_schema)).printSchema()
root
|-- array_field: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a_renamed: string (nullable = true)
| | |-- b: long (nullable = true)
| | |-- c: long (nullable = true)
斯卡拉
Scala中可以使用相同的技术:
case class Record(a: String, b: Long, c: Long)
val df = Seq(Tuple1(Seq(Record("foo", 1, 3)))).toDF("array_field")
val strSchema = "array<struct<a_renamed:string,b:bigint,c:bigint>>"
df.select($"array_field".cast(strSchema))
要么
import org.apache.spark.sql.types._
val structSchema = ArrayType(StructType(Seq(
StructField("a_renamed", StringType),
StructField("b", LongType),
StructField("c", LongType)
)))
df.select($"array_field".cast(structSchema))
可能的改进:
如果您使用富有表现力的数据操作或JSON处理库,则可以更容易地将数据类型转储到dict或JSON字符串,并从那里获取它(例如,Python / toolz):
from toolz.curried import pipe, assoc_in, update_in, map
from operator import attrgetter
# Update name to "a_updated" if name is "a"
rename_field = update_in(
keys=["name"], func=lambda x: "a_updated" if x == "a" else x)
updated_schema = pipe(
# Get schema of the field as a dict
df.schema["array_field"].jsonValue(),
# Update fields with rename
update_in(
keys=["type", "elementType", "fields"],
func=lambda x: pipe(x, map(rename_field), list)),
# Load schema from dict
StructField.fromJson,
# Get data type
attrgetter("dataType"))
df.select(col("array_field").cast(updated_schema)).printSchema()
内容总结
以上是互联网集市为您收集整理的python – 重命名spark数据帧中的嵌套字段全部内容,希望文章能够帮你解决python – 重命名spark数据帧中的嵌套字段所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。