【java – Kafka – 经纪人:团队协调员不可用】教程文章相关的互联网学习教程文章

kafka.consumer.SimpleConsumer:由于套接字错误而重新连接:java.nio.channels.ClosedChannelException【代码】

我正在为kafka运行一个简单的消费者,例如:int timeout = 80000; int bufferSize = 64*1024; consumer = new SimpleConsumer(host, port,timeout, bufferSize, clientName);这可以正常运行几个小时,但出现异常稍后的kafka.consumer.SimpleConsumer:由于套接字错误而重新连接:java.nio.channels.ClosedChannelException消费者停止了……以前有人遇到过这个问题吗?解决方法:一个稍有不同的问题,但也许具有相同的根本原因和解决方案...

JAVA面试——kafka

1、kafka可以脱离zookeeper使用吗?为什么?kafka不能脱离zookeeper单独使用,因为kafka使用zookeeper管理和协调kafka的节点服务器。 2、kafka有几种数据保留的策略?kafka有两种数据保存策略:按照过期时间保留和 按照存储的消息大小保留。 3、Kafka同时设置了7天和10G清除数据,到第五天的消息达到了10G,这个时候kafka将如何处理?这个时候kafka会执行数据清除工作,时间和大小不论满足条件,都会清空数据。 4、什...

java-Kafka流:从应用程序的每个实例中的所有分区读取【代码】

当使用KTable时,当实例/使用者数等于分区数时,Kafka流不允许实例从特定主题的多个分区中读取.我尝试使用GlobalKTable来实现这一点,但问题是数据将被覆盖,并且聚合也无法应用于其上. 假设我有一个名为“ data_in”的主题,具有3个分区(P1,P2,P3).当我运行Kafka流应用程序的3个实例(I1,I2,I3)时,我希望每个实例都从“ data_in”的所有分区中读取数据.我的意思是,I1可以从P1,P2和P3读取,I2可以从P1,P2和P3,I2以及其他方式读取. 编辑:请...

java – 无法在IDE中删除Kafka Stream Application的状态目录【代码】

我正在开发一个简单的Kafka Stream应用程序,它从主题中提取消息并在转换后将其放入另一个主题.我正在使用Intelij进行开发. 当我调试/运行这个应用程序时,如果我的IDE和Kafka服务器位于SAME机器中,它将完美运行(i.e. with the BOOTSTRAP_SERVERS_CONFIG = localhost:9092 andSCHEMA_REGISTRY_URL_CONFIG = localhost:8081)但是,当我尝试使用另一台机器进行开发时(i.e. with the BOOTSTRAP_SERVERS_CONFIG = XXX.XXX.XXX:9092 andSCH...

java – Kafka Streams错误 – 分区上的偏移提交失败,请求超时【代码】

我们使用Kafka Streams来消费,处理和生成消息,而在PROD环境中,我们遇到了多个主题的错误:ERROR org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=app-xxx-StreamThread-3-consumer, groupId=app] Offset commit failed on partition xxx-1 at offset 13920: The request timed out.[]对于负载较小的主题,这些错误很少发生,但对于具有高负载(和峰值)的主题,每个主题每天会发生数十次错误....

Kafka Java Producer与kerberos【代码】

在kerberosed环境中向kafka主题发送消息时收到错误.我们在hdp 2.3上有集群 我跟着这个http://henning.kropponline.de/2016/02/21/secure-kafka-java-producer-with-kerberos/ 但是对于发送消息,我必须首先明确地执行kinit,然后才能将消息发送到kafka主题.我试图通过java类编织,但这也行不通.PFB代码:package com.ct.test.kafka;import java.util.Date; import java.util.Properties; import java.util.Random;import kafka.javaap...

java – kafka获取主题的分区计数【代码】

如何从代码中获取任何kafka主题的分区数.我研究了许多链接,但似乎都没有. 提到一些: http://grokbase.com/t/kafka/users/148132gdzk/find-topic-partition-count-through-simpleclient-api http://grokbase.com/t/kafka/users/151cv3htga/get-replication-and-partition-count-of-a-topic http://qnalist.com/questions/5809219/get-replication-and-partition-count-of-a-topic 看起来像是类似的讨论. 在SO上也有类似的链接,没有...

将自定义Java对象发送到Kafka主题【代码】

