flink 使用processFunction函数的sideOutPut实现filter操作(java版)
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了flink 使用processFunction函数的sideOutPut实现filter操作(java版),小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含1953字,纯文字阅读大概需要3分钟。
内容图文
![flink 使用processFunction函数的sideOutPut实现filter操作(java版)](/upload/InfoBanner/zyjiaocheng/617/b48554c619d74ae1b4ec4dc91261f66c.jpg)
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; /** * # _*_ coding:utf-8 _*_ * # Author:xiaoshubiao * # Time : 2020/12/14 19:14 **/ public class processFunction_test { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> localhost = executionEnvironment.socketTextStream("localhost", 1111); // 输入a,1这样的数据,通过ProcessFunction的sideOutPut实现filter的操作 // 区分是否属于a SingleOutputStreamOperator<Tuple2<String, Integer>> map = localhost.map( new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String s) throws Exception { String[] split = s.split(","); return new Tuple2<>(split[0], Integer.valueOf(split[1])); } } ); SingleOutputStreamOperator<String> process = map.process( /* *参数一:输入数据类型。比如(a,1) * 参数二:输出数据类型。比如(属于a) * */ new ProcessFunction<Tuple2<String, Integer>, String>() { @Override public void processElement(Tuple2<String, Integer> stringIntegerTuple2, Context context, Collector<String> collector) throws Exception { if (stringIntegerTuple2.f0.equals("a")) { // 直接返回 collector.collect("属于a"); } else { // 通过上下文输出,定义输出标签和值 context.output(new OutputTag<String>("is_not_a"){}, "不属于a"); } } } ); process.print("is_a"); process.getSideOutput(new OutputTag<String>("is_not_a"){}).print("is_not_a"); executionEnvironment.execute(); } } /* * 输入 输出 * a,1 属于a * b,1 不属于a * c,1 不属于a * * * */
内容总结
以上是互联网集市为您收集整理的flink 使用processFunction函数的sideOutPut实现filter操作(java版)全部内容,希望文章能够帮你解决flink 使用processFunction函数的sideOutPut实现filter操作(java版)所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。