【java – Kafka – 经纪人:团队协调员不可用】教程文章相关的互联网学习教程文章

java – 为什么Apache Kafka使用者不使用Log4j2根记录器?【代码】

我有这个配置: 的pom.xml<dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>2.11.1</version> </dependency> <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.11.1</version> </dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>1.0.1</version> </depend...

使用Java更新kafka中特定主题的TTL【代码】

更新主题的TTL,以便记录在主题中保留10天.我必须为特定主题执行此操作,只需将所有其他主题TTL保持相同,当前配置,我必须使用java执行此操作,因为我正在通过Java将主题推送到kafka.我正在设置以下属性以将主题推送到kafkaProperties props = new Properties(); props.put("bootstrap.servers", KAFKA_SERVERS); props.put("acks", ACKS); props.put("retries", RETRIES); props.put("linger.ms", new Integer(LINGER_MS)); props.put...

java – Kafka Streams本地国营商店【代码】

我有一个简单的流应用程序将一个主题作为输入流并将KeyValues转换为另一个,如:StoreBuilder<KeyValueStore<Long, CategoryDto>> builder =Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(CategoryTransformer.STORE_NAME),Serdes.Long(), CATEGORY_JSON_SERDE);streamsBuilder.addStateStore(builder).stream(categoryTopic, Consumed.with(Serdes.Long(), CATEGORY_JSON_SERDE)).transform(CategoryTransformer::new...

java – 春天Kafka听正则表达式【代码】

我试图用下面的代码听新创建的主题,但是没有用.如果下面的代码是正确的,你能告诉我吗?public class KafkaMessageListener {private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class);private final ProcessEventModel eventModel;@KafkaListener(topicPattern = "betsyncDataTopic*")public void receive(ConsumerRecord<String, String> consumerRecord) {LOGGER.info("received payload at '...

Kafka-Java生产者配置属性中文对照

写在前面,内容来源于 Kafka中文网。更多详情请自行查阅 http://kafka.apachecn.org/ 名称 描述 类型 默认值 有效值bootstrap.servers用于与Kafka集群客户端进行初始化连接, 连接成功以后, 客户端会负责负载均衡的与集群中所有机器建立连接。list

java – Kafka Consumer将过多的DEBUG语句输出到控制台(ecilpse)【代码】

我正在运行一些来自http://www.javaworld.com/article/3060078/big-data/big-data-messaging-with-kafka-part-1.html?page=2的示例代码,并且kafkaconsumer正在根据需要从主题中消耗,但每次轮询都会导致许多调试日志的打印(到标准输出),这是我不想要的. 我已经尝试在/config/log4j.properties中将所有INFO和DEBUG更改为ERROR(甚至确实是grep以确保),特别是设置log4j.logger.kafka = ERROR,kafkaAppender,但问题仍然存在.我提到了How...

Spring Boot Java Kafka配置,覆盖端口【代码】

我使用Spring Boot Kafka.这是我目前对Kafka非常简单的配置:@Configuration @EnableKafka public class KafkaConfig {}此配置非常正常,并且能够在默认Kafka端口上连接到Kafka实例:9092 现在我需要改变端口,让我们说9093. 如何更新此Kafka配置以便能够在9093上连接?解决方法:我认为你的属性文件中的这样的东西会起作用spring.kafka.bootstrap-servers=localhost:9093您可以指定逗号分隔的host:port列表

java – 在Apache Kafka中读取消息偏移量

我对Kafka很新,我们正在使用Kafka 0.8.1. 我需要做的是从主题中消费一条消息.为此,我将不得不用Java编写一个使用者,它将使用来自主题的消息,然后将该消息保存到数据库.保存消息后,将向Java使用者发送一些确认.如果确认为真,则应从主题中消耗下一条消息.如果confirmldgement为false(这意味着由于某些错误消息,从主题中读取,无法保存到数据库中),则应再次读取该消息. 我想我需要使用Simple Consumer来控制消息偏移,并且已经通过了这个...

java – 从Kafka多次读同一条消息【代码】

我使用Spring Kafka API来实现手动偏移管理的Kafka消费者:@KafkaListener(topics = "some_topic") public void onMessage(@Payload Message message, Acknowledgment acknowledgment) {if (someCondition) {acknowledgment.acknowledge();} }在这里,我希望消费者只有在someCondition成立时才提交偏移量.否则,消费者应该睡一段时间并再次阅读相同的消息. 卡夫卡配置:@Bean public ConcurrentKafkaListenerContainerFactory<String...

如何在java程序中获取kafka消耗滞后【代码】

我写了一个java程序来消耗来自kafka的消息.我想监视消耗滞后,如何通过java获取它? 顺便说一句,我使用:<groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.10.1.1</version>提前致谢.解决方法:我个人直接查询来自我的消费者的jmx信息.我只在java中使用JMX bean:kafka.consumer:type = consumer-fetch-manager-metrics,client-id = * / records-lag-max可用. 如果jolokia在你的类路径中,你可以...

java – kafka生产者非常慢【代码】

我是卡夫卡的新人,我有一个我无法解决的问题. 我已经在我自己的计算机上安装了Kafka和Zookeeper(不是在Linux中),我创建了一个主题包含多个分区(在6到12个分区之间播放)的代理. 当我创建消费者时,他们完美地工作并且阅读速度很快,但是在引用制作人的时候,我创建了一个可以在许多网站上看到的简单制作人.生产者在循环内部并发送许多短消息(大约2000条非常短的消息). 我可以看到消费者非常谨慎地阅读2000条消息,但是生产者每秒或多或少...

java – 调试自定义Kafka连接器的简单有效方法是什么?

我正在使用几个Kafka连接器,我在控制台输出中没有看到它们的创建/部署中的任何错误,但是我没有得到我正在寻找的结果(没有任何结果,无论如何,期望或除此以外).我根据Kafka的示例FileStream连接器制作了这些连接器,因此我的调试技术基于使用示例中使用的SLF4J Logger.我已经搜索了我认为会在控制台输出中生成的日志消息,但无济于事.我在错误的地方找这些消息吗?或者是否有更好的方法来调试这些连接器? 我为实现引用的SLF4J Logger的...

java – 为什么我的Kafka Consumer在首次运行时会快速消耗消息,但在将来的运行中会大幅减慢消息?【代码】

我是一名研究和玩卡夫卡的学生.在关注Apache文档的示例后,我正在使用当前Github仓库中的示例部分. 截至目前,该示例实现了其消费者的“较旧”版本,并且未使用新的KafkaConsumer.在文档之后,我编写了自己的KafkaConsumer版本,认为它会更快. 这是一个模糊的问题,但是在runthrough上我向主题“test”生成了5000条简单的消息,例如“Message_CurrentMessageNumber”,然后使用我的消费者获取这些消息并将它们打印到stdout.当我运行示例代码...

java – Kafka生产者发送无效字符【代码】

使用以下代码,我发送Elasticsearch文档以进行索引.我尝试将基本对象转换为JSON并通过制作人发送.但是,每条消息(从控制台检查)都附加了像 – t {“productId”:2455这样的乱码字符public boolean sendMessage() {PageRequest page = new PageRequest(0, 1); Product p = product.findByName("Cream", page).getContent().get(0);String json = "";ObjectMapper mapper = new ObjectMapper();try {json = mapper.writeValueAsStri...

java – 如何从Kafka访问记录中的文件路径并从中创建数据集?【代码】

我正在使用Java. 我收到了Kafka消息的文件路径.我需要将此文件加载到spark RDD中,处理它并将其转储到HDFS中. 我能够从Kafka消息中检索文件路径.我希望在这个文件上创建一个数据集/ RDD. 我无法在Kafka消息数据集上运行map函数.由于sparkContext在worker上不可用,因此它会因NPE而出错. 我不能在Kafka消息数据集上运行foreach.它错误地显示消息:Queries with streaming sources must be executed with writeStream.start();" 我无法...