java – Kafka生产者发送无效字符
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了java – Kafka生产者发送无效字符,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含2748字,纯文字阅读大概需要4分钟。
内容图文
![java – Kafka生产者发送无效字符](/upload/InfoBanner/zyjiaocheng/773/2f1d3943ce7c40599c41d8d5bee4c471.jpg)
使用以下代码,我发送Elasticsearch文档以进行索引.我尝试将基本对象转换为JSON并通过制作人发送.但是,每条消息(从控制台检查)都附加了像 – t {“productId”:2455这样的乱码字符
public boolean sendMessage()
{
PageRequest page = new PageRequest(0, 1);
Product p = product.findByName("Cream", page).getContent().get(0);
String json = "";
ObjectMapper mapper = new ObjectMapper();
try {
json = mapper.writeValueAsString(p);
} catch (JsonProcessingException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
logger.info("JSON = " + json);
boolean status = inputToKafka.send(org.springframework.integration.support.MessageBuilder.withPayload(json).build());
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return status;
}
出站配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
<int:channel id="inputToKafka">
<int:queue/>
</int:channel>
<int-kafka:outbound-channel-adapter
id="kafkaOutboundChannelAdapter"
kafka-producer-context-ref="kafkaProducerContext"
channel="inputToKafka">
<int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="0" task-executor="taskExecutor"/>
</int-kafka:outbound-channel-adapter>
<task:executor id="taskExecutor" pool-size="5" keep-alive="120" queue-capacity="500"/>
<int-kafka:producer-context id="kafkaProducerContext">
<int-kafka:producer-configurations>
<int-kafka:producer-configuration broker-list="localhost:9092"
topic="test_topic"
compression-codec="default"/>
</int-kafka:producer-configurations>
</int-kafka:producer-context>
</beans>
任何线索?
使用的插件:Spring Extension Kafka
解决方法:
我今天遇到了这个问题,并通过在生产者配置中设置正确的value-serializer类来解决它,如下所示:
<int-kafka:producer-configuration
broker-list="localhost:9092" topic="headers['kafka_topic']"
key-class-type="java.lang.String" value-class-type="java.lang.String"
key-serializer="kafkaSerializer" value-serializer="kafkaSerializer"/>
<bean id="kafkaSerializer" class="org.apache.kafka.common.serialization.StringSerializer" />
内容总结
以上是互联网集市为您收集整理的java – Kafka生产者发送无效字符全部内容,希望文章能够帮你解决java – Kafka生产者发送无效字符所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。