【java – Kafka – 经纪人:团队协调员不可用】教程文章相关的互联网学习教程文章

Java架构师带你深入理解Kafka:核心设计与实践原理【图】

Kafka简介Kafka 是 LinkedIn 开发并开源的一套分布式的高性能消息引擎服务,后来被越来越多的公司应用在自己的系统中,可以说,Kafka 是大数据时代数据管道技术的的首选。在设计的时候,它就实现了高可靠、高吞吐、高可用和可伸缩,得益于这些特性,加上活跃的社区,Kafka 成为了一个完备的分布式消息引擎解决方案。历经多年发展,Kafka 的功能和特性也在不断迭代,如今的 Kafka 集消息系统、存储系统和流式处理平台于一身,并作为...

java – Kafka Streams:如何更改记录时间戳(0.11.0)?【代码】

我正在使用FluentD(第12版稳定版)向Kafka发送消息.但是FluentD使用旧的KafkaProducer,因此记录时间戳始终设置为-1.因此,当消息到达kafka时,我必须使用WallclockTimestampExtractor将记录的时间戳设置为时间点. 我真正感兴趣的时间戳是由流利的信息发送的:“timestamp”:”1507885936″,”host”:”V.X.Y.Z.”卡夫卡的记录表示:offset = 0, timestamp= – 1, key = null, value = {“timestamp”:”1507885936″,”host”:”V.X.Y...

如何找回Kafka生产者和消费者配置(Java API)?

用例如下.我在Java代码中通过许多对象实例传递生产者或消费者引用.在其中一些我想对Kafka配置进行一些检查.这意味着我想回来,Kafka Producer / Consumer中存储了哪些有效的配置(包括默认值).我没有看到java docs中的anthing: > KafkaProducer> KafkaConsumer 那么,如何找回Kafka生产者和消费者配置呢?解决方法:不幸的是,这是不可能的.我不得不承认它至少可以显示“核心”配置属性(例如,避免获得认证内容的“秘密”的可能性).我今...

报错:java.lang.AbstractMethodError: nl.techop.kafka.KafkaHttpMetricsReporter.logger()Lcom/typesafe/sca【代码】【图】

报错背景: CDH启动kafka的时候出现报错情况,并且报错的节点挂掉。 报错现象: Exiting Kafka due to fatal exception java.lang.AbstractMethodError: nl.techop.kafka.KafkaHttpMetricsReporter.logger()Lcom/typesafe/scalalogging/Logger;at kafka.utils.Logging$class.info(Logging.scala:66)at nl.techop.kafka.KafkaHttpMetricsReporter.info(KafkaHttpMetricsReporter.scala:23)at nl.techop.kafka.KafkaHttpMetricsRepor...

java – 将字节数组发送到storm kafka bolt【代码】

我写了一个风暴拓扑.我基本上想要以字节数组的形式将avro架构中的元组发送到kafka主题. 这就是我设置螺栓的方法:builder.setBolt(KAFKA_AVRO_BOLT_NAME, new KafkaBolt<String, byte[]>()).fieldsGrouping(BOLT1, new Fields("key"));这就是我转换为字节数组的方式Schema schema = avroObject.getSchema();DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);ByteArrayOutputStream out = new B...

java – Kafka模式订阅.新主题没有触发重新平衡【代码】

根据kafka javadocs的文件,如果我: >订阅模式>创建与模式匹配的主题 应该发生重新平衡,这使消费者从该新主题中读取.但那并没有发生. 如果我停止并启动消费者,它确实会选择新主题.所以我知道新主题与模式匹配.这个问题可能在https://stackoverflow.com/questions/37120537/whitelist-filter-in-kafka-doesnt-pick-up-new-topics中有重复,但这个问题无处可去. 我看到kafka日志并没有错误,它只是不会触发重新平衡.当消费者加入或死亡...

Kafka Java API+自定义分区

