首页 / JAVA / Kafka整合Java API
Kafka整合Java API
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了Kafka整合Java API,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含18827字,纯文字阅读大概需要27分钟。
内容图文
![Kafka整合Java API](/upload/InfoBanner/zyjiaocheng/644/ff7f98f81e51448d972fbcd256d8d410.jpg)
一、开发准备
首先,在搭建好kafka(1.0.0版本)环境之后,这里用的开发语言是Java,构建工具Maven。
Maven的依赖如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>1.0.0</groupId>
<artifactId>kafka-consumerExample</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>kafka-consumerExample</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.7</java.version>
<slf4j.version>1.7.25</slf4j.version>
<logback.version>1.2.3</logback.version>
<kafka.version>1.0.0</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>${logback.version}</version>
</dependency>
<!-- kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>${kafka.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
</dependency>
</dependencies>
</project>
二、Kafka Producer
package com.lxk.kafka.producer;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
public class KafkaProducerTest implements Runnable {
private final KafkaProducer<String, String> producer;
private final String topic;
public KafkaProducerTest(String topicName) {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.18.103:9092,192.168.18.104:9092,192.168.18.105:9092");
// acks=0:如果设置为0,生产者不会等待kafka的响应。
// acks=1:这个配置意味着kafka会把这条消息写到本地日志文件中,但是不会等待集群中其他机器的成功响应。
// acks=all:这个配置意味着leader会等待所有的follower同步完成。这个确保消息不会丢失,除非kafka集群中所有机器挂掉。这是最强的可用性保证。
props.put("acks", "all");
// 配置为大于0的值的话,客户端会在消息发送失败时重新发送。
props.put("retries", 0);
// 当多条消息需要发送到同一个分区时,生产者会尝试合并网络请求。这会提高client和生产者的效率
props.put("batch.size", 16384);
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
this.producer = new KafkaProducer<String, String>(props);
this.topic = topicName;
}
public void run() {
int messageNo = 1;
try {
for (;;) {
String messageStr = "你好,这是第" + messageNo + "条数据";
producer.send(new ProducerRecord<String, String>(topic, "Message", messageStr));
// 生产了10条就打印
if (messageNo % 10 == 0) {
System.out.println("发送的信息:" + messageStr);
}
// 生产100条就退出
if (messageNo % 100 == 0) {
System.out.println("成功发送了" + messageNo + "条");
break;
}
messageNo++;
// Utils.sleep(1);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}
public static void main(String args[]) {
KafkaProducerTest test = new KafkaProducerTest("GMALL_STARTUP");
Thread thread = new Thread(test);
thread.start();
}
}
output:
14:10:43.436 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
acks = all
batch.size = 16384
bootstrap.servers = [192.168.18.103:9092, 192.168.18.104:9092, 192.168.18.105:9092]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializerCreatePartitions(37): 0 [usable: 0])
发送的信息:你好,这是第10条数据
14:10:44.113 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating connection to node node05:9092 (id: 2 rack: null)
发送的信息:你好,这是第20条数据
发送的信息:你好,这是第30条数据
发送的信息:你好,这是第40条数据
发送的信息:你好,这是第50条数据
发送的信息:你好,这是第60条数据
发送的信息:你好,这是第70条数据
发送的信息:你好,这是第80条数据
发送的信息:你好,这是第90条数据
发送的信息:你好,这是第100条数据
成功发送了100条
14:10:44.116 [Thread-1] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
配置说明
bootstrap.servers: kafka的地址。
acks:消息的确认机制,默认值是0。
- acks=0:如果设置为0,生产者不会等待kafka的响应。
- acks=1:这个配置意味着kafka会把这条消息写到本地日志文件中,但是不会等待集群中其他机器的成功响应。
- acks=all:这个配置意味着leader会等待所有的follower同步完成。这个确保消息不会丢失,除非kafka集群中所有机器挂掉。这是最强的可用性保证。
retries:配置为大于0的值的话,客户端会在消息发送失败时重新发送。
batch.size:当多条消息需要发送到同一个分区时,生产者会尝试合并网络请求。这会提高client和生产者的效率。
key.serializer: 键序列化,默认org.apache.kafka.common.serialization.StringDeserializer。
value.deserializer:值序列化,默认org.apache.kafka.common.serialization.StringDeserializer。
…
还有更多配置,可以去查看官方文档,这里就不在说明了。
kafka的配置添加之后,我们便开始生产数据,生产数据代码只需如下就行:
producer.send(new ProducerRecord<String, String>(topic,key,value));
topic: 消息队列的名称,可以先行在kafka服务中进行创建。如果kafka中并未创建该topic,那么便会自动创建!
key:键值,也就是value对应的值,和Map类似。
value:要发送的数据,数据格式为String类型的。
在写好生产者程序之后,就可以生产了,我这里发送的消息为:
String messageStr="你好,这是第"+messageNo+"条数据";
并且只发送100条就退出。
三、Kafka Consumer
package com.lxk.kafka.consumer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
public class KafkaConsumerTest implements Runnable {
private KafkaConsumer<String, String> consumer;
private ConsumerRecords<String, String> msgList;
private String topic;
private static final String GROUPID = "groupE4";
public KafkaConsumerTest(String topicName) {
this.topic = topicName;
init();
}
public void run() {
System.out.println("---------开始消费---------");
int messageNo = 1;
List<String> list = new ArrayList<String>();
List<Long> list2 = new ArrayList<Long>();
try {
for (;;) {
msgList = consumer.poll(100);
if (null != msgList && msgList.count() > 0) {
for (ConsumerRecord<String, String> record : msgList) {
if (messageNo % 10 == 0) {
System.out.println(messageNo + "=======receive: key = " + record.key() + ", value = "
+ record.value() + " offset===" + record.offset());
}
list.add(record.value());
list2.add(record.offset());
messageNo++;
}
if (list.size() == 50) {
// 手动提交
consumer.commitSync();
System.out.println("成功提交" + list.size() + "条,此时的offset为:" + list2.get(49));
} else if (list.size() > 50) {
consumer.close();
init();
list.clear();
list2.clear();
}
} else {
Thread.sleep(1000);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
private void init() {
Properties props = new Properties();
// kafka消费的的地址
props.put("bootstrap.servers", "node03:9092,node04:9092,node05:9092");
// 组名 不同组名可以重复消费
props.put("group.id", GROUPID);
// 是否自动提交
props.put("enable.auto.commit", "false");
// 超时时间
props.put("session.timeout.ms", "30000");
// 一次最大拉取的条数
props.put("max.poll.records", 10);
// earliest当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
// latest
// 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
// none
// topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
props.put("auto.offset.reset", "earliest");
// 序列化
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
this.consumer = new KafkaConsumer<String, String>(props);
// 订阅主题列表topic
this.consumer.subscribe(Arrays.asList(topic));
System.out.println("初始化!");
}
public static void main(String args[]) {
KafkaConsumerTest test1 = new KafkaConsumerTest("GMALL_STARTUP");
Thread thread1 = new Thread(test1);
thread1.start();
}
}
output:
14:13:41.119 [Thread-1] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [node03:9092, node04:9092, node05:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = groupE4
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 10
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 30000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer14:13:41.123 [Thread-1] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 1.0.0
14:13:41.123 [Thread-1] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : aaa7af6d4a11b29d
14:13:41.123 [Thread-1] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-5, groupId=groupE4] Kafka consumer initialized
14:13:41.123 [Thread-1] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-5, groupId=groupE4] Subscribed to topic(s): GMALL_STARTUP
初始化!14:13:41.157 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-5, groupId=groupE4] Fetch READ_UNCOMMITTED at offset 800 for partition GMALL_STARTUP-2 returned fetch data (error=NONE, highWaterMark=900, lastStableOffset = -1, logStartOffset = 0, abortedTransactions = null, recordsSizeInBytes=4389)
14:13:41.158 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name GMALL_STARTUP-2.records-lag
250=======receive: key = Message, value = 你好,这是第10条数据 offset===809
260=======receive: key = Message, value = 你好,这是第20条数据 offset===819
270=======receive: key = Message, value = 你好,这是第30条数据 offset===829
280=======receive: key = Message, value = 你好,这是第40条数据 offset===839
290=======receive: key = Message, value = 你好,这是第50条数据 offset===849
14:13:41.162 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-5, groupId=groupE4] Committed offset 0 for partition GMALL_STARTUP-0
14:13:41.162 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-5, groupId=groupE4] Committed offset 0 for partition GMALL_STARTUP-1
14:13:41.162 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-5, groupId=groupE4] Committed offset 850 for partition GMALL_STARTUP-2
成功提交50条,此时的offset为:84914:13:41.162 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-5, groupId=groupE4] Committed offset 850 for partition GMALL_STARTUP-2
成功提交50条,此时的offset为:849
300=======receive: key = Message, value = 你好,这是第60条数据 offset===85914:13:41.209 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-6, groupId=groupE4] Fetch READ_UNCOMMITTED at offset 850 for partition GMALL_STARTUP-2 returned fetch data (error=NONE, highWaterMark=900, lastStableOffset = -1, logStartOffset = 0, abortedTransactions = null, recordsSizeInBytes=4389)
14:13:41.209 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name GMALL_STARTUP-2.records-lag
310=======receive: key = Message, value = 你好,这是第60条数据 offset===859
320=======receive: key = Message, value = 你好,这是第70条数据 offset===869
330=======receive: key = Message, value = 你好,这是第80条数据 offset===879
340=======receive: key = Message, value = 你好,这是第90条数据 offset===889
350=======receive: key = Message, value = 你好,这是第100条数据 offset===899
14:13:41.216 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-6, groupId=groupE4] Committed offset 0 for partition GMALL_STARTUP-0
14:13:41.217 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-6, groupId=groupE4] Committed offset 0 for partition GMALL_STARTUP-1
14:13:41.217 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-6, groupId=groupE4] Committed offset 900 for partition GMALL_STARTUP-2
成功提交50条,此时的offset为:899
配置说明
kafka消费这块应该来说是重点,毕竟大部分的时候,我们主要使用的是将数据进行消费。
kafka消费的配置如下:
bootstrap.servers: kafka的地址。
group.id:组名 不同组名可以重复消费。例如你先使用了组名A消费了kafka的1000条数据,但是你还想再次进行消费这100条数据,并且不想重新去产生,那么这里你只需要更改组名就可以重复消费了。
enable.auto.commit:是否自动提交,默认为true。
auto.commit.interval.ms: 从poll(拉)的回话处理时长。
session.timeout.ms:超时时间。
max.poll.records:一次最大拉取的条数。
auto.offset.reset:消费规则,默认earliest 。
- earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 。
- latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 。
- none: topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常。
key.serializer: 键序列化,默认org.apache.kafka.common.serialization.StringDeserializer。
value.deserializer:值序列化,默认org.apache.kafka.common.serialization.StringDeserializer。
由于我这是设置的自动提交,所以消费代码如下:
我们需要先订阅一个topic,也就是指定消费哪一个topic。
consumer.subscribe(Arrays.asList(topic));
订阅之后,我们再从kafka中拉取数据:
ConsumerRecords<String, String> msgList=consumer.poll(1000);
一般来说进行消费会使用监听,这里我们就用for(;;)来进行监听, 并且设置消费100条就退出!
注意一下自动提交:关于Kafka 的 consumer 消费者手动提交详解
offset:指的是kafka的topic中的每个消费组消费的下标。
简单的来说就是一条消息对应一个offset下标,每次消费数据的时候如果提交offset,那么下次消费就会从提交的offset加一那里开始消费。
比如一个topic中有100条数据,我消费了50条并且提交了,那么此时的kafka服务端记录提交的offset就是49(offset从0开始),那么下次消费的时候offset就从50开始消费。
总结
简单的开发一个kafka的程序需要以下步骤:
- 成功搭建kafka服务器,并成功启动!
- 得到kafka服务信息,然后在代码中进行相应的配置。
- 配置完成之后,监听kafka中的消息队列是否有消息产生。
- 将产生的数据进行业务逻辑处理!
kafka介绍参考官方文档:http://kafka.apache.org/intro
Jeremy_Lee123 发布了391 篇原创文章 · 获赞 348 · 访问量 7万+ 私信 关注内容总结
以上是互联网集市为您收集整理的Kafka整合Java API全部内容,希望文章能够帮你解决Kafka整合Java API所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。