Kafka教程

2024-01-08 20:38:00

Kafka教程

官网地址:https://kafka.apache.org

本文以 2.12 版本为例。

1、Kafka CLI

1.1 启动kafka服务

$ bin/kafka-server-start.sh config/server.properties

1.2 创建topic

# 创建topic(1个分区,1个副本)
# --partitions: 分区数
# --replication-factor: 副本数
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

1.3 查看所有topic

$ bin/kafka-topics.sh --list --zookeeper localhost:2181

1.4 查看所有topic的详细信息

$ bin/kafka-topics.sh --describe --zookeeper localhost:2181

1.5 查看指定topic的详细信息

$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

1.6 删除指定topic

# 配置delete.topic.enable为true,这样才能删除topic
$ bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test

1.7 启动生产者

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

1.8 启动消费者

# --from-beginning: 表示从头开始接收数据
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
# 也可以指定分组
#--group: 指定消费者组
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --group t1

1.9 查看消费者组

$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

1.10 查看特定消费者组的消费情况

$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-consumer-group

1.11 查看broker信息

$ bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092

1.12 切换leader

$ bin/kafka-preferred-replica-election.sh --zookeeper localhost:9092

1.13 kafka自带压测命令

$ bin/kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100  --producer-props bootstrap.servers=localhost:9092

1.14 kafka持续发送消息

持续发送消息到指定的topic中,且每条发送的消息都会有响应信息:

$ bin/kafka-verifiable-producer.sh --broker-list localhost:9092 --topic test --max-messages 10

1.15 通过zookeeper-shell.sh连接zookeeper

如果 kafka 集群的 zk 配置了 chroot 路径,那么需要加上/path

$ bin/zookeeper-shell.sh localhost:2181[/path]
$ ls /brokers/ids
$ get /brokers/ids/0

1.16 迁移分区

1、创建规则json

