如何在Java / Kotlin中创建一个返回复杂类型的Spark UDF?
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了如何在Java / Kotlin中创建一个返回复杂类型的Spark UDF?,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含6317字,纯文字阅读大概需要10分钟。
内容图文
![如何在Java / Kotlin中创建一个返回复杂类型的Spark UDF?](/upload/InfoBanner/zyjiaocheng/708/58d6e6d3326b48e5a3f11876f6d4c59b.jpg)
我正在尝试编写一个返回复杂类型的UDF:
private val toPrice = UDF1<String, Map<String, String>> { s ->
val elements = s.split(" ")
mapOf("value" to elements[0], "currency" to elements[1])
}
val type = DataTypes.createStructType(listOf(
DataTypes.createStructField("value", DataTypes.StringType, false),
DataTypes.createStructField("currency", DataTypes.StringType, false)))
df.sqlContext().udf().register("toPrice", toPrice, type)
但是每次我用这个:
df = df.withColumn("price", callUDF("toPrice", col("price")))
我得到一个神秘的错误:
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$28: (string) => struct<value:string,currency:string>)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: scala.MatchError: {value=138.0, currency=USD} (of class java.util.LinkedHashMap)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:236)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:231)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:379)
... 19 more
我尝试使用自定义数据类型:
class Price(val value: Double, val currency: String) : Serializable
使用返回该类型的UDF:
private val toPrice = UDF1<String, Price> { s ->
val elements = s.split(" ")
Price(elements[0].toDouble(), elements[1])
}
但后来我得到另一个MatchError,它抱怨价格类型.
如何正确编写可以返回复杂类型的UDF?
解决方法:
TL; DR该函数应返回类org.apache.spark.sql.Row的对象.
Spark提供了UDF定义的两个主要变体.
>使用Scala反射的udf变体:
> def udf [RT](f :()?RT)(隐式arg0:TypeTag [RT]):UserDefinedFunction
> def udf [RT,A1](f:(A1)?RT)(隐式arg0:TypeTag [RT],arg1:TypeTag [A1]):UserDefinedFunction
> ……
> def udf [RT,A1,A2,…,A10](f:(A1,A2,…,A10)?RT)(隐式arg0:TypeTag [RT],arg1:TypeTag [A1],arg2 :TypeTag [A2],…,arg10:TypeTag [A10])
哪个定义
Scala closure of … arguments as user-defined function (UDF). The data types are automatically inferred based on the Scala closure’s signature.
这些变体在没有原子或代数数据类型的模式的情况下使用.例如,有问题的函数将在Scala中定义:
case class Price(value: Double, currency: String)
val df = Seq("1 USD").toDF("price")
val toPrice = udf((s: String) => scala.util.Try {
s split(" ") match {
case Array(price, currency) => Price(price.toDouble, currency)
}
}.toOption)
df.select(toPrice($"price")).show
// +----------+
// |UDF(price)|
// +----------+
// |[1.0, USD]|
// +----------+
在此变体中,返回类型是自动编码的.
由于它依赖于反射,因此该变体主要用于Scala用户.
>提供模式定义的udf变体(您在此处使用的变体).此变体的返回类型应与数据集[Row]的返回类型相同:
>正如在另一个答案中指出的那样,您只能使用SQL types mapping table中列出的类型(原子类型为盒装或未装箱,java.sql.Timestamp / java.sql.Date,以及高级集合).
>使用org.apache.spark.sql.Row表示复杂结构(结构/结构类型).不允许与代数数据类型或等效数据混合.例如(Scala代码)
struct<_1:int,_2:struct<_1:string,_2:struct<_1:double,_2:int>>>
应该表达为
Row(1, Row("foo", Row(-1.0, 42))))
不
(1, ("foo", (-1.0, 42))))
或任何混合变体,如
Row(1, Row("foo", (-1.0, 42))))
提供此变体主要是为了确保Java互操作性.
在这种情况下(相当于有问题的那个),定义应类似于以下定义:
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.Row
val schema = StructType(Seq(
StructField("value", DoubleType, false),
StructField("currency", StringType, false)
))
val toPrice = udf((s: String) => scala.util.Try {
s split(" ") match {
case Array(price, currency) => Row(price.toDouble, currency)
}
}.getOrElse(null), schema)
df.select(toPrice($"price")).show
// +----------+
// |UDF(price)|
// +----------+
// |[1.0, USD]|
// | null|
// +----------+
排除异常处理的所有细微差别(通常UDF应该控制空输入,按照惯例优雅地处理格式错误的数据)Java等效应该看起来或多或少像这样:
UserDefinedFunction price = udf((String s) -> {
String[] split = s.split(" ");
return RowFactory.create(Double.parseDouble(split[0]), split[1]);
}, DataTypes.createStructType(new StructField[]{
DataTypes.createStructField("value", DataTypes.DoubleType, true),
DataTypes.createStructField("currency", DataTypes.StringType, true)
}));
语境:
为了给你一些上下文,这种区别也反映在API的其他部分.例如,您可以从架构和一系列行创建DataFrame:
def createDataFrame(rows: List[Row], schema: StructType): DataFrame
或使用一系列产品的反射
def createDataFrame[A <: Product](data: Seq[A])(implicit arg0: TypeTag[A]): DataFrame
但不支持混合变体.
换句话说,您应该提供可以使用RowEncoder编码的输入.
当然你通常不会使用udf来执行这样的任务:
import org.apache.spark.sql.functions._
df.withColumn("price", struct(
split($"price", " ")(0).cast("double").alias("price"),
split($"price", " ")(1).alias("currency")
))
有关:
> Creating a SparkSQL UDF in Java outside of SQLContext
内容总结
以上是互联网集市为您收集整理的如何在Java / Kotlin中创建一个返回复杂类型的Spark UDF?全部内容,希望文章能够帮你解决如何在Java / Kotlin中创建一个返回复杂类型的Spark UDF?所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。