【java – Kafka – 经纪人:团队协调员不可用】教程文章相关的互联网学习教程文章

java-Kafka JDBC连接器中的自定义分区分配【代码】

我有一个用例,其中我需要根据消息中的某些关键参数编写自定义逻辑来分配分区.我对此进行了一些研究,发现kafka转换支持重写Transformation接口中的某些方法,但是我无法在git hub或其他地方执行一些示例代码.有人可以共享示例代码或git hub链接在kafka JDBC源连接器中进行自定义分区分配吗? 提前致谢!.解决方法:Kafka默认使用以下方法分配分区:DefaultPartitioner(org.apache.kafka.clients.producer.internals.DefaultPartitione...

网易Java研发岗二面:讲讲 kafka 维护消费状态跟踪的方法【图】

网易面试真题:讲讲 kafka 维护消费状态跟踪的方法?解析:大部分消息系统在 broker 端的维护消息被消费的记录:一个消息被分发到consumer 后 broker 就马上进行标记或者等待 customer 的通知后进行标记。这样也可以在消息在消费后立马就删除以减少空间占用。但是这样会不会有什么问题呢?如果一条消息发送出去之后就立即被标记为消费过的,一旦 consumer 处理消息时失败了(比如程序崩溃)消息就丢失了。为了解决这个问题,很多消息...

java-如何在Spark Streaming中映射kafka主题名称和相应记录【代码】

我正在播放来自如下的kafka主题;JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(jssc,String.class, String.class,StringDecoder.class,StringDecoder.class,kafkaParams, topicSet);directKafkaStream.print(); 对于一个主题,输出如下所示:(null,"04/15/2015","18:44:14") (null,"04/15/2015","18:44:15") (null,"04/15/2015","18:44:16") (null,"04/15/2015","18:44:17") 如何映...

java-Spring Kafka在一个使用者中使用多种消息类型【代码】

我有多个生产者,可以将多种类型的事件发送到一个kafka主题. 我有一个必须使用所有类型消息的使用者.对每种消息使用不同的逻辑.@KafkaListener(topics = "test", containerFactory = "kafkaListenerContainerFactory") public void handleEvent(Message<EventOne> event) {logger.info("event={}", event); }但是在这种情况下,不仅EventOne,所有消息都传给该方法 如果我实现了两种方法(对于每种消息类型),那么所有消息都只会变成一种...

java-Kafka KStream-衡量消费者滞后

由于我的基于KStream的应用程序未遵循传统的Kafka消费者路线,我应如何跟踪消费者的滞后时间?通常我会使用ConsumerOffsetChecker(或类似的值),但是它需要使用者组名称. 我应该怎么用呢? (我想对此进行跟踪,以便我可以判断是否/何时启动新消费者)解决方法:Kafka Streams内部使用KafkaConsumer,并将应用程序ID用作使用者组ID.因此,您可以像监视其他使用者一样监视滞后. 查看http://docs.confluent.io/current/streams/developer-gui...

最好用的 Kafka Json Logger Java客户端,赶紧尝试一下【图】

最好用的 Kafka Json Logger Java客户端。slf4j4json 最好用的 Kafka Json Logger 库;不尝试一下可惜了! Description 一款为 Kafka 提供的 json logger 客户端,支持将 json 格式的 log 输出到 kafka、文件、控制台。 支持 slf4j 的全部功能。 比 KafkaLog4jAppender 更好用,可配置性更好。 支持 close logger, 在程序关闭之前 flush log to kafka。 支持链式编程模式,方便用户使用。 maven dependency添加repository<reposit...

java-使用Spark Streaming从Kafka读取数据时lz4异常【代码】

我试图使用火花流式API从kafka读取json数据,当我这样做时,它将引发java.lang.NoSuchMethodError:net.jpountz.lz4.LZ4BlockInputStream.init异常.堆栈跟踪为-java.lang.NoSuchMethodError: net.jpountz.lz4.LZ4BlockInputStream.<init>(Ljava/io/InputStream;Z)V at org.apache.spark.io.LZ4CompressionCodec.compressedInputStream(CompressionCodec.scala:122) at org.apache.spark.serializer.SerializerManager.wrapForCompres...

java-Spring Boot Embedded Kafka无法连接【代码】

我正在尝试为我的Kafka使用者编写集成测试.我已经遵循了official reference documentation,但是当我开始测试时,我只会看到这个重复的广告无限:-2019-04-03 15:47:34.002 WARN 13120 — [ main] org.apache.kafka.clients.NetworkClient : [ConsumerclientId=consumer-1, groupId=my-group] Connection to node -1 could notbe established. Broker may not be available.我究竟做错了什么? 我正在使用JUnit5,Spring ...

java-在消费者端通过kafka对UUID进行Avro自定义解码【代码】

我已经编写了一个类,用于将UUID类型的对象自定义编码为字节,以跨kafka和avro进行传输. 要使用此类,我在目标对象的uuid变量上方放置了一个@AvroEncode(using = UUIDAsBytesEncoding.class). (这是由Apache Avro反射库实现的) 我很难弄清楚如何让消费者自动使用自定义解码器. (或者我是否必须手动解码?). 这是我的UUIDAsBytesEncoder扩展CustomEncoding类:public class UUIDAsBytesEncoding extends CustomEncoding<UUID> {public ...

java-Kafka-如何同时使用filter和filternot?【代码】

我有一个Kafka流,它从一个主题获取数据,并且需要将该信息过滤到两个不同的主题.KStream<String, Model> stream = builder.stream(Serdes.String(), specificAvroSerde, "not-filtered-topic"); stream.filter((key, value) -> new Processor().test(key, value)).to(Serdes.String(), specificAvroSerde, "good-topic"); stream.filterNot((key, value) -> new Processor().test(key, value)).to(Serdes.String(), specificAvroSer...

java-在Kafka连接器中设置分区策略【代码】

我正在使用自定义的Kafka连接器(使用Kafka Connect的Java API用Java编写)从外部源提取数据并存储在主题中.我需要设置自定义分区策略.我了解可以通过设置partitioner.class property在Kafka Producer中设置自定义partitioner.但是,此属性对于Kafka连接器似乎没有任何作用.如何配置Kafka Connect(我使用独立连接脚本来运行连接器)以使用编写的自定义分区程序?解决方法:源连接器可以通过SourceRecord的partition字段控制将每个源记录...

Java-Kafka崩溃时,Kafka使用者挂起轮询【代码】

我一直在研究Zookeeper和Kafka的基本设置,以学习如何使用它,但是我在与消费者打交道时遇到了麻烦.当Kafka不可用时,对poll()方法的调用将挂起,直到它重新联机. 卡夫卡版本:0.10.1.0 我的代码如下所示:KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props); consumer.subscribe(topics);while (!stopped) {// If by any reason Kafka is not available this call will hang// until Kafka is back online.records...

java-如何将两个Kafka流结合在一起,并在具有Avro值的主题中产生结果【代码】

我有两个Kafka Streams,它们具有String键和我使用KSQL创建的Avro格式的值. 这是第一个:DESCRIBE EXTENDED STREAM_1; Type : STREAM Key field : IDUSER Timestamp field : Not set - using <ROWTIME> Key format : STRING Value format : AVRO Kafka output topic : STREAM_1 (partitions: 4, replication: 1)Field | Type -------------------------...

java-如何将Apache Kafka与Amazon S3连接?【代码】

我想使用Kafka Connect将数据从Kafka存储到存储桶s3中.我已经在运行一个Kafka的主题,并且创建了一个s3存储桶.我的主题包含有关Protobuffer的数据,我尝试使用https://github.com/qubole/streamx并获得了下一个错误:[2018-10-04 13:35:46,512] INFO Revoking previously assigned partitions [] for group connect-s3-sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:280)[2018-10-04 13:35:46,512] INFO (...

java-Spring启动application.yml中的Spring Kafka SSL设置【代码】

我正在尝试使用Kafka客户端设置Spring Boot应用程序以使用SSL.由于以下原因,我将我的keystore.jks和truststore.jks存储在文件系统上(在Docker容器上):https://github.com/spring-projects/spring-kafka/issues/710 这是我的application.yml:spring:kafka:ssl:key-password: passkeystore-location: /tmp/kafka.client.keystore.jkskeystore-password: passtruststore-location: /tmp/kafka.client.truststore.jkstruststore-pass...