$ cat > increase-replication-factor.json <<EOF
{"version":1, "partitions":[
{"topic":"__consumer_offsets","partition":0,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":1,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":2,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":3,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":4,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":5,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":6,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":7,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":8,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":9,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":10,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":11,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":12,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":13,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":14,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":15,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":16,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":17,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":18,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":19,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":20,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":21,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":22,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":23,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":24,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":25,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":26,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":27,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":28,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":29,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":30,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":31,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":32,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":33,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":34,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":35,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":36,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":37,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":38,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":39,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":40,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":41,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":42,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":43,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":44,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":45,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":46,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":47,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":48,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":49,"replicas":[0,1]}]
}
EOF

2、执行

$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute

3、验证

$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify

2、Kafka API

2.1 导入依赖

<!-- kafka客户端工具 -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
</dependency>
<!-- 工具类 -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-io</artifactId>
    <version>1.3.2</version>
</dependency>

2.2 生产消息到Kafka中(同步)

同步是发送消息完成之后,需要等待对方响应之后才能继续干其他的。

异步则是发送完消息之后,就可以继续往下执行业务逻辑。

将1-10的数字消息写入到Kafka中。

package com.example.kafka.demo1;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * 生产消息到Kafka中(同步)
 *
 * @author tom
 */
public class MyProducer1 {
    public static void main(String[] args) {
        // 1、创建用于连接Kafka的Properties配置
        Properties properties = new Properties();
        // 指定连接的kafka集群
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        // ack应答级别
        // all等价于-1   0    1
        properties.put("acks", "all");
        // 重试次数
        properties.put("retries", 1);
        // 批次大小
        // 16k
        properties.put("batch.size", 16384);
        // 等待时间
        properties.put("linger.ms", 1);
        // recordAccumulator缓冲区大小
        // 32m
        properties.put("buffer.memory", 33554432);
        // key,Value的序列化类
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 2、创建一个生产者对象KafkaProducer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // 3、调用send发送1-10消息到指定Topic topic1
        for (int i = 1; i < 11; ++i) {
            try {
                // 获取返回值Future,该对象封装了返回值
                Future<RecordMetadata> future = producer.send(new ProducerRecord<>("topic1", null, i + ""));
                // 4、调用一个Future.get()方法等待响应
                RecordMetadata recordMetadata = future.get();
                System.out.println("success->" + "   partition = " + recordMetadata.partition() + "  |  offset = " + recordMetadata.offset());
            } catch (ExecutionException | InterruptedException e) {
                e.printStackTrace();
            }
        }
        // 5、关闭生产者
        producer.close();
    }
}

程序输出:

在这里插入图片描述

注意:如果在 send() 方法后接着调用 get() 方法,那么就是有序的同步方法,消息会一条接一条的发送

send(xxx).get()。

2.2.1 生产者中ack的配置

在同步发送的前提下,生产者在获得集群返回的 ack 之前那会一直阻塞。那么集群什么时候返回ack呢?

此时ack有3个配置:

  • ack=0:kafka-cluster 不需要任何的 broker 收到消息,就立即返回 ack 给生产者,最容易丢消息,但是效率

    是最高的。

  • ack=1(默认):多副本之间得leader已经收到消息,并把消息写入到本地log中,才会返回ack给生产者,性

    能和安全是最均衡的。

  • ack=-1/all:里面有默认的配置min.insync.replicas=2(默认为1,推荐配置大于等于2),此时就需要leader和

    一个follower同步完成之后,才会返回ack给生产者(此时集群中有2个broker已完成数据的接收),这种方式

    最安全,但性能最差。

下面是关于ack和重试(如果没有收到ack,就开启重试)的配置:

// 发送失败会重试,默认重试时间间隔100ms,重试能保证消息发送的可靠性,但是也可能造成消息重复发送,比如网络抖动,所以需要在接收者那边做好消息接收的幂等性处理
// 重试次数设置
props.put(ProducerConfig.RETRIES_CONFIG, 3);
// 重试间隔设置
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
2.2.2 其它参数介绍

Kafka默认会创建一个消息缓冲区,用来存放要发送的消息,缓冲区是32MB

props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); 

Kafka本地线程会去缓冲区拉一次16K的数据,发送到broker

props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

如果拉不到16K的数据,间隔10ms也会将已拉到的数据发送到broker

props.put(ProducerConfig.LINGER_MS_CONFIG, 10);

2.3 生产消息到Kafka中(异步回调)

将1-10的数字消息写入到Kafka中。

package com.example.kafka.demo1;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * 生产消息到Kafka中(异步回调)
 *
 * @author tom
 */
public class MyProducer2 {
    public static void main(String[] args) {
        // 1、创建用于连接Kafka的Properties配置
        Properties properties = new Properties();
        // 指定连接的kafka集群
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        // ack应答级别
        // all等价于-1   0    1
        properties.put("acks", "all");
        // 重试次数
        properties.put("retries", 1);
        // 批次大小
        // 16k
        properties.put("batch.size", 16384);
        // 等待时间
        properties.put("linger.ms", 1);
        // recordAccumulator缓冲区大小
        // 32m
        properties.put("buffer.memory", 33554432);
        // key,Value的序列化类
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 2、创建一个生产者对象KafkaProducer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // 3、调用send发送1-10消息到指定Topic topic2
        for (int i = 1; i < 11; i++) {
            // 回调函数,该方法会在Producer收到ack时调用,为异步调用
            // 如果key为确定值,那么分区也就确定了,所以我这里没有写死
            producer.send(new ProducerRecord<>("topic2", null, i + ""), (metadata, exception) -> {
                if (exception == null) {
                    System.out.println("success->" + "   partition = " + metadata.partition() + "  |  offset = " + metadata.offset());
                } else {
                    exception.printStackTrace();
                }
            });
        }
        // 5、关闭生产者
        producer.close();
    }

}

程序输出:

在这里插入图片描述

2.4 生产消息到Kafka中(自定义分区策略的插入)

分区的分配基本由 ProducerRecord 的参数决定。

指明 partition 的情况下,直接将指明的值直接作为 partiton 值。

没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值。

既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),

将这个值与topic可用的partition总数取余得到partition值,也就是常说的round-robin算法。

将1-10的数字消息写入到Kafka中。

package com.example.kafka.demo1;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * 自定义分区
 * @author tom
 */
public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Integer num = cluster.partitionCountForTopic(topic);
        // return key.toString().hashCode() % num;
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}
package com.example.kafka.demo1;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * 生产消息到Kafka中(自定义分区策略的插入)
 *
 * @author tom
 */
public class MyProducer3 {
    public static void main(String[] args) {
        // 1、创建用于连接Kafka的Properties配置
        Properties properties = new Properties();
        // 指定连接的kafka集群
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        // ack应答级别
        // all等价于-1   0    1
        properties.put("acks", "all");
        // 重试次数
        properties.put("retries", 1);
        // 批次大小
        // 16k
        properties.put("batch.size", 16384);
        // 等待时间
        properties.put("linger.ms", 1);
        // recordAccumulator缓冲区大小
        // 32m
        properties.put("buffer.memory", 33554432);
        // 设置分区
        properties.put("partitioner.class", "com.example.kafka.demo1.MyPartitioner");
        // key,Value的序列化类
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 2、创建一个生产者对象KafkaProducer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // 3、调用send发送1-10消息到指定Topic topic3
        for (int i = 1; i < 11; i++) {
            // 回调函数,该方法会在Producer收到ack时调用,为异步调用
            // 如果key为确定值,那么分区也就确定了,所以我这里没有写死
            producer.send(new ProducerRecord<>("topic3", null, i + ""), (metadata, exception) -> {
                if (exception == null) {
                    System.out.println("success->" + "   partition = " + metadata.partition() + "  |  offset = " + metadata.offset());
                } else {
                    exception.printStackTrace();
                }
            });
        }
        // 5、关闭生产者
        producer.close();
    }

}

