【java – Kafka – 经纪人:团队协调员不可用】教程文章相关的互联网学习教程文章

java – Kafka – producer – 句柄“发送失败”

我正在运行0.8 Kafka,并使用提供的Java API构建生产者.发送消息(或消息)的API函数返回void. 有没有办法获取已发送邮件的状态?如果发送或失败? 这对我们来说非常重要,因为我们正在从文件中读取消息,并且我们希望在发送所有消息后删除该文件.但如果有错误并且没有发送一些消息,我删除了该文件将导致丢失非常重要的数据.解决方法:您可以将生产者配置为等待它从Kafka集群获取n acks(request.required.acks),以便在删除源文件之前保证...

java – Spring Integration Kafka适配器不生成消息【代码】

我现在正在奋斗这几天. 我在Spring-boot容器下使用SI适配器用于kafka. 我在我的机器上配置了zookeeper和kafka.我还创建了控制台生产者和消费者测试它,一切正常(我设法生成控制台消息并让控制台消费者使用它们). 我现在尝试通过Spring集成kafka出站适配器生成消息,但控制台消费者不会使用该消息 SI / Spring xd xml:<int:publish-subscribe-channel id="inputToKafka"/><int-kafka:outbound-channel-adapter id="kafkaOutboundChan...

java – kafka-log4j-appender 0.9不起作用【代码】

我在我的log4j.properties中添加了一个log4j kafka appender,但它没有像我预期的那样工作. 在我发布这个问题之前,我根据this similar question on stackoverflow检查了我的log4j.properties,大约是0.8.但是,我不是运气. 这是我的log4j.propertieslog4j.appender.Kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender log4j.appender.Kafka.topic=my-topic log4j.appender.Kafka.brokerList=localhost:9092 log4j.appender.Kaf...

java – Consumer.endOffsets如何在Kafka中运行?【代码】