我有我的自定义Java对象,并希望利用内置序列化中的JVM将其发送到Kafka主题,但序列化失败,出现以下错误org.apache.kafka.common.errors.SerializationException: Can’t convertvalue of class com.spring.kafka.Payload to classorg.apache.kafka.common.serialization.ByteArraySerializer specified invalue.serializerPayload.javapublic class Payload implements Serializable {private static final long serialVersionUID =...

java – 如何检查Kafka Server是否正在运行?【代码】

我想在开始生产和消费工作之前确保kafka服务器是否正在运行.它是在windows环境中,这是我的kafka服务器在eclipse中的代码…Properties kafka = new Properties(); kafka.setProperty("broker.id", "1"); kafka.setProperty("port", "9092"); kafka.setProperty("log.dirs", "D://workspace//"); kafka.setProperty("zookeeper.connect", "localhost:2181"); Option<String> option = Option.empty(); KafkaConfig config = new ...

java – 使用kafka和jpa时的好习惯【代码】

我目前正在使用JPA和Kafka的项目.我正在尝试找到一组合并这些操作的良好实践. 在现有代码中,生产者在与jpa相同的事务中使用,但是,根据我的阅读,似乎他们不共享事务.@PostMapping @Transactional public XDto createX(@RequestBody XRequest request) {Xdto dto = xService.create(request);kafkaProducer.putToQueue(dto, Type.CREATE);return dto; }其中kafka生产者的定义如下:public class KafkaProducer {@Autowiredprivate Ka...

java – Kafka – 经纪人:团队协调员不可用【代码】

我有以下结构:zookeeper: 3.4.12 kafka: kafka_2.11-1.1.0 server1: zookeeper + kafka server2: zookeeper + kafka server3: zookeeper + kafka使用kafka-topics shell脚本创建具有复制因子3和分区3的主题../kafka-topics.sh --create --zookeeper localhost:2181 --topic test-flow --partitions 3 --replication-factor 3并使用group localConsumers.当领导没事的时候它工作正常../kafka-topics.sh --describe --zookeeper loc...

java – 用spring管理Kafka主题【代码】

我们计划在我们的应用程序中使用Kafka排队.我在RabbitMQ和Spring方面有一些经验. 使用RabbitMQ和Spring,我们曾经在启动spring服务时管理队列创建. 有了Kafka,我不确定什么是创建主题的最佳方式?有没有办法用Spring管理主题. 或者,我们应该编写一个单独的脚本来帮助创建主题吗?维护一个单独的脚本来创建主题对我来说似乎有点奇怪. 任何建议将不胜感激.解决方法:在Spring中,可以使用bean在应用程序启动期间创建主题:@Bean public ...

java – 为什么kafka流会重新处理代理重启后生成的消息【代码】

我有一个单节点kafka代理和简单的流应用程序.我创建了2个主题(topic1和topic2). 在topic1上生成 – 处理过的消息 – 写入topic2 注意:对于生成的每条消息,只有一条消息写入目标主题 我制作了一条消息.在写入topic2之后,我停止了kafka经纪人.一段时间后,我重新启动了代理并在topic1上生成了另一条消息.现在流应用程序处理了该消息3次.现在,在不停止代理的情况下,我向topic1发出了消息,并等待流应用程序在再次生成之前写入topic2. S...

java – 预先验证发送到Kafka主题的消息

是否可以验证/过滤发送到Kafka主题的消息? 就像,我想确保只有有效的客户端/生产者向我的主题发送消息.我当然可以通过根据某些参数/标准丢弃无效消息来在消费者方面进行验证.但是,如果我想在将消息写入主题之前执行该操作,该怎么办? 比如说,Kafka会收到一条消息,执行一些验证,然后决定是否需要丢弃或将该消息写入主题.这可能吗?解决方法:简短的回答 – 当前版本的Kafka不支持开箱即用的此类功能.由于Kafka生产者被设计为在单个会...

Kafka无法从头开始阅读–Java

我是kafka的新手,并尝试使用kafka构建一个生产者 – 消费者应用程序.在这里,我能够向kalka发送消息,但是当我尝试使用消费者消费它时,它返回0条记录. 我检查了我的消费者组的偏移量,我可以看到偏移量等于日志长度是相同的(在我的情况下为1M – 与记录数相同). 如果我在创建我的消费者时使用此配置属性,则从头开始阅读. configProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,“earliest”); 但我的要求是,如果我重启消费者...