程序输出:

在这里插入图片描述

2.5 从Kafka的topic中消费消息(自动提交offset的消费者)

Consumer 消费数据时的可靠性是很容易保证的,因为数据在 Kafka 中是持久化的,故不用担心数据丢失问题。

由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置的继续消

费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。所以 offset 的维护是

Consumer 消费数据是必须考虑的问题。

从 topic1中,将消息都消费,并将记录的offset、key、value打印出来。

package com.example.kafka.demo1;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * 从Kafka的topic中消费消息(自动提交offset的消费者)
 *
 * @author tom
 */
public class MyConsumer1 {

    public static void main(String[] args) throws InterruptedException {
        // 1、创建消费者配置信息
        Properties properties = new Properties();
        // 连接的集群 bootstrap.servers
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        // 开启自动提交offset enable.auto.commit
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // 自动提交offset的时间间隔 auto.commit.interval.ms
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // key,value的反序列化 key.deserializer、value.deserializer
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // 消费者组 group.id
        // 可以使用消费者组将若干个消费者组织到一起,共同消费Kafka中topic的数据
        // 每一个消费者需要指定一个消费者组,如果消费者的组名是一样的,表示这几个消费者是一个组中的
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group1");
        // auto.offset.reset
        // 重置消费者offset的方法(达到重复消费的目的),设置该属性也只在两种情况下生效:
        // 1.上面设置的消费组还未消费(可以更改组名来消费)
        // 2.该offset已经过期
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // 2、创建Kafka消费者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // 3、订阅要消费的主题
        // 指定消费者从哪个topic中拉取数据
        // kafkaConsumer.subscribe(Collections.singletonList("topic1"));
        kafkaConsumer.subscribe(Arrays.asList("topic1"));
        // 4、使用一个while循环,不断从Kafka的topic中拉取消息
        while (true) {
            // Kafka的消费者一次拉取一批的数据
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5));
            // 5、将将记录(record)的offset、key、value都打印出来
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                // 主题
                String topic = consumerRecord.topic();
                // offset: 这条消息处于Kafka分区中的哪个位置
                long offset = consumerRecord.offset();
                // key、value
                String key = consumerRecord.key();
                String value = consumerRecord.value();
                System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value);
                System.out.println(consumerRecord.key() + "----" + consumerRecord.value());
            }
            Thread.sleep(1000);
        }
        //consumer无需close()
    }
}

程序输出:

在这里插入图片描述

2.6 从Kafka的topic中消费消息(手动提交offset的消费者)

手动提交 offset 的方法有两种:分别是 commitSync(同步提交)和 commitAsync(异步提交)。两者的相同点

是,都会将本次 poll 的一批数据最高的偏移量提交;不同点是,commitSync 阻塞当前线程,一直到提交成功,

并且会自动失败重试(由不可控因素导致,也会出现提交失败);而 commitAsync 则没有失败重试机制,故有可

能提交失败。

package com.example.kafka.demo1;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * 从Kafka的topic中消费消息(手动提交offset的消费者)
 *
 * @author tom
 */
public class MyConsumer2 {

