Kafka 之生产者(Producer)

2024-01-10 11:29:30
1、简介

????????在消息发送的过程中,涉及到了两个线程——main 线程和Sender 线程。在main 线程中创建了一个双端队列RecordAccumulator。main 线程将消息发送给RecordAccumulator,Sender 线程不断从RecordAccumulator 中拉取消息发送到Kafka Broker。

2、生产者常用参数列表
参数名称描述
bootstrap.servers生产者连接集群所需的broker 地址清单。例如 node-1:9092, node-2:9092, node-3:9092,可以设置1 个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者可以从给定的 broker 里查找到其他 broker 信息。
key.serializer 和 value.serializer指定发送消息的key 和 value 的序列化类型。一定要写全类名。
buffer.memoryRecordAccumulator 缓冲区总大小, 默认 32m 。
batch.size缓冲区一批数据最大值,默认 16k 。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
linger.ms如果数据迟迟未达到 batch.size sender 等待 linger.time 之后就会发送数据。单位 ms 默认值是 0ms ,表示没有延迟。 生产环境建议该值大小为 5-100ms 之间 。
acks0:生产者发送过来的数据,不需要等数据落盘应答。 1:生产者发送过来的数据 Leader 收到数据后应答。 -1 all ):生产者发送过来的数据 Leader 和 isr 队列里面的所有节点收齐数据后应答。 默认值是 -1,-1 和all 是等价的。
max.in.flight.requests.per.connection允许最多没有返回ack 的次数, 默认为 5 ,开启幂等性要保证该值是 1-5 的数字。
retries当消息发送出现错误的时候,系统会重发消息。retries表示重试次数 。 默认是 int 最大值, 2147483647 。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1 否则在重试此失败消息的时 候,其他的消息可能发送成功了 。
retry.backoff.ms两次重试之间的时间间隔,默认是100ms。
enable.idempotence是否开启幂等性 默认 true ,开启 幂等性 。
compression.type生产者发送的所有数据的压缩方式。默认是 none ,也就是不压缩。支持压缩类型 none 、 gzip 、 snappy 、 lz4 和 zstd 。
3、生产者发送消息代码实现
3.1、引入依赖
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.1</version>
</dependency>
?3.2、异步发送
3.2.1、不带回调函数的异步发送 producer 实现
public static void main(String[] args) throws ExecutionException, InterruptedException {
    // 1 、创建kafka生产者对象
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"node-1:9092,node-2:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // 使用自定义分区
    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartition.class.getName());
    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
    // 2、发送数据
    ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic1", "wei");
    producer.send(producerRecord);
    // 3、关闭资源
    producer.close();
}
3.2.2、带回调函数的异步发送 producer 实现

?????????回调函数会在producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息( RecordMetadata 和 异常信息( Exception )),如果 Exception 为 null ,说明消息发送成功,如果 Exception 不为 null ,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

public static void main(String[] args) throws ExecutionException, InterruptedException {
     // 1 、创建kafka生产者对象
     Properties properties = new Properties();
     properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"node-1:9092,node-2:9092");
     properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
     properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
     // 使用自定义分区
     properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartition.class.getName());
     KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
     // 2、向指定分区发送数
     ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic1", "wei");
     // 添加callback函数,接收返回值,返回值封装在meta中
     Future<RecordMetadata> send = producer.send(producerRecord, (meta, ex) ->{
         System.out.println(meta.topic());
         System.out.println(meta.partition() + "-----------------------");
     });
     // 3、关闭资源
     producer.close();
 }
3.3、同步发送

?????????全部发送成功后再发送下一批数据。

 public static void main(String[] args) throws ExecutionException, InterruptedException {
     // 1 、创建kafka生产者对象
     Properties properties = new Properties();
     properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"node-1:9092,node-2:9092");
     properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
     properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
     // 使用自定义分区
     properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartition.class.getName());
     KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
     // 2、向指定分区发送数
     ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic1", "wei");
     // 添加callback函数,接收返回值,返回值封装在meta中
     producer.send(producerRecord).get();
     // 3、关闭资源
     producer.close();
 }
3.4、生产者发送消息的分区策略

分区策略有以下三种情况(针对 ProducerRecord 不同的构造方法):

1)、指明 partition 值,直接将数据发送到指定分区;

2)、没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值;

3)、既没有 partition 值又没有 key 的情况下,Kafka 采用Sticky Partiion (黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者已完成,Kafka再随机一个分区进行使用(和上一次的分区不同)。

3.4.1、自定义分区

????????自定义分区,实现Partitioner 接口。

public class CustomPartition implements Partitioner {
    @Override
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        // 将消息中包含 long 的发送到分区1,其余的发送到分区 0
        String msg = o1.toString();
        if (msg.contains("long")){
            return 1;
        }
        return 0;
    }
    @Override
    public void close() {
    }
    @Override
    public void configure(Map<String, ?> map) {

    }
}
3.4.2、使用自定义分区

????????在生产者配置中添加指定分区策略。

properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartition.class.getName());
4、总结

? ? ? ? 本文介绍kafka生产者的参数配置以及使用方法,?快速上手kafka生产者的使用。

????????本人是一个从小白自学计算机技术,对运维、后端、各种中间件技术、大数据等有一定的学习心得,想获取自学总结资料(pdf版本)或者希望共同学习,关注微信公众号:it自学社团。后台回复相应技术名称/技术点即可获得。(本人学习宗旨:学会了就要免费分享)

文章来源:https://blog.csdn.net/zwl2220943286/article/details/135446803
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。