kafka的API 第一步:导入kafka的开发jar包 <dependencies><!-- <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.11.0.1</version> </dependency> --> <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <ver...

java – Apache Kafka从代码创建主题【代码】

我们知道Kafka中的主题创建应该在服务器初始化部分进行处理.我们使用默认脚本./kafka-topics –zookeeper …,但是如果我们需要动态创建主题呢?最佳答案:幸运的是,Kafka 0.10.1.0为我们带来了这种能力.我在Confluence Jira板上看到了这些引人入胜的功能,但找不到与该主题相关的任何文档,具有讽刺意味,不是吗? 所以,我去了源代码,找到了动态创建主题的方法.希望它对你们中的一些人有所帮助.当然,如果您有更好的解决方案,请不要犹豫...

java – 将ObjectMapper注入Spring Kafka serialiser / deserialiser【代码】

我正在使用Spring Kafka 1.1.2-RELEASE和Spring Boot 1.5.0 RC,我已经配置了一个自定义值serialiser / deserialiser类,扩展了org.springframework.kafka.support.serializer.JsonSerializer / org.springframework.kafka.support. serializer.JsonDeserializer.这些类确实使用了Jackson ObjectMapper,它可以通过构造函数提供. 是否有可能从我的Spring上下文中注入ObjectMapper?我已经配置了一个ObjectMapper,我想在serialiser / d...

kafka java项目测试使用【代码】

引入依赖<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.11</artifactId><version>1.1.0</version> </dependency>生产者import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Properties;public class Produ...

kafka java实例

启动zookeeper和Kafka server 启动zookeeper: bin\windows\kafka-topics.bat --list --zookeeper localhost:2181 启动Kafka Server: bin\windows\kafka-server-start.bat config/server.properties 创建topic: bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mykafka编写java程序 maven pom文件添加依赖 <dependency><g...

kafka启动报java.net.UnknownHostException

kafka启动报java.net.UnknownHostException 参考资料: 1、https://blog.csdn.net/zdxiq000/article/details/62587657,本文从源码角度分析了UnknowHostException的出现原因,并且在一开始给出了解决方案,分析对策。

kafka消费者实时消费数据存入hdfs java scalca 代码【代码】

hadoop-client依赖很乱 调试很多次cdh版本好多jar没有 用hadoop2.7.3可以 自定义输出流的池子进行流管理public void writeLog2HDFS(String path, byte[] log) {try {//得到我们的装饰流FSDataOutputStream out = HDFSOutputStreamPool.getInstance().takeOutputStream(path);out.write(log);out.write("\r\n".getBytes());out.hsync();out.close();} catch (Exception e) {e.printStackTrace();}} }/*** @created by imp ON 2019/...

kafka java api客户端编程【代码】

环境 Ubuntu18.04 zookeeper3.4.13 kafka2.1.1 说明 因为是使用了最新的kafka,所以很多方法都过时了。研究了很久新api的用法,然后在此记录。 zkUtil已经不用了,改用AdminClient。 主要功能包括: 创建Topic:createTopics(Collection newTopics) 删除Topic:deleteTopics(Collection topics) 显示所有Topic:listTopics() 查询Topic:describeTopics(Collection topicNames) 查询集群信息:describeCluster() 查询ACL信息:desc...

Kafka对Java程序员有多重要?连阿里都再用它处理亿万级数据统计【图】

一.了解淘宝Kafka架构在ActiveMQ、RabbitMQ、RocketMQ、Kafka消息中间件之间,我们为什么要选择Kafka?下面详细介绍一下,2012年9月份我在支付宝做余额宝研发,2013年6月支付宝正式推出余额宝,2013年8月担任支付宝淘宝项目经理带领兄弟们一起做研发,期间需要与淘宝和500万对接竞彩接口数据,业余时间与淘宝的同事沟通,了解天猫在电商节如何处理这些大数据的?技术架构上采用了哪些策略呢? 一、应用无状态(淘宝session框架) 二、...