    public static void main(String[] args) throws InterruptedException {
        // 1、创建消费者配置信息
        Properties properties = new Properties();
        // 连接的集群 bootstrap.servers
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        // 关闭自动提交 offset enable.auto.commit
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // key,value的反序列化 key.deserializer、value.deserializer
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // 消费者组 group.id
        // 可以使用消费者组将若干个消费者组织到一起,共同消费Kafka中topic的数据
        // 每一个消费者需要指定一个消费者组,如果消费者的组名是一样的,表示这几个消费者是一个组中的
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group2");
        // auto.offset.reset
        // 重置消费者offset的方法(达到重复消费的目的),设置该属性也只在两种情况下生效:
        // 1.上面设置的消费组还未消费(可以更改组名来消费)
        // 2.该offset已经过期
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // 2、创建Kafka消费者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // 3、订阅要消费的主题
        // 指定消费者从哪个topic中拉取数据
        // kafkaConsumer.subscribe(Collections.singletonList("topic2"));
        kafkaConsumer.subscribe(Arrays.asList("topic2"));
        // 4、使用一个while循环,不断从Kafka的topic中拉取消息
        while (true) {
            // Kafka的消费者一次拉取一批的数据
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5));
            // 5、将将记录(record)的offset、key、value都打印出来
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                // 主题
                String topic = consumerRecord.topic();
                // offset: 这条消息处于Kafka分区中的哪个位置
                long offset = consumerRecord.offset();
                // key、value
                String key = consumerRecord.key();
                String value = consumerRecord.value();
                System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value);
                System.out.println(consumerRecord.key() + "----" + consumerRecord.value());
            }
            // 1.同步提交,当前线程会阻塞直到offset提交成功
            kafkaConsumer.commitSync();
            // 2.异步提交
            // kafkaConsumer.commitAsync();
            Thread.sleep(1000);
        }
        //consumer无需close()
    }
}

程序输出:

在这里插入图片描述

2.6.1 消费者自动提交与手动提交offset

1、提交的内容

消费者无论是自动提交还是手动提交,都需要把所属的消费者组+消费的某个主题+消费的某个分区+消费的偏移

量,这样的信息提交到集群的_consumer_offsets主题里面。

2、自动提交

消费者poll消息下来以后就会自动提交到offset

//是否自动提交offset,默认:true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
//自动提交offset的间隔时间
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

3、手动提交

将自动提交的配置改为false

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

手动提交分为了两种:

  • 手动同步提交:在消费完消费后调用同步提交的方法,当集群返回ack前一直阻塞,返回ack后表示提交成功,

    执行之后的逻辑。

    // 所有消息已经消费完
    // 有消息
    if(records.count() > 0){
        // 手动同步提交offset,当前线程会阻塞,直到offset提交成功
        // 一般使用同步提交,因为提交之后一般也没有什么逻辑代码了
        //=========阻塞==== 提交成功
        consumer.commitSync(); 
    }
    
  • 手动异步提交:在消息消费完后提交,不需要等到集群ack,直接执行之后的逻辑,可以设置一个回调方法,

    供集群调用。

    //所有消息已经消费完
    //有消息
    if(records.count() > 0){
        //手动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后面的程序逻辑
        consumer.commitAsync(new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
                if(e != null){
                    System.err.println("Commit failed for " + offsets);
                    System.err.println("Commit failed exception: " + e.getStackTrace());
                }
            }
        });
    }
    
2.6.2 长轮询poll消息

默认情况下,消费者一次会poll 500条消息。

//一次poll最大拉取消息的条数,可以根据消费速度的快慢来设置
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);

代码中设置了长轮询的时间是1000毫秒:

// 设置长轮询时间是1000ms
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for(ConsumerRecord<String, String> record : records){
         System.out.printf("收到消息:partition = %d, offset = %d, key = %s, value = %s%n",
                 record.partition(), record.offset(), record.key(), record.value());
}

意味着:

如果一次poll到500条,直接执行for循环。

如果这一次没有poll到500条,且时间在1s内,要么长轮询继续poll,要么到500条,要么到1s。

如果多次poll都没达到500条,且1s时间到了,那么直接执行for循环。

如果两次poll的间隔超过30s,集群会认为该消费者的消费能力过弱,该消费者被提出消费组,触发rebalance机

制,rebalance机制会造成性能开销。可以通过设置这个参数,让一次poll的消息条数少一点。

//一次poll最大拉取消息的条数,可以根据消费速度的快慢来设置
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
//如果两次poll的时间超出了30s的时间间隔,kafka会认为其消费能力过弱,将其提出消费组,将分区分配给其他消费者。-rebalance
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
2.6.3 消费者的健康状态检查

消费者每隔1s向kafka集群发送心跳,集群发现如果有超过10s没有续约的消费者,将被踢出消费组,触发该消费

组的rebalance机制,将该分区交给其他消费组里的其他消费者进行消费。

