/**
* 创建dataframe
*/
public class DataFrameCreate {public static void main (String[] args){SparkConf conf = new SparkConf().setAppName("DataFrameCreate").setMaster("local");JavaSparkContext sc = new JavaSparkContext(conf);SQLContext sqlContext = new SQLContext(sc);sqlContext.read().json("hdfs://spark1:9000/test.json").show();}
}//=======================分隔符=====================================...
我正在使用Spark 1.6.1
我有一个DataFrame,我需要对其进行遍历并将每一行写入Kafka.截至目前,我正在执行以下操作:Producer<String><String> message;
for(Row x: my_df.collect()){kafka_message = new Producer<String><String>(topic, String.valueOf(x))my_kafka_producer.send(kafka_message);
}这里的问题是,收集将数据发送到驱动程序,然后推送到kafka.鉴于我大约有250位执行者,我的1个驱动程序无法有效处理工作量.因此,我想...
我正在用Java编写Spark应用程序,该应用程序读取HiveTable并将输出以Json格式存储在HDFS中.
我使用HiveContext读取了蜂巢表,它返回了DataFrame.下面是代码片段.SparkConf conf = new SparkConf().setAppName("App");JavaSparkContext sc = new JavaSparkContext(conf);HiveContext hiveContext = new org.apache.spark.sql.hive.HiveContext(sc);DataFrame data1= hiveContext.sql("select * from tableName")现在我想将DataFrame转...
我是新手,我想要使用group-by& reduce从CSV中找到以下内容(使用一行):Department, Designation, costToCompany, StateSales, Trainee, 12000, UPSales, Lead, 32000, APSales, Lead, 32000, LASales, Lead, 32000, TNSales, Lead, 32000, APSales, Lead, 32000, TN Sales, Lead, 32000, LASales, Lead, 32000, LAMarketing, Associate, 18000, TNMarketing, Associate, 18000, TNHR, Manager, 58000, TN我想通过Department,Design...
我有一个json数据文件,其中包含一个属性[creationDate],它是“long”数字类型的unix epoc. Apache Spark DataFrame架构如下所示:root |-- creationDate: long (nullable = true) |-- id: long (nullable = true) |-- postTypeId: long (nullable = true)|-- tags: array (nullable = true)| |-- element: string (containsNull = true)|-- title: string (nullable = true)|-- viewCount: long (nullable = true)我想做一些gro...
描述here(零点323)的解决方案非常接近我想要的两个曲折:
>我如何用Java做到这一点?>如果列具有字符串列表而不是单个字符串,并且我想在GroupBy(其他列)之后将所有这些列表收集到单个列表中,该怎么办?
我正在使用Spark 1.6并尝试使用
org.apache.spark.sql.functions.collect_list(Column col),如该问题的解决方案中所述,但得到以下错误Exception in thread “main” org.apache.spark.sql.AnalysisException: undefined function...
1.问题描述
RDD转换为DataFrame,运行命令:val spark=SparkSession.builder().appName("RDD2DataFrameSpark").master("local[2]").getOrCreate()//RDD==>DataFrameval rdd= spark.sparkContext.textFile("datas/info.txt")// For implicit conversions from RDDs to DataFramesimport spark.implicits._val infoDF=rdd.map( _.split(",")).map(line=>Info(line(0).toInt,line(1),line(2).toInt)).toDF()infoDF.show()
报错:18/10...
在Spark SQL中,当我尝试在DataFrame上使用map函数时,我遇到了错误.
DataFrame类型中的方法映射(Function1,ClassTag)不适用于参数(new Function(){})
我也在关注spark 1.3文档. https://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection有任何解决方案吗?
这是我的测试代码.// SQL can be run over RDDs that have been registered as tables.
DataFrame teenagers = sqlContext.s...
> DataFrame a =包含列x,y,z,k> DataFrame b =包含列x,y,aa.join(b,<condition to use in java to use x,y >) ??? 我试过用a.join(b,a.col("x").equalTo(b.col("x")) && a.col("y").equalTo(b.col("y"),"inner")但Java正在抛出错误说&&不被允许.解决方法:Spark SQL在Column上提供了一组标记为java_expr_ops的方法,这些方法专为Java互操作性而设计.它包括and(参见or)方法,可在此处使用:a.col("x").equalTo(b.col("x")).and(a.col("...