java-Kafka-如何同时使用filter和filternot?
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了java-Kafka-如何同时使用filter和filternot?,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含1872字,纯文字阅读大概需要3分钟。
内容图文
![java-Kafka-如何同时使用filter和filternot?](/upload/InfoBanner/zyjiaocheng/684/ec7cab0712074f198a7b05901166839b.jpg)
我有一个Kafka流,它从一个主题获取数据,并且需要将该信息过滤到两个不同的主题.
KStream<String, Model> stream = builder.stream(Serdes.String(), specificAvroSerde, "not-filtered-topic");
stream.filter((key, value) -> new Processor().test(key, value)).to(Serdes.String(), specificAvroSerde, "good-topic");
stream.filterNot((key, value) -> new Processor().test(key, value)).to(Serdes.String(), specificAvroSerde, "bad-topic");
但是,当我这样做时,它将两次从主题读取数据-不确定随着数据变大,这是否会对性能产生影响.有没有办法只过滤一次并将其推送到两个主题?
解决方法:
您的方法是正确的,并且不会从该主题两次读取数据,并且也没有进行内部数据复制.这种方法的唯一缺点是,每个记录都会对两个过滤谓词进行评估-但是,这非常便宜,并且不应该成为性能问题.
但是,您仍然可以通过使用KStream#branch()来提高性能,该方法确实接受多个谓词并彼此求值,然后为每个谓词返回一个输入流.如果一条记录与谓词匹配,则将其放入相应的输出流中,并停止求值(即,不对该单个记录进行进一步的谓词求值;这确保将每条记录添加到最大一个输出流中;否则,将其删除)没有谓词匹配).
因此,您可以为branch()提供两个谓词:第一个与您的原始filter()谓词相同,第二个谓词始终返回true.
KStream<String, Model> stream = builder.stream(
Serdes.String(),
specificAvroSerde,
"not-filtered-topic"
);
KStream[] splitStreams = stream.branch(
(key, value) -> new Processor().test(key,value),
(key, value) -> true
);
splitStreams[0].to(Serdes.String(), specificAvroSerde, "good-topic");
splitStreams[1].to(Serdes.String(), specificAvroSerde, "bad-topic");
但不确定此代码是否比原始版本可读性更好.我想这是一个问题,我个人更喜欢您的原始代码,因为它确实更好地表达了语义.
我添加的版本应该稍微提高CPU效率,因为对于所有确实满足该谓词的记录,它只会被评估一次.对于所有不满足该结果的记录,将返回一个简单的true(即,没有第二个谓词求值).
如果您知道大多数记录将以splitStream [1]结尾,那么您还可以反转谓词(并将splitStream [0]用作“不良流”)以减少对第二个真返回谓词的调用次数.但是,这些只是微优化,不应有问题.
内容总结
以上是互联网集市为您收集整理的java-Kafka-如何同时使用filter和filternot?全部内容,希望文章能够帮你解决java-Kafka-如何同时使用filter和filternot?所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。