//consumer给broker发送?跳的间隔时间
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
//kafka如果超过10秒没有收到消费者的?跳,则会把消费者踢出消费组,进?rebalance,把分区分配给其他消费者。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000)
2.6.4 指定分区消费
String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
// 只消费foo的分区0和分区1,不可以与subscribe方法同用
consumer.assign(Arrays.asList(partition0, partition1)); 
//TOPIC_NAME主题下的0号分区
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
2.6.5 从头消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
2.6.6 指定offset消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);//offset=10
2.6.7 获取最早offset或者最新offset
TopicPartition partition = new TopicPartition("foo", 0);
// 获取foo的分区0的最早offset
long beginningOffset = consumer.beginningOffsets(Collections.singletonList(partition)).get(partition); 
// 获取foo的分区0的最新offset
long endOffset = consumer.endOffsets(Collections.singletonList(partition)).get(partition);
2.6.8 从某一个offset开始消费
TopicPartition partition = new TopicPartition("foo", 0);
long offset = args[0];
// 移动到指定offset
consumer.seek(partition, offset);
// 移动到最新offset
consumer.seekToEnd(Collections.singletonList(partition));
// 移动到最早offset
consumer.seekToBeginning(Collections.singletonList(partition));
2.6.9 指定时间消费

根据时间,去所有的partition中确定该时间对应的offset,然后去所有的partition中找到该offset之后的消息开始

消费。

 List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
 //从1?时前开始消费
 long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
 Map<TopicPartition, Long> map = new HashMap<>();
 for (PartitionInfo par : topicPartitions) {
     map.put(new TopicPartition(TOPIC_NAME, par.partition()),
             fetchDataTime);
 }
 Map<TopicPartition, OffsetAndTimestamp> parMap =
         consumer.offsetsForTimes(map);
 for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry :
         parMap.entrySet()) {
     TopicPartition key = entry.getKey();
     OffsetAndTimestamp value = entry.getValue();
     if (key == null || value == null) continue;
     Long offset = value.offset();
     System.out.println("partition-" + key.partition() +
             "|offset-" + offset);
     System.out.println();
     //根据消费?的timestamp确定offset
     if (value != null) {
         consumer.assign(Arrays.asList(key));
         consumer.seek(key, offset);
     }
 }
2.6.10 新消费组的消费offset规则

新消费组的消费者在启动以后,默认会从当前分区的最后一条消息的offset+1开始消费(消费新消息)。可以通过

以下的设置,让新的消费者第一次从头开始消费。之后开始消费新消息(最后消费位置的偏移量+1)

  • Latest:默认, 消费最新消息
  • earliest:第一次从头开始消费,之后开始消费新消息(最后消费位置的偏移量+1)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

参考文档:

https://kafka.apache.org/24/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

2.7 从Kafka的topic中消费消息(自定义提交策略)

package com.example.kafka.demo1;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

/**
 * 从Kafka的topic中消费消息(自定义提交策略)
 * 简单消费者--订阅主题带再均衡处理器
 *
 * @author tom
 */
public class MyConsumer3 {

    public static void main(String[] args) throws InterruptedException {
        // 1、创建消费者配置信息
        Properties properties = new Properties();
        // 连接的集群 bootstrap.servers
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        // 开启自动提交offset enable.auto.commit
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // 自动提交offset的时间间隔 auto.commit.interval.ms
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // key,value的反序列化 key.deserializer、value.deserializer
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // 消费者组 group.id
        // 可以使用消费者组将若干个消费者组织到一起,共同消费Kafka中topic的数据
        // 每一个消费者需要指定一个消费者组,如果消费者的组名是一样的,表示这几个消费者是一个组中的
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group3");
        // auto.offset.reset
        // 重置消费者offset的方法(达到重复消费的目的),设置该属性也只在两种情况下生效:
        // 1.上面设置的消费组还未消费(可以更改组名来消费)
        // 2.该offset已经过期
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // 2、创建Kafka消费者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // 3、订阅要消费的主题
        // 指定消费者从哪个topic中拉取数据
        kafkaConsumer.subscribe(Arrays.asList("topic3"), new ConsumerRebalanceListener() {
            // 在均衡开始之前和消费者停止读取消息之后调用
            // 如果在这个方法中提交偏移量,则下一个消费者就可以获得读取的偏移量
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // 提交位移
                kafkaConsumer.commitSync();
            }

            // 在重新分配分区之后和消费者开始读取消息之前调用
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                long committedOffset = -1;
                for (TopicPartition topicPartition : partitions) {
                    // 获取该分区已消费的位移
                    committedOffset = kafkaConsumer.committed(topicPartition).offset();
                    // 重置位移到上一次提交的位移处开始消费
                    kafkaConsumer.seek(topicPartition, committedOffset + 1);
                }
            }
        });
        try {
            // 4、使用一个while循环,不断从Kafka的topic中拉取消息
            while (true) {
                // Kafka的消费者一次拉取一批的数据
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5));
                // 5、将将记录(record)的offset、key、value都打印出来
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    // 主题
                    String topic = consumerRecord.topic();
                    // offset:这条消息处于Kafka分区中的哪个位置
                    long offset = consumerRecord.offset();
                    // key、value
                    String key = consumerRecord.key();
                    String value = consumerRecord.value();
                    System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value);
                    System.out.println(consumerRecord.key() + "----" + consumerRecord.value());
                }
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            kafkaConsumer.close();
        }
    }
}

