Spark 常用的 Transformation 算子示例 ===> Java 版
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了Spark 常用的 Transformation 算子示例 ===> Java 版,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含7727字,纯文字阅读大概需要12分钟。
内容图文
![Spark 常用的 Transformation 算子示例 ===> Java 版](/upload/InfoBanner/zyjiaocheng/1110/c5c7c5db5c4f4351941b9e7b30730532.jpg)
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import scala.Int;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
public class TransformationCases {
public static void main(String[] args) {
//准备测试数据
List<Integer> numbers = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
List<String> text = Arrays.asList("cat,dog,rabbit","apple,pear,peach","eyes,nose,mouth");
List<Tuple2<String,Integer>> scores = Arrays.asList(
new Tuple2<String, Integer>("class1",88),
new Tuple2<String, Integer>("class2",90),
new Tuple2<String, Integer>("class2",85),
new Tuple2<String, Integer>("class1",95),
new Tuple2<String, Integer>("class2",89)
);
List<Tuple2<Integer,String>> students = Arrays.asList(
new Tuple2<Integer, String>(1,"s1"),
new Tuple2<Integer, String>(2,"s2"),
new Tuple2<Integer, String>(3,"s3"),
new Tuple2<Integer, String>(4,"s4")
);
List<Tuple2<Integer,Integer>> stuScores = Arrays.asList(
new Tuple2<Integer, Integer>(1,100),
new Tuple2<Integer, Integer>(2,98),
new Tuple2<Integer, Integer>(3,98),
new Tuple2<Integer, Integer>(3,99),
new Tuple2<Integer, Integer>(2,99)
);
//拿到 SparkContext 对象
JavaSparkContext sc = getContext();
//测试 Transformation 方法:
// mapDemo(sc,numbers);
// filterDemo(sc,numbers);
// flatMapDemo(sc,text);
// groupByKeyDemo(sc,scores);
// reduceByKeyDemo(sc,scores);
// sortByKeyDemo(sc,scores);
// joinDemo(sc,students,stuScores);
cogroupDemo(sc,students,stuScores);
closeContext(sc);
}
//创建SparkConf 和 SparkContext 对象。
public static JavaSparkContext getContext(){
SparkConf conf = new SparkConf()
.setAppName("TransformationCases")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
return sc;
}
//关闭 SparkContext 对象。
public static void closeContext(JavaSparkContext sc){
if (sc != null){
sc.close();
}
}
//调用 map 算子实现功能:将集合中的每个元素乘以 2 .
public static void mapDemo(JavaSparkContext sc, List<Integer> numbers){
JavaRDD<Integer> rdd = sc.parallelize(numbers,1);
JavaRDD<Integer> doubledNumbers = rdd.map(new Function<Integer,Integer>() {
public Integer call(Integer v1) throws Exception {
return v1 * 2;
}
});
doubledNumbers.foreach(new VoidFunction<Integer>() {
public void call(Integer number) throws Exception {
System.out.println(number);
}
});
}
//调用 filter 算子实现功能:返回集合中所有的偶数。
public static void filterDemo(JavaSparkContext sc,List<Integer> numbers){
JavaRDD<Integer> rdd = sc.parallelize(numbers,1);
JavaRDD<Integer> evenNumbers = rdd.filter(new Function<Integer, Boolean>() {
public Boolean call(Integer v1) throws Exception {
return v1 % 2 == 0;
}
});
evenNumbers.foreach(new VoidFunction<Integer>() {
public void call(Integer number) throws Exception {
System.out.println(number);
}
});
}
//调用 flatMap 算子实现功能:将每个字符串拆分成单个的单词。
public static void flatMapDemo(JavaSparkContext sc,List<String> text){
JavaRDD<String> rdd = sc.parallelize(text);
JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() {
public Iterator<String> call(String line) throws Exception {
return Arrays.asList(line.split(",")).iterator();
}
});
words.foreach(new VoidFunction<String>() {
public void call(String word) throws Exception {
System.out.println(word);
}
});
}
//调用 groupByKey 算子实现功能:根据班级分组,将同一个班级的分数归为一组。
public static void groupByKeyDemo(JavaSparkContext sc, List<Tuple2<String,Integer>> scores){
JavaPairRDD<String, Integer> lists = sc.parallelizePairs(scores);
JavaPairRDD<String,Iterable<Integer>> groupedScores = lists.groupByKey();
groupedScores.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
public void call(Tuple2<String, Iterable<Integer>> scores) throws Exception {
System.out.println(scores._1);
Iterator<Integer> iterator = scores._2.iterator();
while (iterator.hasNext()){
System.out.println(iterator.next());
}
System.out.println("========================================");
}
});
}
//调用 reduceByKey 算子实现功能:计算每个班级分数总和。
public static void reduceByKeyDemo(JavaSparkContext sc,List<Tuple2<String,Integer>> scores){
JavaPairRDD<String,Integer> rdd = sc.parallelizePairs(scores);
JavaPairRDD<String,Integer> reducedScores = rdd.reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
reducedScores.foreach(new VoidFunction<Tuple2<String, Integer>>() {
public void call(Tuple2<String, Integer> scores) throws Exception {
System.out.println(scores._1 + " : " + scores._2);
}
});
}
//调用 sortedByKey 算子实现功能:按照分数做升序排序。
public static void sortByKeyDemo(JavaSparkContext sc,List<Tuple2<String,Integer>> scores){
JavaPairRDD<String,Integer> rdd = sc.parallelizePairs(scores);
//因为是要根据分数排序,而原始数据的key是class,所以将key和value临时调换一下。
JavaPairRDD<Integer,String> swapedRdd = rdd.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
public Tuple2<Integer, String> call(Tuple2<String, Integer> pair) throws Exception {
return new Tuple2<Integer, String>(pair._2,pair._1);
}
});
//根据现在的key(分数)升序排序。
JavaPairRDD<Integer,String> sortedRdd = swapedRdd.sortByKey();
//排序完成后,还是要按照原始数据的key和value来保存,所以再把key和value调换回来。
JavaPairRDD<String,Integer> result = sortedRdd.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
public Tuple2<String, Integer> call(Tuple2<Integer, String> pair) throws Exception {
return new Tuple2<String, Integer>(pair._2,pair._1);
}
});
result.foreach(new VoidFunction<Tuple2<String, Integer>>() {
public void call(Tuple2<String, Integer> pairs) throws Exception {
System.out.println(pairs._1 + " : " + pairs._2);
}
});
}
//调用 join 算子实现功能:将两个RDD的元素按照key做连接。
public static void joinDemo(JavaSparkContext sc,List<Tuple2<Integer,String>> students,List<Tuple2<Integer,Integer>> stuScores){
JavaPairRDD<Integer,String> stuRdd = sc.parallelizePairs(students);
JavaPairRDD<Integer,Integer> scoreRdd = sc.parallelizePairs(stuScores);
JavaPairRDD<Integer,Tuple2<String,Integer>> lists = stuRdd.join(scoreRdd);
lists.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, Integer>>>() {
public void call(Tuple2<Integer, Tuple2<String, Integer>> pairs) throws Exception {
System.out.println(pairs._1 + " : " + pairs._2._1 + " : " + pairs._2._2);
}
});
}
//调用 cogroup 算子实现功能:将两个RDD的元素按照key做连接。 它跟join实现的功能是一样的,但是它们的返回值不同。
public static void cogroupDemo(JavaSparkContext sc,List<Tuple2<Integer,String>> students,List<Tuple2<Integer,Integer>> stuScores){
JavaPairRDD<Integer,String> stuRdd = sc.parallelizePairs(students);
JavaPairRDD<Integer,Integer> scoreRdd = sc.parallelizePairs(stuScores);
JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> cogroupedRdd = stuRdd.cogroup(scoreRdd);
cogroupedRdd.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>>>() {
public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> pairs) throws Exception {
System.out.println(pairs._1 + " : " + pairs._2._1 + " : " + pairs._2._2);
}
});
}
}
版' ref='nofollow'>Spark 常用的 Transformation 算子示例 ===> Java 版
原文:https://www.cnblogs.com/rabbit624/p/10656567.html
内容总结
以上是互联网集市为您收集整理的Spark 常用的 Transformation 算子示例 ===> Java 版全部内容,希望文章能够帮你解决Spark 常用的 Transformation 算子示例 ===> Java 版所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。