Spark 2.3 (Part 37): Stream join Stream (res file updated once daily)
Kafka test data generator:
package com.dx.kafka;

import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers",
                "192.168.0.141:9092,192.168.0.142:9092,192.168.0.143:9092,192.168.0.144:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer =
                new org.apache.kafka.clients.producer.KafkaProducer<>(props);
        int i = 0;
        Random random = new Random();
        while (true) {
            i++;
            // value format: "int_id,rsrp", e.g. "1,57"
            producer.send(new ProducerRecord<String, String>("my-topic",
                    "key-" + Integer.toString(i), i % 3 + "," + random.nextInt(100)));
            System.out.println(i);
            Thread.sleep(1000);
            if (i % 100 == 0) {
                Thread.sleep(60 * 1000);
            }
        }
        // producer.close();
    }
}
Stream join Stream test code:
Requirement: use Spark Structured Streaming to read data from Kafka in real time. Each Kafka record contains an int_id field, and the Kafka data must be joined against resource information (matching the Kafka int_id with the resource int_id). The resource data is updated once per day.
Reading Kafka data in real time with Spark Structured Streaming
Dataset<Row> linesDF = this.sparkSession.readStream()
        .format("kafka")
        .option("failOnDataLoss", false)
        .option("kafka.bootstrap.servers",
                "192.168.0.141:9092,192.168.0.142:9092,192.168.0.143:9092,192.168.0.144:9092")
        .option("subscribe", "my-topic")
        .option("startingOffsets", "earliest")
        .option("maxOffsetsPerTrigger", 10)
        .load();

StructType structType = new StructType();
structType = structType.add("int_id", DataTypes.StringType, false);
structType = structType.add("rsrp", DataTypes.StringType, false);
structType = structType.add("mro_timestamp", DataTypes.TimestampType, false);
ExpressionEncoder<Row> encoder = RowEncoder.apply(structType);

Dataset<Row> mro = linesDF.select("value").as(Encoders.STRING()).map(new MapFunction<String, Row>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Row call(String t) throws Exception {
        List<Object> values = new ArrayList<Object>();
        String[] fields = t.split(",");
        values.add(fields.length >= 1 ? fields[0] : "null");
        values.add(fields.length >= 2 ? fields[1] : "null");
        values.add(new Timestamp(new Date().getTime()));
        return RowFactory.create(values.toArray());
    }
}, encoder);
mro = mro.withWatermark("mro_timestamp", "15 minutes");
mro.printSchema();
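The map function above can be checked in isolation. This is a plain-Java sketch of the same parsing logic (class and method names are illustrative, not from the original code): it splits a Kafka value of the form "int_id,rsrp", pads missing fields with the string "null", and appends the processing-time timestamp that later serves as the watermark column.

```java
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;

public class MroParser {
    // Splits "int_id,rsrp" into [int_id, rsrp, timestamp],
    // padding absent fields with "null", as the MapFunction does.
    static List<Object> parse(String value, Timestamp now) {
        String[] fields = value.split(",");
        List<Object> row = new ArrayList<>();
        row.add(fields.length >= 1 ? fields[0] : "null");
        row.add(fields.length >= 2 ? fields[1] : "null");
        row.add(now);
        return row;
    }

    public static void main(String[] args) {
        Timestamp now = new Timestamp(System.currentTimeMillis());
        System.out.println(parse("1,57", now)); // [1, 57, <now>]
        System.out.println(parse("1", now));    // [1, null, <now>]
    }
}
```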
Loading the resource information
StructType resulStructType = new StructType();
resulStructType = resulStructType.add("int_id", DataTypes.StringType, false);
resulStructType = resulStructType.add("enodeb_id", DataTypes.StringType, false);
resulStructType = resulStructType.add("res_timestamp", DataTypes.TimestampType, false);
ExpressionEncoder<Row> resultEncoder = RowEncoder.apply(resulStructType);

Dataset<Row> resDs = sparkSession.readStream().option("maxFileAge", "1ms").textFile(resourceDir)
        .map(new MapFunction<String, Row>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Row call(String value) throws Exception {
                String[] fields = value.split(",");
                Object[] objItems = new Object[3];
                objItems[0] = fields[0];
                objItems[1] = fields[1];
                objItems[2] = Timestamp.valueOf(fields[2]);
                return RowFactory.create(objItems);
            }
        }, resultEncoder);
resDs = resDs.withWatermark("res_timestamp", "1 seconds");
resDs.printSchema();
Joining the Kafka data with the resources
Join conditions: the int_id values match, res.res_timestamp <= mro.mro_timestamp, and res.res_timestamp > (mro.mro_timestamp - 1 day), i.e. only the resource snapshot from within the past day is matched.
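The one-day window predicate can be sketched in plain Java (class and method names here are illustrative); it mirrors what the join expression below evaluates per row pair: the resource timestamp must not be later than the record timestamp, and must be at most 86,400,000 ms (one day) older.

```java
import java.sql.Timestamp;

public class JoinWindowCheck {
    static final long ONE_DAY_MS = 86_400_000L;

    // Mirrors the join predicate: res_timestamp <= mro_timestamp
    // and (res_timestamp - mro_timestamp) > -86400000 ms,
    // i.e. the resource snapshot is at most one day older than the record.
    static boolean inJoinWindow(Timestamp res, Timestamp mro) {
        long diff = res.getTime() - mro.getTime();
        return diff <= 0 && diff > -ONE_DAY_MS;
    }

    public static void main(String[] args) {
        Timestamp mro = Timestamp.valueOf("2018-12-15 10:30:00");
        // Same-day midnight snapshot: inside the window.
        System.out.println(inJoinWindow(Timestamp.valueOf("2018-12-15 00:00:00"), mro)); // true
        // Previous day's snapshot: more than one day old, outside the window.
        System.out.println(inJoinWindow(Timestamp.valueOf("2018-12-14 00:00:00"), mro)); // false
    }
}
```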
Testing showed that putting res into a broadcast variable also works.
// JavaSparkContext jsc =
//         JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
Dataset<Row> cellJoinMro = mro.as("t10")
        .join(resDs.as("t11"), // jsc.broadcast(resDs).getValue()
                functions.expr("t11.int_id=t10.int_id "
                        + "and t11.res_timestamp<=t10.mro_timestamp "
                        + "and timestamp_diff(t11.res_timestamp,t10.mro_timestamp,'>','-86400000')"),
                "left_outer")
        .selectExpr("t10.int_id", "t10.rsrp", "t11.enodeb_id", "t10.mro_timestamp", "t11.res_timestamp");

StreamingQuery query = cellJoinMro.writeStream().format("console").outputMode("update")
        // .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
        .start();
UDF: timestamp_diff definition
sparkSession.udf().register("timestamp_diff",
        new UDF4<Timestamp, Timestamp, String, String, Boolean>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Boolean call(Timestamp t1, Timestamp t2, String operator, String intervalMsStr)
                    throws Exception {
                long diffValue = t1.getTime() - t2.getTime();
                long intervalMs = Long.valueOf(intervalMsStr);
                if (operator.equalsIgnoreCase(">")) {
                    return diffValue > intervalMs;
                } else if (operator.equalsIgnoreCase(">=")) {
                    return diffValue >= intervalMs;
                } else if (operator.equalsIgnoreCase("<")) {
                    return diffValue < intervalMs;
                } else if (operator.equalsIgnoreCase("<=")) {
                    return diffValue <= intervalMs;
                } else if (operator.equalsIgnoreCase("=")) {
                    return diffValue == intervalMs;
                } else {
                    throw new RuntimeException("unknown error");
                }
            }
        }, DataTypes.BooleanType);
Deleting historical resource data does not cause the running job to throw an exception, and when a new file is added under the res HDFS path it is picked up automatically.
Note: there must be exactly one resource file per day; otherwise the join will produce duplicate result rows for the Kafka data. Also, the timestamp field in each day's res file must be formatted as yyyy-MM-dd 00:00:00.
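A minimal sketch of producing one day's res file line in the required format (the helper and its names are illustrative, not part of the original code); the key point is that the third column must parse cleanly via Timestamp.valueOf on the stream side, which "yyyy-MM-dd 00:00:00" does.

```java
import java.sql.Timestamp;
import java.time.LocalDate;

public class ResLineBuilder {
    // Builds one res file line "int_id,enodeb_id,yyyy-MM-dd 00:00:00",
    // matching the three-column format the resource-side parser expects.
    static String resLine(String intId, String enodebId, LocalDate day) {
        // LocalDate.toString() yields ISO "yyyy-MM-dd".
        return intId + "," + enodebId + "," + day + " 00:00:00";
    }

    public static void main(String[] args) {
        String line = resLine("1", "enb-001", LocalDate.of(2018, 12, 15));
        System.out.println(line); // 1,enb-001,2018-12-15 00:00:00

        // Round-trip: the third field parses the way the MapFunction parses it.
        String[] fields = line.split(",");
        System.out.println(Timestamp.valueOf(fields[2]));
    }
}
```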
Original post: https://www.cnblogs.com/yy3b2007com/p/10124722.html