程序输出:

在这里插入图片描述

package com.example.kafka.demo1;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.*;

/**
 * 从Kafka的topic中消费消息(自定义提交策略)
 * 提交指定偏移量和再均衡处理器实现
 *
 * @author tom
 */
public class MyConsumer4 {

    public static void main(String[] args) {
        // 1、创建消费者配置信息
        Properties properties = new Properties();
        // 连接的集群 bootstrap.servers
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        // 关闭自动提交 offset enable.auto.commit
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // key,value的反序列化 key.deserializer、value.deserializer
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // 消费者组 group.id
        // 可以使用消费者组将若干个消费者组织到一起,共同消费Kafka中topic的数据
        // 每一个消费者需要指定一个消费者组,如果消费者的组名是一样的,表示这几个消费者是一个组中的
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group3");
        // auto.offset.reset
        // 重置消费者offset的方法(达到重复消费的目的),设置该属性也只在两种情况下生效:
        // 1.上面设置的消费组还未消费(可以更改组名来消费)
        // 2.该offset已经过期
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // 2、创建Kafka消费者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
        // 3、订阅要消费的主题
        // 指定消费者从哪个topic中拉取数据
        kafkaConsumer.subscribe(Collections.singletonList("topic3"), new ConsumerRebalanceListener() {
            // 在均衡开始之前和消费者停止读取消息之后调用
            // 如果在这个方法中提交偏移量,则下一个消费者就可以获得读取的偏移量
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.println("Lost partitions in rebalance committing current offsets:" + currentOffsets);
                // 提交位移
                kafkaConsumer.commitSync(currentOffsets);
            }

            // 在重新分配分区之后和消费者开始读取消息之前调用
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // 不做任何操作,也可以如下指定消费偏移量
                long committedOffset = -1;
                for (TopicPartition topicPartition : partitions) {
                    // 获取该分区已消费的位移
                    committedOffset = kafkaConsumer.committed(topicPartition).offset();
                    // 重置位移到上一次提交的位移处开始消费
                    kafkaConsumer.seek(topicPartition, committedOffset + 1);
                }
            }
        });
        try {
            // 4、使用一个while循环,不断从Kafka的topic中拉取消息
            while (true) {
                // Kafka的消费者一次拉取一批的数据
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5));
                // 5、将将记录(record)的offset、key、value都打印出来
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    // 主题
                    String topic = consumerRecord.topic();
                    // offset:这条消息处于Kafka分区中的哪个位置
                    long offset = consumerRecord.offset();
                    // key、value
                    String key = consumerRecord.key();
                    String value = consumerRecord.value();
                    System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value);
                    System.out.println(consumerRecord.key() + "----" + consumerRecord.value());
                    // 设置需要提交的偏移量
                    currentOffsets.put(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), new OffsetAndMetadata(consumerRecord.offset() + 1, "no metadata"));
                }
                // 手动异步提交指定偏移量
                kafkaConsumer.commitAsync(currentOffsets, null);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                kafkaConsumer.commitSync(currentOffsets);
            } finally {
                kafkaConsumer.close();
                System.out.println("closed consumer...");
            }
        }
    }
}

程序输出:

在这里插入图片描述

2.8 kafka中发送和消费复杂类型

1、往kafka中写入user数据

2、消费kafka中user的数据

3、kafka的topic是userinfo

2.8.1 将复杂类型和字节数字相互转换
package com.example.kafka.demo2;

import java.io.*;

/**
 * @author tom
 */
