Kafka教程
Kafka教程
官网地址:https://kafka.apache.org
本文以 2.12
版本为例。
1、Kafka CLI
1.1 启动kafka服务
$ bin/kafka-server-start.sh config/server.properties
1.2 创建topic
# 创建topic(1个分区,1个副本)
# --partitions: 分区数
# --replication-factor: 副本数
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
1.3 查看所有topic
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
1.4 查看所有topic的详细信息
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181
1.5 查看指定topic的详细信息
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
1.6 删除指定topic
# 配置delete.topic.enable为true,这样才能删除topic
$ bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
1.7 启动生产者
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
1.8 启动消费者
# --from-beginning: 表示从头开始接收数据
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
# 也可以指定分组
#--group: 指定消费者组
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --group t1
1.9 查看消费者组
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
1.10 查看特定消费者组的消费情况
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-consumer-group
1.11 查看broker信息
$ bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092
1.12 切换leader
$ bin/kafka-preferred-replica-election.sh --zookeeper localhost:9092
1.13 kafka自带压测命令
$ bin/kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100 --producer-props bootstrap.servers=localhost:9092
1.14 kafka持续发送消息
持续发送消息到指定的topic中,且每条发送的消息都会有响应信息:
$ bin/kafka-verifiable-producer.sh --broker-list localhost:9092 --topic test --max-messages 10
1.15 通过zookeeper-shell.sh连接zookeeper
如果 kafka 集群的 zk 配置了 chroot 路径,那么需要加上/path
。
$ bin/zookeeper-shell.sh localhost:2181[/path]
$ ls /brokers/ids
$ get /brokers/ids/0
1.16 迁移分区
1、创建规则json
$ cat > increase-replication-factor.json <<EOF
{"version":1, "partitions":[
{"topic":"__consumer_offsets","partition":0,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":1,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":2,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":3,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":4,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":5,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":6,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":7,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":8,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":9,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":10,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":11,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":12,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":13,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":14,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":15,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":16,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":17,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":18,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":19,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":20,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":21,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":22,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":23,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":24,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":25,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":26,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":27,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":28,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":29,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":30,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":31,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":32,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":33,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":34,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":35,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":36,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":37,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":38,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":39,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":40,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":41,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":42,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":43,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":44,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":45,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":46,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":47,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":48,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":49,"replicas":[0,1]}]
}
EOF
2、执行
$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
3、验证
$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify
2、Kafka API
2.1 导入依赖
<!-- kafka客户端工具 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
<!-- 工具类 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-io</artifactId>
<version>1.3.2</version>
</dependency>
2.2 生产消息到Kafka中(同步)
同步是发送消息完成之后,需要等待对方响应之后才能继续干其他的。
异步则是发送完消息之后,就可以继续往下执行业务逻辑。
将1-10的数字消息写入到Kafka中。
package com.example.kafka.demo1;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
* 生产消息到Kafka中(同步)
*
* @author tom
*/
public class MyProducer1 {
public static void main(String[] args) {
// 1、创建用于连接Kafka的Properties配置
Properties properties = new Properties();
// 指定连接的kafka集群
properties.put("bootstrap.servers", "127.0.0.1:9092");
// ack应答级别
// all等价于-1 0 1
properties.put("acks", "all");
// 重试次数
properties.put("retries", 1);
// 批次大小
// 16k
properties.put("batch.size", 16384);
// 等待时间
properties.put("linger.ms", 1);
// recordAccumulator缓冲区大小
// 32m
properties.put("buffer.memory", 33554432);
// key,Value的序列化类
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 2、创建一个生产者对象KafkaProducer
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// 3、调用send发送1-10消息到指定Topic topic1
for (int i = 1; i < 11; ++i) {
try {
// 获取返回值Future,该对象封装了返回值
Future<RecordMetadata> future = producer.send(new ProducerRecord<>("topic1", null, i + ""));
// 4、调用一个Future.get()方法等待响应
RecordMetadata recordMetadata = future.get();
System.out.println("success->" + " partition = " + recordMetadata.partition() + " | offset = " + recordMetadata.offset());
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
// 5、关闭生产者
producer.close();
}
}
程序输出:
注意:如果在 send() 方法后接着调用 get() 方法,那么就是有序的同步方法,消息会一条接一条的发送
send(xxx).get()。
2.2.1 生产者中ack的配置
在同步发送的前提下,生产者在获得集群返回的 ack 之前那会一直阻塞。那么集群什么时候返回ack呢?
此时ack有3个配置:
-
ack=0:kafka-cluster 不需要任何的 broker 收到消息,就立即返回 ack 给生产者,最容易丢消息,但是效率
是最高的。
-
ack=1(默认):多副本之间得leader已经收到消息,并把消息写入到本地log中,才会返回ack给生产者,性
能和安全是最均衡的。
-
ack=-1/all:里面有默认的配置min.insync.replicas=2(默认为1,推荐配置大于等于2),此时就需要leader和
一个follower同步完成之后,才会返回ack给生产者(此时集群中有2个broker已完成数据的接收),这种方式
最安全,但性能最差。
下面是关于ack和重试(如果没有收到ack,就开启重试)的配置:
// 发送失败会重试,默认重试时间间隔100ms,重试能保证消息发送的可靠性,但是也可能造成消息重复发送,比如网络抖动,所以需要在接收者那边做好消息接收的幂等性处理
// 重试次数设置
props.put(ProducerConfig.RETRIES_CONFIG, 3);
// 重试间隔设置
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
2.2.2 其它参数介绍
Kafka默认会创建一个消息缓冲区,用来存放要发送的消息,缓冲区是32MB
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
Kafka本地线程会去缓冲区拉一次16K的数据,发送到broker
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
如果拉不到16K的数据,间隔10ms也会将已拉到的数据发送到broker
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
2.3 生产消息到Kafka中(异步回调)
将1-10的数字消息写入到Kafka中。
package com.example.kafka.demo1;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
* 生产消息到Kafka中(异步回调)
*
* @author tom
*/
public class MyProducer2 {
public static void main(String[] args) {
// 1、创建用于连接Kafka的Properties配置
Properties properties = new Properties();
// 指定连接的kafka集群
properties.put("bootstrap.servers", "127.0.0.1:9092");
// ack应答级别
// all等价于-1 0 1
properties.put("acks", "all");
// 重试次数
properties.put("retries", 1);
// 批次大小
// 16k
properties.put("batch.size", 16384);
// 等待时间
properties.put("linger.ms", 1);
// recordAccumulator缓冲区大小
// 32m
properties.put("buffer.memory", 33554432);
// key,Value的序列化类
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 2、创建一个生产者对象KafkaProducer
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// 3、调用send发送1-10消息到指定Topic topic2
for (int i = 1; i < 11; i++) {
// 回调函数,该方法会在Producer收到ack时调用,为异步调用
// 如果key为确定值,那么分区也就确定了,所以我这里没有写死
producer.send(new ProducerRecord<>("topic2", null, i + ""), (metadata, exception) -> {
if (exception == null) {
System.out.println("success->" + " partition = " + metadata.partition() + " | offset = " + metadata.offset());
} else {
exception.printStackTrace();
}
});
}
// 5、关闭生产者
producer.close();
}
}
程序输出:
2.4 生产消息到Kafka中(自定义分区策略的插入)
分区的分配基本由 ProducerRecord 的参数决定。
指明 partition 的情况下,直接将指明的值直接作为 partiton 值。
没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值。
既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),
将这个值与topic可用的partition总数取余得到partition值,也就是常说的round-robin算法。
将1-10的数字消息写入到Kafka中。
package com.example.kafka.demo1;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
* 自定义分区
* @author tom
*/
public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// Integer num = cluster.partitionCountForTopic(topic);
// return key.toString().hashCode() % num;
return 0;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
package com.example.kafka.demo1;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
* 生产消息到Kafka中(自定义分区策略的插入)
*
* @author tom
*/
public class MyProducer3 {
public static void main(String[] args) {
// 1、创建用于连接Kafka的Properties配置
Properties properties = new Properties();
// 指定连接的kafka集群
properties.put("bootstrap.servers", "127.0.0.1:9092");
// ack应答级别
// all等价于-1 0 1
properties.put("acks", "all");
// 重试次数
properties.put("retries", 1);
// 批次大小
// 16k
properties.put("batch.size", 16384);
// 等待时间
properties.put("linger.ms", 1);
// recordAccumulator缓冲区大小
// 32m
properties.put("buffer.memory", 33554432);
// 设置分区
properties.put("partitioner.class", "com.example.kafka.demo1.MyPartitioner");
// key,Value的序列化类
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 2、创建一个生产者对象KafkaProducer
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// 3、调用send发送1-10消息到指定Topic topic3
for (int i = 1; i < 11; i++) {
// 回调函数,该方法会在Producer收到ack时调用,为异步调用
// 如果key为确定值,那么分区也就确定了,所以我这里没有写死
producer.send(new ProducerRecord<>("topic3", null, i + ""), (metadata, exception) -> {
if (exception == null) {
System.out.println("success->" + " partition = " + metadata.partition() + " | offset = " + metadata.offset());
} else {
exception.printStackTrace();
}
});
}
// 5、关闭生产者
producer.close();
}
}
程序输出:
2.5 从Kafka的topic中消费消息(自动提交offset的消费者)
Consumer 消费数据时的可靠性是很容易保证的,因为数据在 Kafka 中是持久化的,故不用担心数据丢失问题。
由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置的继续消
费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。所以 offset 的维护是
Consumer 消费数据是必须考虑的问题。
从 topic1中,将消息都消费,并将记录的offset、key、value打印出来。
package com.example.kafka.demo1;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
/**
* 从Kafka的topic中消费消息(自动提交offset的消费者)
*
* @author tom
*/
public class MyConsumer1 {
public static void main(String[] args) throws InterruptedException {
// 1、创建消费者配置信息
Properties properties = new Properties();
// 连接的集群 bootstrap.servers
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
// 开启自动提交offset enable.auto.commit
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// 自动提交offset的时间间隔 auto.commit.interval.ms
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
// key,value的反序列化 key.deserializer、value.deserializer
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// 消费者组 group.id
// 可以使用消费者组将若干个消费者组织到一起,共同消费Kafka中topic的数据
// 每一个消费者需要指定一个消费者组,如果消费者的组名是一样的,表示这几个消费者是一个组中的
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group1");
// auto.offset.reset
// 重置消费者offset的方法(达到重复消费的目的),设置该属性也只在两种情况下生效:
// 1.上面设置的消费组还未消费(可以更改组名来消费)
// 2.该offset已经过期
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 2、创建Kafka消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 3、订阅要消费的主题
// 指定消费者从哪个topic中拉取数据
// kafkaConsumer.subscribe(Collections.singletonList("topic1"));
kafkaConsumer.subscribe(Arrays.asList("topic1"));
// 4、使用一个while循环,不断从Kafka的topic中拉取消息
while (true) {
// Kafka的消费者一次拉取一批的数据
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5));
// 5、将将记录(record)的offset、key、value都打印出来
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
// 主题
String topic = consumerRecord.topic();
// offset: 这条消息处于Kafka分区中的哪个位置
long offset = consumerRecord.offset();
// key、value
String key = consumerRecord.key();
String value = consumerRecord.value();
System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value);
System.out.println(consumerRecord.key() + "----" + consumerRecord.value());
}
Thread.sleep(1000);
}
//consumer无需close()
}
}
程序输出:
2.6 从Kafka的topic中消费消息(手动提交offset的消费者)
手动提交 offset 的方法有两种:分别是 commitSync(同步提交)和 commitAsync(异步提交)。两者的相同点
是,都会将本次 poll 的一批数据最高的偏移量提交;不同点是,commitSync 阻塞当前线程,一直到提交成功,
并且会自动失败重试(由不可控因素导致,也会出现提交失败);而 commitAsync 则没有失败重试机制,故有可
能提交失败。
package com.example.kafka.demo1;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
/**
* 从Kafka的topic中消费消息(手动提交offset的消费者)
*
* @author tom
*/
public class MyConsumer2 {
public static void main(String[] args) throws InterruptedException {
// 1、创建消费者配置信息
Properties properties = new Properties();
// 连接的集群 bootstrap.servers
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
// 关闭自动提交 offset enable.auto.commit
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// key,value的反序列化 key.deserializer、value.deserializer
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// 消费者组 group.id
// 可以使用消费者组将若干个消费者组织到一起,共同消费Kafka中topic的数据
// 每一个消费者需要指定一个消费者组,如果消费者的组名是一样的,表示这几个消费者是一个组中的
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group2");
// auto.offset.reset
// 重置消费者offset的方法(达到重复消费的目的),设置该属性也只在两种情况下生效:
// 1.上面设置的消费组还未消费(可以更改组名来消费)
// 2.该offset已经过期
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 2、创建Kafka消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 3、订阅要消费的主题
// 指定消费者从哪个topic中拉取数据
// kafkaConsumer.subscribe(Collections.singletonList("topic2"));
kafkaConsumer.subscribe(Arrays.asList("topic2"));
// 4、使用一个while循环,不断从Kafka的topic中拉取消息
while (true) {
// Kafka的消费者一次拉取一批的数据
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5));
// 5、将将记录(record)的offset、key、value都打印出来
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
// 主题
String topic = consumerRecord.topic();
// offset: 这条消息处于Kafka分区中的哪个位置
long offset = consumerRecord.offset();
// key、value
String key = consumerRecord.key();
String value = consumerRecord.value();
System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value);
System.out.println(consumerRecord.key() + "----" + consumerRecord.value());
}
// 1.同步提交,当前线程会阻塞直到offset提交成功
kafkaConsumer.commitSync();
// 2.异步提交
// kafkaConsumer.commitAsync();
Thread.sleep(1000);
}
//consumer无需close()
}
}
程序输出:
2.6.1 消费者自动提交与手动提交offset
1、提交的内容
消费者无论是自动提交还是手动提交,都需要把所属的消费者组+消费的某个主题+消费的某个分区+消费的偏移
量,这样的信息提交到集群的_consumer_offsets主题里面。
2、自动提交
消费者poll消息下来以后就会自动提交到offset
//是否自动提交offset,默认:true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
//自动提交offset的间隔时间
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
3、手动提交
将自动提交的配置改为false
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
手动提交分为了两种:
-
手动同步提交:在消费完消费后调用同步提交的方法,当集群返回ack前一直阻塞,返回ack后表示提交成功,
执行之后的逻辑。
// 所有消息已经消费完 // 有消息 if(records.count() > 0){ // 手动同步提交offset,当前线程会阻塞,直到offset提交成功 // 一般使用同步提交,因为提交之后一般也没有什么逻辑代码了 //=========阻塞==== 提交成功 consumer.commitSync(); }
-
手动异步提交:在消息消费完后提交,不需要等到集群ack,直接执行之后的逻辑,可以设置一个回调方法,
供集群调用。
//所有消息已经消费完 //有消息 if(records.count() > 0){ //手动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后面的程序逻辑 consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) { if(e != null){ System.err.println("Commit failed for " + offsets); System.err.println("Commit failed exception: " + e.getStackTrace()); } } }); }
2.6.2 长轮询poll消息
默认情况下,消费者一次会poll 500条消息。
//一次poll最大拉取消息的条数,可以根据消费速度的快慢来设置
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
代码中设置了长轮询的时间是1000毫秒:
// 设置长轮询时间是1000ms
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for(ConsumerRecord<String, String> record : records){
System.out.printf("收到消息:partition = %d, offset = %d, key = %s, value = %s%n",
record.partition(), record.offset(), record.key(), record.value());
}
意味着:
如果一次poll到500条,直接执行for循环。
如果这一次没有poll到500条,且时间在1s内,要么长轮询继续poll,要么到500条,要么到1s。
如果多次poll都没达到500条,且1s时间到了,那么直接执行for循环。
如果两次poll的间隔超过30s,集群会认为该消费者的消费能力过弱,该消费者被提出消费组,触发rebalance机
制,rebalance机制会造成性能开销。可以通过设置这个参数,让一次poll的消息条数少一点。
//一次poll最大拉取消息的条数,可以根据消费速度的快慢来设置
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
//如果两次poll的时间超出了30s的时间间隔,kafka会认为其消费能力过弱,将其提出消费组,将分区分配给其他消费者。-rebalance
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
2.6.3 消费者的健康状态检查
消费者每隔1s向kafka集群发送心跳,集群发现如果有超过10s没有续约的消费者,将被踢出消费组,触发该消费
组的rebalance机制,将该分区交给其他消费组里的其他消费者进行消费。
//consumer给broker发送?跳的间隔时间
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
//kafka如果超过10秒没有收到消费者的?跳,则会把消费者踢出消费组,进?rebalance,把分区分配给其他消费者。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000)
2.6.4 指定分区消费
String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
// 只消费foo的分区0和分区1,不可以与subscribe方法同用
consumer.assign(Arrays.asList(partition0, partition1));
//TOPIC_NAME主题下的0号分区
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
2.6.5 从头消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
2.6.6 指定offset消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);//offset=10
2.6.7 获取最早offset或者最新offset
TopicPartition partition = new TopicPartition("foo", 0);
// 获取foo的分区0的最早offset
long beginningOffset = consumer.beginningOffsets(Collections.singletonList(partition)).get(partition);
// 获取foo的分区0的最新offset
long endOffset = consumer.endOffsets(Collections.singletonList(partition)).get(partition);
2.6.8 从某一个offset开始消费
TopicPartition partition = new TopicPartition("foo", 0);
long offset = args[0];
// 移动到指定offset
consumer.seek(partition, offset);
// 移动到最新offset
consumer.seekToEnd(Collections.singletonList(partition));
// 移动到最早offset
consumer.seekToBeginning(Collections.singletonList(partition));
2.6.9 指定时间消费
根据时间,去所有的partition中确定该时间对应的offset,然后去所有的partition中找到该offset之后的消息开始
消费。
List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
//从1?时前开始消费
long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
Map<TopicPartition, Long> map = new HashMap<>();
for (PartitionInfo par : topicPartitions) {
map.put(new TopicPartition(TOPIC_NAME, par.partition()),
fetchDataTime);
}
Map<TopicPartition, OffsetAndTimestamp> parMap =
consumer.offsetsForTimes(map);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry :
parMap.entrySet()) {
TopicPartition key = entry.getKey();
OffsetAndTimestamp value = entry.getValue();
if (key == null || value == null) continue;
Long offset = value.offset();
System.out.println("partition-" + key.partition() +
"|offset-" + offset);
System.out.println();
//根据消费?的timestamp确定offset
if (value != null) {
consumer.assign(Arrays.asList(key));
consumer.seek(key, offset);
}
}
2.6.10 新消费组的消费offset规则
新消费组的消费者在启动以后,默认会从当前分区的最后一条消息的offset+1开始消费(消费新消息)。可以通过
以下的设置,让新的消费者第一次从头开始消费。之后开始消费新消息(最后消费位置的偏移量+1)
- Latest:默认, 消费最新消息
- earliest:第一次从头开始消费,之后开始消费新消息(最后消费位置的偏移量+1)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
参考文档:
https://kafka.apache.org/24/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
2.7 从Kafka的topic中消费消息(自定义提交策略)
package com.example.kafka.demo1;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
/**
* 从Kafka的topic中消费消息(自定义提交策略)
* 简单消费者--订阅主题带再均衡处理器
*
* @author tom
*/
public class MyConsumer3 {
public static void main(String[] args) throws InterruptedException {
// 1、创建消费者配置信息
Properties properties = new Properties();
// 连接的集群 bootstrap.servers
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
// 开启自动提交offset enable.auto.commit
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// 自动提交offset的时间间隔 auto.commit.interval.ms
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
// key,value的反序列化 key.deserializer、value.deserializer
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// 消费者组 group.id
// 可以使用消费者组将若干个消费者组织到一起,共同消费Kafka中topic的数据
// 每一个消费者需要指定一个消费者组,如果消费者的组名是一样的,表示这几个消费者是一个组中的
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group3");
// auto.offset.reset
// 重置消费者offset的方法(达到重复消费的目的),设置该属性也只在两种情况下生效:
// 1.上面设置的消费组还未消费(可以更改组名来消费)
// 2.该offset已经过期
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 2、创建Kafka消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 3、订阅要消费的主题
// 指定消费者从哪个topic中拉取数据
kafkaConsumer.subscribe(Arrays.asList("topic3"), new ConsumerRebalanceListener() {
// 在均衡开始之前和消费者停止读取消息之后调用
// 如果在这个方法中提交偏移量,则下一个消费者就可以获得读取的偏移量
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 提交位移
kafkaConsumer.commitSync();
}
// 在重新分配分区之后和消费者开始读取消息之前调用
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
long committedOffset = -1;
for (TopicPartition topicPartition : partitions) {
// 获取该分区已消费的位移
committedOffset = kafkaConsumer.committed(topicPartition).offset();
// 重置位移到上一次提交的位移处开始消费
kafkaConsumer.seek(topicPartition, committedOffset + 1);
}
}
});
try {
// 4、使用一个while循环,不断从Kafka的topic中拉取消息
while (true) {
// Kafka的消费者一次拉取一批的数据
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5));
// 5、将将记录(record)的offset、key、value都打印出来
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
// 主题
String topic = consumerRecord.topic();
// offset:这条消息处于Kafka分区中的哪个位置
long offset = consumerRecord.offset();
// key、value
String key = consumerRecord.key();
String value = consumerRecord.value();
System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value);
System.out.println(consumerRecord.key() + "----" + consumerRecord.value());
}
Thread.sleep(1000);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
kafkaConsumer.close();
}
}
}
程序输出:
package com.example.kafka.demo1;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
/**
* 从Kafka的topic中消费消息(自定义提交策略)
* 提交指定偏移量和再均衡处理器实现
*
* @author tom
*/
public class MyConsumer4 {
public static void main(String[] args) {
// 1、创建消费者配置信息
Properties properties = new Properties();
// 连接的集群 bootstrap.servers
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
// 关闭自动提交 offset enable.auto.commit
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// key,value的反序列化 key.deserializer、value.deserializer
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// 消费者组 group.id
// 可以使用消费者组将若干个消费者组织到一起,共同消费Kafka中topic的数据
// 每一个消费者需要指定一个消费者组,如果消费者的组名是一样的,表示这几个消费者是一个组中的
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group3");
// auto.offset.reset
// 重置消费者offset的方法(达到重复消费的目的),设置该属性也只在两种情况下生效:
// 1.上面设置的消费组还未消费(可以更改组名来消费)
// 2.该offset已经过期
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 2、创建Kafka消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
// 3、订阅要消费的主题
// 指定消费者从哪个topic中拉取数据
kafkaConsumer.subscribe(Collections.singletonList("topic3"), new ConsumerRebalanceListener() {
// 在均衡开始之前和消费者停止读取消息之后调用
// 如果在这个方法中提交偏移量,则下一个消费者就可以获得读取的偏移量
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("Lost partitions in rebalance committing current offsets:" + currentOffsets);
// 提交位移
kafkaConsumer.commitSync(currentOffsets);
}
// 在重新分配分区之后和消费者开始读取消息之前调用
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 不做任何操作,也可以如下指定消费偏移量
long committedOffset = -1;
for (TopicPartition topicPartition : partitions) {
// 获取该分区已消费的位移
committedOffset = kafkaConsumer.committed(topicPartition).offset();
// 重置位移到上一次提交的位移处开始消费
kafkaConsumer.seek(topicPartition, committedOffset + 1);
}
}
});
try {
// 4、使用一个while循环,不断从Kafka的topic中拉取消息
while (true) {
// Kafka的消费者一次拉取一批的数据
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5));
// 5、将将记录(record)的offset、key、value都打印出来
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
// 主题
String topic = consumerRecord.topic();
// offset:这条消息处于Kafka分区中的哪个位置
long offset = consumerRecord.offset();
// key、value
String key = consumerRecord.key();
String value = consumerRecord.value();
System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value);
System.out.println(consumerRecord.key() + "----" + consumerRecord.value());
// 设置需要提交的偏移量
currentOffsets.put(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), new OffsetAndMetadata(consumerRecord.offset() + 1, "no metadata"));
}
// 手动异步提交指定偏移量
kafkaConsumer.commitAsync(currentOffsets, null);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
kafkaConsumer.commitSync(currentOffsets);
} finally {
kafkaConsumer.close();
System.out.println("closed consumer...");
}
}
}
}
程序输出:
2.8 kafka中发送和消费复杂类型
1、往kafka中写入user数据
2、消费kafka中user的数据
3、kafka的topic是userinfo
2.8.1 将复杂类型和字节数字相互转换
package com.example.kafka.demo2;
import java.io.*;
/**
* @author tom
*/
public class BeanConversion {
/**
* 对象转字节数组
*
* @param obj
* @return
*/
public static byte[] ObjectToBytes(Object obj) {
byte[] bytes = null;
ByteArrayOutputStream bo = null;
ObjectOutputStream oo = null;
try {
bo = new ByteArrayOutputStream();
oo = new ObjectOutputStream(bo);
oo.writeObject(obj);
bytes = bo.toByteArray();
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (bo != null) {
bo.close();
}
if (oo != null) {
oo.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
return bytes;
}
/**
* 字节数组转对象
*
* @param bytes
* @return
*/
public static Object BytesToObject(byte[] bytes) {
Object obj = null;
ByteArrayInputStream bi = null;
ObjectInputStream oi = null;
try {
bi = new ByteArrayInputStream(bytes);
oi = new ObjectInputStream(bi);
obj = oi.readObject();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (bi != null) {
bi.close();
}
if (oi != null) {
oi.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
return obj;
}
}
2.8.2 实现kafka的序列化接口
package com.example.kafka.demo2;
import org.apache.kafka.common.serialization.Serializer;
/**
* @author tom
*/
public class EncodingKafka implements Serializer<Object> {
@Override
public byte[] serialize(String topic, Object data) {
return BeanConversion.ObjectToBytes(data);
}
}
2.8.3 实现kafka的反序列化接口
package com.example.kafka.demo2;
import org.apache.kafka.common.serialization.Deserializer;
/**
* @author tom
*/
public class DecodingKafka implements Deserializer<Object> {
@Override
public Object deserialize(String topic, byte[] data) {
return BeanConversion.BytesToObject(data);
}
}
2.8.4 对象
package com.example.kafka.demo2;
import java.io.Serializable;
/**
* @author tom
*/
public class User implements Serializable {
private int id;
private int age;
private String name;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
2.8.5 生产者发送数据
package com.example.kafka.demo2;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
* 生产者发送数据
*
* @author tom
*/
public class KafkaProducerUser {
public final static String TOPIC_NAME = "userinfo";
public static void main(String[] args) {
// 1、创建用于连接Kafka的Properties配置
Properties properties = new Properties();
// 指定连接的kafka集群
properties.put("bootstrap.servers", "127.0.0.1:9092");
// ack应答级别
// all等价于-1 0 1
properties.put("acks", "all");
// 重试次数
properties.put("retries", 1);
// 批次大小
// 16k
properties.put("batch.size", 16384);
// 等待时间
properties.put("linger.ms", 1);
// recordAccumulator缓冲区大小
// 32m
properties.put("buffer.memory", 33554432);
// key,Value的序列化类
// key的序列化,其类型是Integer
properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
// 自己定义的序列化
// value的序列化,其类型是Object
properties.put("value.serializer", "com.example.kafka.demo2.EncodingKafka");
// 2、创建一个生产者对象KafkaProducer
KafkaProducer producer = new KafkaProducer<Integer, User>(properties);
for (int i = 0; i < 100; i++) {
User u = new User();
u.setId(i * 3);
u.setName("name" + i);
if (i < 20) {
u.setAge(i + 20);
} else {
u.setAge(i);
}
// public ProducerRecord(String topic, K key, V value)
ProducerRecord<Integer, User> producerRecord = new ProducerRecord<>(TOPIC_NAME, u.getId(), u);
// 3、调用send发送消息到指定Topic
// send()方法会返回一个包含RecordMetadata的Future对象,不过因为我们会忽略返回值,所以无法知道消息是否发送成功
// 如果不关心发送结果,那么可以使用这种发送方式
Future<RecordMetadata> future = producer.send(producerRecord);
// send()方住先返回一个 Future对象,然后调用Future对象的get()方法等待Kafka响应
// 如果服务器返回错误,get()方法会抛出异
// 如果没有发生错误,我们会得到一个RecordMetadata对象,可以用它获取消息的偏移量
// 如果在发送数据之前或者在发送过程中发生了任何错误,比如broker返回了一个不允许重发消息的异常或者已经超过了重发的次数,那么就会抛出异常
RecordMetadata rm = null;
try {
// 4、调用一个Future.get()方法等待响应
rm = future.get();
System.out.println(rm.topic() + " 分区:" + rm.partition() + " 偏移量:" + rm.offset());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
// 5、关闭生产者
producer.close();
}
}
程序输出:
2.8.6 消费者消费数据
package com.example.kafka.demo2;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
/**
* 消费者消费数据
*
* @author tom
*/
public class KafkaConsumerUser {
public final static String TOPIC_NAME = "userinfo";
public static void main(String[] args) throws Exception {
// 1、创建消费者配置信息
Properties properties = new Properties();
// 连接的集群 bootstrap.servers
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
// 开启自动提交offset enable.auto.commit
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// 自动提交offset的时间间隔 auto.commit.interval.ms
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
// key,value的反序列化 key.deserializer、value.deserializer
// key的反序列化,其类型是Integer
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
// value的反序列化,其类型是Object
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.example.kafka.demo2.DecodingKafka");
// 消费者组 group.id
// 可以使用消费者组将若干个消费者组织到一起,共同消费Kafka中topic的数据
// 每一个消费者需要指定一个消费者组,如果消费者的组名是一样的,表示这几个消费者是一个组中的
properties.put(ConsumerConfig.GROUP_ID_CONFIG, TOPIC_NAME);
// auto.offset.reset
// 重置消费者offset的方法(达到重复消费的目的),设置该属性也只在两种情况下生效:
// 1.上面设置的消费组还未消费(可以更改组名来消费)
// 2.该offset已经过期
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 2、创建Kafka消费者
KafkaConsumer<Integer, User> kafkaConsumer = new KafkaConsumer<>(properties);
// 3、订阅要消费的主题
// 指定消费者从哪个topic中拉取数据
kafkaConsumer.subscribe(Arrays.asList(TOPIC_NAME));
// 4、使用一个while循环,不断从Kafka的topic中拉取消息
while (true) {
// Kafka的消费者一次拉取一批的数据
ConsumerRecords<Integer, User> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5));
// 5.将将记录(record)的offset、key、value都打印出来
for (ConsumerRecord<Integer, User> consumerRecord : consumerRecords) {
// 主题
String topic = consumerRecord.topic();
// offset:这条消息处于Kafka分区中的哪个位置
long offset = consumerRecord.offset();
// key\value
Integer key = consumerRecord.key();
User value = consumerRecord.value();
System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value);
}
Thread.sleep(1000);
}
}
}
程序输出:
2.9 消费kafka topic中的历史数据
消费topic已经存在的数据,类似命令中 --from-beginning
参数。
代码在该服务启动前,如果topic中存在数据,是可以全部读出来,但如果topic数据部分已经被消费了,也会被读
出来。
package com.example.kafka.demo3;
import com.example.kafka.demo2.User;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
/**
* @author tom
*/
public class KafkaConsumerHistoryOfUser {
public final static String TOPIC_NAME = "userinfo";
public static void main(String[] args) throws Exception {
// 1、创建消费者配置信息
Properties properties = new Properties();
// 连接的集群 bootstrap.servers
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
// 开启自动提交offset enable.auto.commit
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// 自动提交offset的时间间隔 auto.commit.interval.ms
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
// key,value的反序列化 key.deserializer、value.deserializer
// key的反序列化,其类型是Integer
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
// value的反序列化,其类型是Object
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.example.kafka.demo2.DecodingKafka");
// 消费者组 group.id
// 可以使用消费者组将若干个消费者组织到一起,共同消费Kafka中topic的数据
// 每一个消费者需要指定一个消费者组,如果消费者的组名是一样的,表示这几个消费者是一个组中的
properties.put(ConsumerConfig.GROUP_ID_CONFIG, TOPIC_NAME);
// auto.offset.reset
// 重置消费者offset的方法(达到重复消费的目的),设置该属性也只在两种情况下生效:
// 1.上面设置的消费组还未消费(可以更改组名来消费)
// 2.该offset已经过期
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 2、创建Kafka消费者
KafkaConsumer<Integer, User> kafkaConsumer = new KafkaConsumer<>(properties);
// 基于再均衡监听器,在给消费者分配分区的时候将消息偏移量跳转到起始位置
kafkaConsumer.subscribe(Collections.singletonList(TOPIC_NAME), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
// 读取历史数据 --from-beginning
// 基于seekToBeginning方法
kafkaConsumer.seekToBeginning(collection);
// Map<TopicPartition, Long> beginningOffset = kafkaConsumer.beginningOffsets(collection);
// for (Map.Entry<TopicPartition, Long> entry : beginningOffset.entrySet()) {
// 基于seek方法
// TopicPartition tp = entry.getKey();
// long offset = entry.getValue();
// consumer.seek(tp,offset);
//}
}
});
while (true) {
// Kafka的消费者一次拉取一批的数据
ConsumerRecords<Integer, User> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5));
// 5.将将记录(record)的offset、key、value都打印出来
for (ConsumerRecord<Integer, User> consumerRecord : consumerRecords) {
// 主题
String topic = consumerRecord.topic();
// offset:这条消息处于Kafka分区中的哪个位置
long offset = consumerRecord.offset();
// key\value
Integer key = consumerRecord.key();
User value = consumerRecord.value();
System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value);
}
Thread.sleep(1000);
}
}
}
程序输出:
至此,结束。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!