假设我有一个无限期运行的计时器任务,它迭代kafka集群中的所有使用者组,并为每个组的所有分区输出滞后,提交的偏移量和结束偏移量.与Kafka控制台消费者组脚本的工作方式类似,但适用于所有组. 就像是 单个消费者 – 不工作 – 不返回某些提供的主题分区的偏移量(例如,提供10个 – 返回5个偏移量)Consumer consumer;static {consumer = createConsumer(); }run() { List<String> groupIds = getConsumerGroups();for(String groupId: ...

java – Kafka突然重置消费者Offset【代码】

我正在和Kafka 0.8& zookeeper 3.3.5.实际上,我们有十几个主题,我们正在消费没有任何问题. 最近,我们开始提供并消费一个有奇怪行为的新主题.消耗的偏移突然重置.它尊重我们设置的auto.offset.reset策略(实际上是最小的),但我无法理解为什么该主题突然重置其偏移量. 我正在使用高级消费者. 以下是我发现的一些错误日志:我们有一堆这个错误日志:[2015-03-26 05:21:17,789] INFO Fetching metadata from broker id:1,host:172.16.23...

java – Generic Spring Kafka Listener【代码】

我在一个maven项目中创建了一个kafka制作人和消费者. 我想在另一个maven项目中使用它,所以我添加了上述kafka项目的依赖项.现在的问题是生产者是好的,但如何使监听器通用,可以被添加此项目的所有其他项目覆盖. 目前我在一个项目中有Listenerpublic class Listener {public CountDownLatch countDownLatch0 = new CountDownLatch(3);public CountDownLatch countDownLatch1 = new CountDownLatch(3);public CountDownLatch countDown...

java – Kafka Connect SourceTask的轮询间隔

我正在使用Kafka-Connect API实现自定义Source Connector,可用于轮询REST-API并将JSON响应接收到Kafka主题中.现在我想知道如何实现SourceTask的轮询间隔,JDBC Connector如何提供.某处我必须将线程设置为睡眠状态,但我必须在哪里执行此操作?解决方法:我通过添加long类型的私有字段来存储时间戳,从而在SourceTask实现中解决了这个用例.在第一次poll()调用时,该字段尚未初始化,因此将对已配置的REST-API进行轮询.在第一次调用时,所提...

java – Spring Kafka-将KafkaTemplate与Producer Listener配置并使用Listenable Future注册回调之间的区别【代码】

所以我正在阅读Spring kafka文档并遇到了Producer Listener.这就是Spring Kafka文档所说的 – “或者,您可以使用ProducerListener配置KafkaTemplate,以获取带有发送结果(成功或失败)的异步回调,而不是等待Future完成.” 他们还指定了界面 – public interface ProducerListener<K, V> {void onSuccess(String topic, Integer partition, K key, V value, RecordMetadata recordMetadata);void one rror(String topic, Integer par...

java – 默认情况下如何使用Kafka Spring Cloud Stream并使用汇合API生成的Kafka消息?【代码】

我正在构建一个微服务组件,它将默认使用由其他(SCS)组件生成的Spring Cloud Stream(SCS)Kafka消息. 但我还要求使用来自使用汇合API的其他组件的Kafka消息. 我有一个示例存储库,显示我正在尝试做的事情. https://github.com/donalthurley/KafkaConsumeScsAndConfluent 这是下面的应用程序配置,带有SCS输入绑定和汇合输入绑定.spring:application:name: kafkakafka:consumer:properties.schema.registry.url: http://192.168.99.100:...

java – object kafka不是包org.apache的成员【代码】

将java驱动程序导入我的scala项目时,我在编译时遇到以下消息:对象kafka不是包org.apache的成员.以下是我的import语句的设置方法:import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata} import org.apache.kafka.common.serialization.Serializer我已经尝试过几次运行激活剂清洁和活化剂清洁文件已经取得了任何成功. 编辑:这可能会有所帮助解决方法:事实证明,SBT和Activator的一...

java – Spring Kafka JsonSerializer用法

我想按照这里的说明操作: http://docs.spring.io/spring-kafka/docs/1.1.1.RELEASE/reference/htmlsingle/#_serialization_deserialization_and_message_conversion 设置一个KafkaTemplate,它可以序列化并发送我拥有的一些简单的Java POJO.但我发现文档含糊不清,特别是这一部分:For this purpose Spring for Apache Kafka also providesJsonSerializer/JsonDeserializer implementations based on the JacksonJSON processor. Wh...

java – 无法在kafka consumer下设置’max.poll.records’,其中cons.poll仍然返回分区下的所有记录【代码】

我创建了多线程消费者应用程序来处理各种分区.查看各种博客,我开始了解’max.poll.records’属性,以便控制来自给定主题,分区的记录集.(因此它可以很快从记录循环中出来,因此调用cons.poll ()保持活力) 问题是我的处理逻辑需要时间来处理每条记录.在启动Cons-2时,两者都开始在相同的分区上工作,因为Cons-1仍未进行重新平衡(即尚未发生cons.poll()). 增加消费者以便他们可以重新平衡他们自己,cons.poll()将不会发生,除非处理所有记录...

java – Kafka使用者异常和偏移提交【代码】

我一直在尝试为Spring Kafka做一些POC工作.具体来说,我想尝试在Kafka中消费消息时处理错误的最佳实践. 我想知道是否有人能够提供帮助: >分享围绕Kafka消费者应该做的最佳实践当出现故障时>帮助我了解AckMode Record如何工作,以及在侦听器方法中抛出异常时如何防止提交到Kafka偏移队列. 2的代码示例如下: 鉴于AckMode设置为RECORD,根据documentation:commit the offset when the listener returns after processing therecord.我...

java – spring kafka thorws设置并发后的InstanceAlreadyExistsException异常> 1【代码】

我正在使用spring-kafka,如果我没有设置ConcurrentKafkaListenerContainerFactory的并发性,一切正常,当我将它设置为大于1的数字时,我得到一个异常:javax.management.InstanceAlreadyExistsException:kafka.consumer:type=app-info,id=client-3我的配置:@Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factor...

java – Kafka Producer Exception NoClassDefFoundError【代码】

我对kafka Producer有一些问题,但我不知道我怎么能解决它 我的Maven依赖:<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.10</artifactId><version>0.10.1.1</version> </dependency>如果我创建:Producer<String, byte[]> producer = createKafkaProducer();我变成了例外:java.lang.NoClassDefFoundError: org/apache/kafka/clients/producer/Producer at de.dienes.opitz.node.NodesValue.onSubscription...