scala spark and dataframe example
Following up on the previous PySpark post, here is a Scala example I wrote. Its goal is to pick a few events of interest out of the client-side tracking events and keep per-region running counts, computed in real time from a Kafka stream.
One thing worth stressing: versions matter a great deal here. Watch both the Spark version (<2, 2.0, >2.0) and the Scala version (mainly <2.11 vs. 2.11), and make sure they match.
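A quick sanity check is to print the runtime versions and compare them against the coordinates pinned in the pom (a minimal sketch; `spark` is the `SparkSession` built in the job below):

```scala
// Both must line up with the _2.11 / 2.2.0 artifact coordinates in pom.xml
println(spark.version)                        // e.g. 2.2.0
println(scala.util.Properties.versionString)  // e.g. version 2.11.8
```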
pom.xml
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>statLiveSuccessRate</groupId>
    <artifactId>statLiveSuccessRate</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.compat.version>2.11</scala.compat.version>
        <spark.version>2.2.0</spark.version>
        <scala.version>2.11.8</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.1</version>
        </dependency>
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>2.9.7</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.6.2</version>
        </dependency>
        <dependency>
            <groupId>com.jayway.jsonpath</groupId>
            <artifactId>json-path</artifactId>
            <version>2.2.0</version>
        </dependency>
        <!-- Unused here; kept for reference (Redshift/S3/RabbitMQ/Redis variants of this job):
        <dependency>
            <groupId>com.amazon.redshift</groupId>
            <artifactId>redshift-jdbc4</artifactId>
            <version>1.2.1.1001</version>
        </dependency>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-s3</artifactId>
            <version>1.11.91</version>
        </dependency>
        <dependency>
            <groupId>com.databricks</groupId>
            <artifactId>spark-redshift_2.11</artifactId>
            <version>3.0.0-preview1</version>
        </dependency>
        <dependency>
            <groupId>com.twitter</groupId>
            <artifactId>algebird-core_2.11</artifactId>
            <version>0.12.4</version>
        </dependency>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.2</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>
        -->
        <dependency>
            <groupId>org.scalatest</groupId>
            <artifactId>scalatest_2.11</artifactId>
            <version>3.0.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <!-- Scala source directories to compile -->
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.1.0</version>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <filters>
                        <filter>
                            <artifact>*:*</artifact>
                            <excludes>
                                <exclude>META-INF/*.SF</exclude>
                                <exclude>META-INF/*.DSA</exclude>
                                <exclude>META-INF/*.RSA</exclude>
                            </excludes>
                        </filter>
                    </filters>
                    <finalName>statsLive</finalName>
                    <transformers>
                        <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                            <resource>reference.conf</resource>
                        </transformer>
                        <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                            <mainClass>com.....di.live_and_watch</mainClass><!-- main class -->
                        </transformer>
                    </transformers>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.7</version>
                <configuration>
                    <skipTests>true</skipTests>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.scalatest</groupId>
                <artifactId>scalatest-maven-plugin</artifactId>
                <version>1.0</version>
                <executions>
                    <execution>
                        <id>test</id>
                        <goals>
                            <goal>test</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>redshift</id>
            <url>http://redshift-maven-repository.s3-website-us-east-1.amazonaws.com/release</url>
        </repository>
    </repositories>
</project>
```
scala
```scala
package com....di

import java.util.Properties

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Minutes, StreamingContext}

/**
 * Author: Elson
 * Since: 2017.10.3
 * Desc: https://...
 */

class live_and_watch {}

object live_and_watch {
  def main(args: Array[String]): Unit = {
    // TODO add logger
    // all variables
    val sparkMem = "1g"
    val kafkaBootstrap = "...:9092"
    var monitordb = "..."
    var windows_minutes = 5

    val spark = SparkSession.builder().master("local[*]").appName("live_and_watch_success_or_fail")
      .config("spark.driver.memory", sparkMem)
      // for redshift
      // .config("fs.s3.awsAccessKeyId", "...")
      // .config("fs.s3.awsSecretAccessKey", "...")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    // for redshift
    // spark.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "...")
    // spark.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "...")

    // JDBC connection properties for the MySQL sink
    val myprop = new Properties
    myprop.setProperty("driver", "com.mysql.jdbc.Driver")
    myprop.setProperty("user", "...")
    myprop.setProperty("password", "...")

    // One micro-batch every `windows_minutes` minutes
    val ssc = new StreamingContext(spark.sparkContext, Minutes(windows_minutes))
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> kafkaBootstrap,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "live_and_watch_success_or_fail",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val schemaStrings = "k v ts ua e_ts d_ts s_ts zone"
    val fields = schemaStrings.split(" ").map(fieldname => StructField(fieldname, StringType, nullable = true))
    val schema = StructType(fields)
    val topic1 = Array("loop.event")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topic1, kafkaParams)
    )
    // stream.map(record => (record.key, record.value))
    stream.map(v => v.value)
      .filter(k => k.contains("bdc.start") || k.contains("viewer.pull.stream.succes") || k.contains("go.live.failed") || k.contains("live.watch.failed"))
      /*.window(Minutes(5), Minutes(5))*/
      .foreachRDD { rdd =>
        val now = System.currentTimeMillis() / 1000
        // Parse each JSON record against the schema and register it for SQL
        spark.read.schema(schema).json(rdd).createOrReplaceTempView("events")
        /* val row = spark.sql(s"select case when k not like '-*' then split(k, '-')[3] else split(k, '-')[4] end as event" +
           s",zone,count(*) cnt,$now as ts from events group by event,zone") */
        val row = spark.sql(s"select case when k not like '-%' then split(regexp_replace(k,'--','-'), '-')[3] else split(regexp_replace(k,'--','-'), '-')[4] end as event,zone,count(1) cnt,$now as ts from events group by event,zone")
        // row.show(10, truncate = false)
        // rdd.foreach(println)
        row.coalesce(1).write
          .mode(SaveMode.Append)
          .jdbc(
            s"jdbc:mysql://$monitordb/loops_monitor?useUnicode=true&characterEncoding=UTF-8",
            "live_success_rate_stats",
            myprop
          )
      }
    ssc.start()
    ssc.awaitTermination()
  }
}

/*
+---------+----+---+-------------+
|event    |zone|cnt|ts           |
+---------+----+---+-------------+
|bdc.start|sa  |1  |1507013520857|
+---------+----+---+-------------+
*/
```
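To make the CASE/`split` logic above concrete: `k` is a dash-delimited string whose event name sits at index 3, or at index 4 when the key starts with a leading dash (which leaves an empty first element after splitting), with `regexp_replace(k,'--','-')` collapsing doubled dashes first so the indices stay stable. A small sketch with made-up keys, since the real key format isn't shown in the post:

```scala
// Hypothetical keys; the real format is project-specific.
val k1 = "app-android-1.2.3-go.live.failed"
println(k1.replace("--", "-").split("-")(3))  // go.live.failed

val k2 = "-app-android-1.2.3-go.live.failed"  // leading dash => empty element at index 0
println(k2.replace("--", "-").split("-")(4))  // go.live.failed
```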
Running it is the same as running any Java program: build the jar and execute it.
```sh
mvn clean package
java -jar target/statsLive.jar   # works because the shade plugin writes the main class into the manifest (see pom.xml)
```
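Since the session hard-codes `.master("local[*]")`, a plain `java -jar` is enough here. If the job is later pointed at a cluster, the usual route is `spark-submit`; a sketch (the class name comes from the pom's `mainClass`, and you would then uncomment the `<scope>provided</scope>` lines on the Spark dependencies):

```sh
spark-submit \
  --class com.....di.live_and_watch \
  --master local[*] \
  target/statsLive.jar
```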
A few takeaways:
- This job is fairly ad hoc, so I just declared an empty class and put all the code in the companion object. A serious, more complex job calls for proper code structure.
- You need to know the source data format very well; here, for instance, the `k` field embeds the event name and has to be extracted (see the split sketch after the code above).
- You could do all this with classic RDD map-reduce, but the code shows how pleasant Spark DataFrames are: you can write plain SQL and `group by` directly, or do the same aggregation without SQL (see the sketch below). So for any structured source, my recommendation is to use SQL whenever you can.
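For comparison, here is what the non-SQL route could look like through the DataFrame API, using the built-in `split`, `regexp_replace`, and `when` column functions. This is an untested sketch: `eventsDF` stands for the DataFrame parsed inside `foreachRDD` (i.e. `spark.read.schema(schema).json(rdd)`), and `now` is the batch timestamp from the same scope.

```scala
import org.apache.spark.sql.functions._

// Same aggregation as the SQL string above, expressed with column functions.
val parts = split(regexp_replace(col("k"), "--", "-"), "-")
val event = when(col("k").startsWith("-"), parts.getItem(4)).otherwise(parts.getItem(3))

val row = eventsDF
  .withColumn("event", event)
  .groupBy("event", "zone")
  .agg(count(lit(1)).as("cnt"))
  .withColumn("ts", lit(now))
```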
Original post: http://www.cnblogs.com/elsonwe/p/7640563.html