java – 默认情况下如何使用Kafka Spring Cloud Stream并使用汇合API生成的Kafka消息?
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了java – 默认情况下如何使用Kafka Spring Cloud Stream并使用汇合API生成的Kafka消息?,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含1753字,纯文字阅读大概需要3分钟。
内容图文
![java – 默认情况下如何使用Kafka Spring Cloud Stream并使用汇合API生成的Kafka消息?](/upload/InfoBanner/zyjiaocheng/789/e857ff77c1e84a5c90ed24b5289d6fab.jpg)
我正在构建一个微服务组件,它将默认使用由其他(SCS)组件生成的Spring Cloud Stream(SCS)Kafka消息.
但我还要求使用来自使用汇合API的其他组件的Kafka消息.
我有一个示例存储库,显示我正在尝试做的事情.
https://github.com/donalthurley/KafkaConsumeScsAndConfluent
这是下面的应用程序配置,带有SCS输入绑定和汇合输入绑定.
spring:
application:
name: kafka
kafka:
consumer:
properties.schema.registry.url: http://192.168.99.100:8081
cloud:
stream:
kafka:
binder:
brokers: PLAINTEXT://192.168.99.100:9092
# configuration:
# specific:
# avro:
# reader: true
# key:
# deserializer: org.apache.kafka.common.serialization.StringDeserializer
# value:
# deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
bindings:
inputConfluent:
contentType: application/*+avro
destination: confluent-destination
group: input-confluent-group
inputScs:
contentType: application/*+avro
destination: scs-destination
group: input-scs-group
通过上面的配置,我可以使用SCS默认配置创建两个使用者
例如,类org.apache.kafka.common.serialization.ByteArrayDeserializer是两个输入绑定的值反序列化器.
如果我删除上述配置中的注释,我会让两个消费者从Confluent客户端发送配置
例如,类io.confluent.kafka.serializers.KafkaAvroDeserializer是两个输入绑定的值反序列化器.
我理解,因为配置在Kafka活页夹上,它将应用于使用该活页夹定义的所有消费者.
有没有什么方法可以定义这些特定的属性,以便它们只应用于汇合的特定使用者绑定,所有其他输入绑定可以使用默认的SCS配置?
解决方法:
您可以通过配置属性设置特定于绑定的使用者和生产者属性.
spring.cloud.stream.kafka.bindings.<channelName>.consumer.configuration.foo.bar=baz
使用非标准序列化器/反序列化器时,必须分别为生产者和使用者设置useNativeEncoding和useNativeDecoding.再次参见参考手册.
内容总结
以上是互联网集市为您收集整理的java – 默认情况下如何使用Kafka Spring Cloud Stream并使用汇合API生成的Kafka消息?全部内容,希望文章能够帮你解决java – 默认情况下如何使用Kafka Spring Cloud Stream并使用汇合API生成的Kafka消息?所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。