Printing the RDDs from a JavaDStream lambda to the console
I am new to this, and I am trying to create a simple JavaDStream to test my work with the spark-testing-base API. So far I have done this:
JavaStreamingContext streamingContext =
        new JavaStreamingContext(jsc(), Durations.seconds(10));

List<String> list = new LinkedList<String>();
list.add("first");
list.add("second");
list.add("third");

JavaRDD<String> myVeryOwnRDD = jsc().parallelize(list);
Queue<JavaRDD<String>> queue = new LinkedList<JavaRDD<String>>();
queue.add(myVeryOwnRDD);

JavaDStream<String> javaDStream = streamingContext.queueStream(queue);
javaDStream.foreachRDD(x -> {
    x.collect().stream().forEach(n -> System.out.println("item of list: " + n));
});
I was hoping it would print my list. It didn't. Instead I got this:
12:19:05.454 [main] DEBUG org.apache.spark.util.ClosureCleaner - +++ Cleaning closure <function1> (org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3) +++
12:19:05.468 [main] DEBUG org.apache.spark.util.ClosureCleaner - + declared fields: 3
12:19:05.469 [main] DEBUG org.apache.spark.util.ClosureCleaner - public static final long org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.serialVersionUID
12:19:05.469 [main] DEBUG org.apache.spark.util.ClosureCleaner - private final org.apache.spark.streaming.api.java.JavaDStreamLike org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.$outer
12:19:05.469 [main] DEBUG org.apache.spark.util.ClosureCleaner - private final org.apache.spark.api.java.function.VoidFunction org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.foreachFunc$3
12:19:05.469 [main] DEBUG org.apache.spark.util.ClosureCleaner - + declared methods: 2
12:19:05.470 [main] DEBUG org.apache.spark.util.ClosureCleaner - public final java.lang.Object org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(java.lang.Object)
12:19:05.470 [main] DEBUG org.apache.spark.util.ClosureCleaner - public final void org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(org.apache.spark.rdd.RDD)
12:19:05.470 [main] DEBUG org.apache.spark.util.ClosureCleaner - + inner classes: 0
12:19:05.471 [main] DEBUG org.apache.spark.util.ClosureCleaner - + outer classes: 1
12:19:05.472 [main] DEBUG org.apache.spark.util.ClosureCleaner - org.apache.spark.streaming.api.java.JavaDStreamLike
12:19:05.472 [main] DEBUG org.apache.spark.util.ClosureCleaner - + outer objects: 1
12:19:05.473 [main] DEBUG org.apache.spark.util.ClosureCleaner - org.apache.spark.streaming.api.java.JavaDStream@7209ffb5
12:19:05.474 [main] DEBUG org.apache.spark.util.ClosureCleaner - + populating accessed fields because this is the starting closure
12:19:05.478 [main] DEBUG org.apache.spark.util.ClosureCleaner - + fields accessed by starting closure: 1
12:19:05.479 [main] DEBUG org.apache.spark.util.ClosureCleaner - (interface org.apache.spark.streaming.api.java.JavaDStreamLike,Set())
12:19:05.479 [main] DEBUG org.apache.spark.util.ClosureCleaner - + outermost object is not a closure, so do not clone it: (interface org.apache.spark.streaming.api.java.JavaDStreamLike,org.apache.spark.streaming.api.java.JavaDStream@7209ffb5)
12:19:05.480 [main] DEBUG org.apache.spark.util.ClosureCleaner - +++ closure <function1> (org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3) is now cleaned +++
12:19:05.481 [main] DEBUG org.apache.spark.util.ClosureCleaner - +++ Cleaning closure <function2> (org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3) +++
12:19:05.482 [main] DEBUG org.apache.spark.util.ClosureCleaner - + declared fields: 2
12:19:05.482 [main] DEBUG org.apache.spark.util.ClosureCleaner - public static final long org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.serialVersionUID
12:19:05.482 [main] DEBUG org.apache.spark.util.ClosureCleaner - private final scala.Function1 org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.cleanedF$1
12:19:05.482 [main] DEBUG org.apache.spark.util.ClosureCleaner - + declared methods: 2
12:19:05.482 [main] DEBUG org.apache.spark.util.ClosureCleaner - public final java.lang.Object org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(java.lang.Object,java.lang.Object)
12:19:05.482 [main] DEBUG org.apache.spark.util.ClosureCleaner - public final void org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(org.apache.spark.rdd.RDD,org.apache.spark.streaming.Time)
12:19:05.482 [main] DEBUG org.apache.spark.util.ClosureCleaner - + inner classes: 0
12:19:05.482 [main] DEBUG org.apache.spark.util.ClosureCleaner - + outer classes: 0
12:19:05.482 [main] DEBUG org.apache.spark.util.ClosureCleaner - + outer objects: 0
12:19:05.482 [main] DEBUG org.apache.spark.util.ClosureCleaner - + populating accessed fields because this is the starting closure
12:19:05.483 [main] DEBUG org.apache.spark.util.ClosureCleaner - + fields accessed by starting closure: 0
12:19:05.483 [main] DEBUG org.apache.spark.util.ClosureCleaner - + there are no enclosing objects!
12:19:05.483 [main] DEBUG org.apache.spark.util.ClosureCleaner - +++ closure <function2> (org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3) is now cleaned +++
Am I missing something?

PS: The output above appears exactly where the list should have been printed. I am running this inside a Spring unit test:
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = config.class)
public class myTester extends SharedJavaSparkContext implements Serializable{
Solution:

I think you need to start the streaming context:

streamingContext.start();
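In Spark Streaming, foreachRDD only *registers* an output operation; no batch is processed until the context is started. A minimal sketch of the corrected test flow might look like the following (the timeout value and the `jsc()` helper from SharedJavaSparkContext are assumptions based on the snippet above, and it requires spark-streaming on the classpath):

```java
JavaStreamingContext streamingContext =
        new JavaStreamingContext(jsc(), Durations.seconds(1));

Queue<JavaRDD<String>> queue = new LinkedList<>();
queue.add(jsc().parallelize(Arrays.asList("first", "second", "third")));

JavaDStream<String> javaDStream = streamingContext.queueStream(queue);
javaDStream.foreachRDD(rdd ->
        rdd.collect().forEach(n -> System.out.println("item of list: " + n)));

// Nothing runs until the context is started.
streamingContext.start();
// Block long enough for at least one batch interval to fire,
// then stop the streaming context but keep the shared SparkContext alive.
streamingContext.awaitTerminationOrTimeout(5000);
streamingContext.stop(false);
```

Without the `awaitTerminationOrTimeout` call the test method would return (and the JVM might exit) before the first batch is scheduled, so the println output would still never appear even after calling `start()`.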