public class BeanConversion {
    /**
     * 对象转字节数组
     *
     * @param obj
     * @return
     */
    public static byte[] ObjectToBytes(Object obj) {
        byte[] bytes = null;
        ByteArrayOutputStream bo = null;
        ObjectOutputStream oo = null;
        try {
            bo = new ByteArrayOutputStream();
            oo = new ObjectOutputStream(bo);
            oo.writeObject(obj);
            bytes = bo.toByteArray();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (bo != null) {
                    bo.close();
                }
                if (oo != null) {
                    oo.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return bytes;
    }

    /**
     * 字节数组转对象
     *
     * @param bytes
     * @return
     */
    public static Object BytesToObject(byte[] bytes) {
        Object obj = null;
        ByteArrayInputStream bi = null;
        ObjectInputStream oi = null;
        try {
            bi = new ByteArrayInputStream(bytes);
            oi = new ObjectInputStream(bi);
            obj = oi.readObject();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (bi != null) {
                    bi.close();
                }
                if (oi != null) {
                    oi.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return obj;
    }
}
2.8.2 实现kafka的序列化接口
package com.example.kafka.demo2;

import org.apache.kafka.common.serialization.Serializer;

/**
 * @author tom
 */
public class EncodingKafka implements Serializer<Object> {

    @Override
    public byte[] serialize(String topic, Object data) {
        return BeanConversion.ObjectToBytes(data);
    }

}
2.8.3 实现kafka的反序列化接口
package com.example.kafka.demo2;

import org.apache.kafka.common.serialization.Deserializer;

/**
 * @author tom
 */
public class DecodingKafka implements Deserializer<Object> {

    @Override
    public Object deserialize(String topic, byte[] data) {
        return BeanConversion.BytesToObject(data);
    }

}
2.8.4 对象
package com.example.kafka.demo2;

import java.io.Serializable;

/**
 * @author tom
 */
public class User implements Serializable {

    private int id;
    private int age;
    private String name;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
2.8.5 生产者发送数据
package com.example.kafka.demo2;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * 生产者发送数据
 *
 * @author tom
 */
public class KafkaProducerUser {

    public final static String TOPIC_NAME = "userinfo";

    public static void main(String[] args) {
        // 1、创建用于连接Kafka的Properties配置
        Properties properties = new Properties();
        // 指定连接的kafka集群
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        // ack应答级别
        // all等价于-1   0    1
        properties.put("acks", "all");
        // 重试次数
        properties.put("retries", 1);
        // 批次大小
        // 16k
        properties.put("batch.size", 16384);
        // 等待时间
        properties.put("linger.ms", 1);
        // recordAccumulator缓冲区大小
        // 32m
        properties.put("buffer.memory", 33554432);
        // key,Value的序列化类
        // key的序列化,其类型是Integer
        properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        // 自己定义的序列化
        // value的序列化,其类型是Object
        properties.put("value.serializer", "com.example.kafka.demo2.EncodingKafka");
        // 2、创建一个生产者对象KafkaProducer
        KafkaProducer producer = new KafkaProducer<Integer, User>(properties);
        for (int i = 0; i < 100; i++) {
            User u = new User();
            u.setId(i * 3);
            u.setName("name" + i);
            if (i < 20) {
                u.setAge(i + 20);
            } else {
                u.setAge(i);
            }
            // public ProducerRecord(String topic, K key, V value)
            ProducerRecord<Integer, User> producerRecord = new ProducerRecord<>(TOPIC_NAME, u.getId(), u);
            // 3、调用send发送消息到指定Topic
            // send()方法会返回一个包含RecordMetadata的Future对象,不过因为我们会忽略返回值,所以无法知道消息是否发送成功
            // 如果不关心发送结果,那么可以使用这种发送方式
            Future<RecordMetadata> future = producer.send(producerRecord);
            // send()方住先返回一个 Future对象,然后调用Future对象的get()方法等待Kafka响应
            // 如果服务器返回错误,get()方法会抛出异
            // 如果没有发生错误,我们会得到一个RecordMetadata对象,可以用它获取消息的偏移量
            // 如果在发送数据之前或者在发送过程中发生了任何错误,比如broker返回了一个不允许重发消息的异常或者已经超过了重发的次数,那么就会抛出异常
            RecordMetadata rm = null;
            try {
                // 4、调用一个Future.get()方法等待响应
                rm = future.get();
                System.out.println(rm.topic() + "  分区:" + rm.partition() + "  偏移量:" + rm.offset());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
        // 5、关闭生产者
        producer.close();
    }
}

程序输出:
在这里插入图片描述

2.8.6 消费者消费数据
package com.example.kafka.demo2;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * 消费者消费数据
 *
 * @author tom
 */
public class KafkaConsumerUser {

    public final static String TOPIC_NAME = "userinfo";

    public static void main(String[] args) throws Exception {
        // 1、创建消费者配置信息
        Properties properties = new Properties();
        // 连接的集群 bootstrap.servers
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        // 开启自动提交offset enable.auto.commit
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // 自动提交offset的时间间隔 auto.commit.interval.ms
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // key,value的反序列化 key.deserializer、value.deserializer
        // key的反序列化,其类型是Integer
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        // value的反序列化,其类型是Object
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.example.kafka.demo2.DecodingKafka");
        // 消费者组 group.id
        // 可以使用消费者组将若干个消费者组织到一起,共同消费Kafka中topic的数据
        // 每一个消费者需要指定一个消费者组,如果消费者的组名是一样的,表示这几个消费者是一个组中的
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, TOPIC_NAME);
        // auto.offset.reset
        // 重置消费者offset的方法(达到重复消费的目的),设置该属性也只在两种情况下生效:
        // 1.上面设置的消费组还未消费(可以更改组名来消费)
        // 2.该offset已经过期
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // 2、创建Kafka消费者
        KafkaConsumer<Integer, User> kafkaConsumer = new KafkaConsumer<>(properties);
        // 3、订阅要消费的主题
        // 指定消费者从哪个topic中拉取数据
        kafkaConsumer.subscribe(Arrays.asList(TOPIC_NAME));
        // 4、使用一个while循环,不断从Kafka的topic中拉取消息
        while (true) {
            // Kafka的消费者一次拉取一批的数据
            ConsumerRecords<Integer, User> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5));
            // 5.将将记录(record)的offset、key、value都打印出来
            for (ConsumerRecord<Integer, User> consumerRecord : consumerRecords) {
                // 主题
                String topic = consumerRecord.topic();
                // offset:这条消息处于Kafka分区中的哪个位置
                long offset = consumerRecord.offset();
                // key\value
                Integer key = consumerRecord.key();
                User value = consumerRecord.value();
                System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value);
            }
            Thread.sleep(1000);
        }
    }
}

程序输出:

在这里插入图片描述

2.9 消费kafka topic中的历史数据

消费topic已经存在的数据,类似命令中 --from-beginning参数。

代码在该服务启动前,如果topic中存在数据,是可以全部读出来,但如果topic数据部分已经被消费了,也会被读

出来。

package com.example.kafka.demo3;

import com.example.kafka.demo2.User;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

/**
 * @author tom
 */
public class KafkaConsumerHistoryOfUser {

    public final static String TOPIC_NAME = "userinfo";

    public static void main(String[] args) throws Exception {
        // 1、创建消费者配置信息
        Properties properties = new Properties();
        // 连接的集群 bootstrap.servers
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        // 开启自动提交offset enable.auto.commit
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // 自动提交offset的时间间隔 auto.commit.interval.ms
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // key,value的反序列化 key.deserializer、value.deserializer
        // key的反序列化,其类型是Integer
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        // value的反序列化,其类型是Object
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.example.kafka.demo2.DecodingKafka");
        // 消费者组 group.id
        // 可以使用消费者组将若干个消费者组织到一起,共同消费Kafka中topic的数据
        // 每一个消费者需要指定一个消费者组,如果消费者的组名是一样的,表示这几个消费者是一个组中的
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, TOPIC_NAME);
        // auto.offset.reset
        // 重置消费者offset的方法(达到重复消费的目的),设置该属性也只在两种情况下生效:
        // 1.上面设置的消费组还未消费(可以更改组名来消费)
        // 2.该offset已经过期
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // 2、创建Kafka消费者
        KafkaConsumer<Integer, User> kafkaConsumer = new KafkaConsumer<>(properties);
        // 基于再均衡监听器,在给消费者分配分区的时候将消息偏移量跳转到起始位置
        kafkaConsumer.subscribe(Collections.singletonList(TOPIC_NAME), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {

            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                // 读取历史数据 --from-beginning
                // 基于seekToBeginning方法
                kafkaConsumer.seekToBeginning(collection);
                // Map<TopicPartition, Long> beginningOffset = kafkaConsumer.beginningOffsets(collection);
                // for (Map.Entry<TopicPartition, Long> entry : beginningOffset.entrySet()) {
                // 基于seek方法
                // TopicPartition tp = entry.getKey();
                // long offset = entry.getValue();
                // consumer.seek(tp,offset);
                //}
            }
        });
        while (true) {
            // Kafka的消费者一次拉取一批的数据
            ConsumerRecords<Integer, User> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5));
            // 5.将将记录(record)的offset、key、value都打印出来
            for (ConsumerRecord<Integer, User> consumerRecord : consumerRecords) {
                // 主题
                String topic = consumerRecord.topic();
                // offset:这条消息处于Kafka分区中的哪个位置
                long offset = consumerRecord.offset();
                // key\value
                Integer key = consumerRecord.key();
                User value = consumerRecord.value();
                System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value);
            }
            Thread.sleep(1000);
        }
    }
}

程序输出:

在这里插入图片描述

至此,结束。

文章来源:https://blog.csdn.net/qq_30614345/article/details/135443611
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。