Kafka-客户端使用

2023-12-13 05:44:03

理解Kafka正确使用方式

Kafka提供了两套客户端API,HighLevel API和LowLevel API。

  • HighLevel API封装了kafka的运行细节,使用起来比较简单,是企业开发过程中最常用的客户端API。

  • LowLevel API则需要客户端自己管理Kafka的运行细节,Partition,Offset这些数据都由客户端自行管理。这层API功能更灵活,但是使用起来非常复杂,也更容易出错。只在极少数对性能要求非常极致的场景才会偶尔使用。

基础的客户端

引入Maven依赖:

<dependency>
	<groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.13</artifactId>
    <version>3.4.0</version>
</dependency>

消息发送者主流程

public class MyProducer
{
    // private static final String BOOTSTRAP_SERVERS = "worker1:9092,worker2:9092,worker3:9092";
    private static final String BOOTSTRAP_SERVERS = "你的公网IP:9092";
    
    private static final String TOPIC = "disTopic";
    
    public static void main(String[] args)
        throws ExecutionException, InterruptedException
    {
        // PART1:设置发送者相关属性
        Properties props = new Properties();
        // 此处配置的是kafka的端口
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // 配置key的序列化类
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 配置value的序列化类
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        
        Producer<String, String> producer = new KafkaProducer<>(props);
        CountDownLatch latch = new CountDownLatch(5);
        for (int i = 0; i < 5; i++)
        {
            // Part2:构建消息
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);
            // Part3:发送消息
            // 单向发送:不关心服务端的应答。
            // producer.send(record);
            // System.out.println("message "+i+" sended");
            
            // 同步发送:获取服务端应答消息前,会阻塞当前线程。
            // RecordMetadata recordMetadata = producer.send(record).get();
            // String topic = recordMetadata.topic();
            // int partition = recordMetadata.partition();
            // long offset = recordMetadata.offset();
            // String message = recordMetadata.toString();
            // System.out.println("message:[" + message + "] sended with topic:" + topic + "; partition:" + partition
            // + ";offset:" + offset);
            
            // 异步发送:消息发送后不阻塞,服务端有应答后会触发回调函数
            producer.send(record, new Callback()
            {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e)
                {
                    if (null != e)
                    {
                        System.out.println("消息发送失败," + e.getMessage());
                        e.printStackTrace();
                    }
                    else
                    {
                        String topic = recordMetadata.topic();
                        long offset = recordMetadata.offset();
                        String message = recordMetadata.toString();
                        System.out
                            .println("message:[" + message + "] sended with topic:" + topic + ";offset:" + offset);
                    }
                    latch.countDown();
                }
            });
        }
        // 消息处理完才停止发送者。
        latch.await();
        producer.close();
    }
}

构建Producer分为三个步骤:

1、设置Producer核心属性:Producer可选的属性都可以由ProducerConfig类管理。比如ProducerConfig.BOOTSTRAP_SERVERS_CONFIG属性,显然就是指发送者要将消息发到哪个Kafka集群上。这是每个Producer必选的属性。在ProducerConfig中,对于大部分比较重要的属性,都配置了对应的DOC属性进行描述。

2、构建消息:Kafka的消息是一个Key-Value结构的消息。其中,key和value都可以是任意对象类型。其中,key主要是用来进行Partition分区的,业务上更关心的是value。

3、使用Producer发送消息:通常用到的就是单向发送、同步发送和异步发送者三种发送方式。

异常信息: Expiring 1 record(s) for disTopic-0:120010 ms?has?passed?since batch creation

修改kafka安装路径的config/server.properties配置,然后重启解决问题

# The address the socket server listens on. If not configured, the host name will be equal to the value of
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://你的公网IP:9092

注意:云服务器配置安全组开放端口,虚拟机防火墙状态要关闭

消息消费者主流程

public class MyConsumer
{
    // private static final String BOOTSTRAP_SERVERS = "worker1:9092,worker2:9092,worker3:9092";
    private static final String BOOTSTRAP_SERVERS = "你的公网IP:9092";
    
    private static final String TOPIC = "disTopic";
    
    public static void main(String[] args)
    {
        // PART1:设置发送者相关属性
        Properties props = new Properties();
        // kafka地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // 每个消费者要指定一个group
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // key序列化类
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        // value序列化类
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(TOPIC));
        // 自行调整Offset
        // consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC,0)));
        while (true)
        {
            // PART2:拉取消息
            // 100毫秒超时时间
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofNanos(100));
            // records.partitions().forEach(topicPartition -> {
            // String key = topicPartition.topic()+topicPartition.partition();
            // List<ConsumerRecord<String, String>> partionRecords = records.records(topicPartition);
            // long value = partionRecords.get(partionRecords.size()-1).offset();
            //
            // });
            // PART3:处理消息
            for (ConsumerRecord<String, String> record : records)
            {
                System.out.println("partition = " + record.partition() + "offset = " + record.offset() + ";key = "
                    + record.key() + "; value= " + record.value());
            }
            
            // 提交offset,消息就不会重复推送。
            consumer.commitSync(); // 同步提交,表示必须等到offset提交完毕,再去消费下一批数据。
            // consumer.commitAsync(); //异步提交,表示发送完提交offset请求后,就开始消费下一批数据了。不用等到Broker的确认。
        }
    }
}

Consumer同样是分为三个步骤:

1、设置Consumer核心属性 :可选的属性都可以由ConsumerConfig类管理。在这个类中,同样对于大部分比较重要的属性,都配置了对应的DOC属性进行描述。同样BOOTSTRAP_SERVERS_CONFIG是必须设置的属性。

2、拉取消息:Kafka采用Consumer主动拉取消息的Pull模式。consumer主动从Broker上拉取一批感兴趣的消息。

3、处理消息,提交位点:消费者将消息拉取完成后,就可以交由业务自行处理对应的这一批消息了。只是消费者需要向Broker提交偏移量offset。如果不提交Offset,Broker会认为消费者端消息处理失败了,还会重复进行推送。

kafka官方配置: Apache Kafka

从客户端属性来梳理客户端工作机制

Kafka的设计精髓,是在网络不稳定,服务也随时会崩溃的这些作死的复杂场景下,如何保证消息的高并发、高吞吐。首先要理解基础模型。

消费者分组消费机制

最重要的一个机制

在Consumer中,都需要指定一个GROUP_ID_CONFIG属性,表示当前Consumer所属的消费者组。

public static final String GROUP_ID_CONFIG = "group.id";
public static final String GROUP_ID_DOC = "A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using <code>subscribe(topic)</code> or the Kafka-based offset management strategy.";

//参数GROUP_INSTANCE_ID_CONFIG:给组成员设置一个固定的instanceId,可以减少Kafka不必要的rebalance

描述翻译:对于Consumer,如果需要在subcribe时使用组管理功能以及Kafka提供的offset管理策略,那就必须要配置GROUP_ID_CONFIG属性。

消费者组作用:生产者往Topic下发消息时,会尽量均匀的将消息发送到Topic下的各个Partition当中。而这个消息,会向所有订阅了该Topic的消费者推送。推送时,每个ConsumerGroup中只会推送一份。也就是同一个消费者组中的多个消费者实例,只会共同消费一个消息副本。而不同消费者组之间,会重复消费消息副本。

Offset偏移量:表示每个消费者组在每个Partiton中已经消费处理的进度

#查看消费者组Offset记录情况
bin/kafka-consumer-groups.sh --bootstrap-server worker1:9092 --describe --group test

这个Offset偏移量,需要消费者处理完成后主动向Kafka的Broker提交。提交完成后,Broker就会更新消费进度,表示这个消息已经被这个消费者组处理完了。但是如果消费者没有提交Offset,Broker就会认为这个消息还没有被处理过,就会重新往对应的消费者组进行推送,不过这次,一般会尽量推送给同一个消费者组当中的其他消费者实例。

Kafka自动提交:

public static final String ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit";
private static final String ENABLE_AUTO_COMMIT_DOC = "If true the consumer's offset will be periodically committed in the background.";

如何提高Offset数据的安全性呢?

服务端:在Consumer中,实际上提供了AUTO_OFFSET_RESET_CONFIG参数,来指定消费者组在服务端的Offset不存在时如何进行后续消费。(有可能服务端初始化Consumer Group的Offset失败,也有可能Consumer Group当前的Offset对应的数据文件被过期删除了)

ConsumerConfig.AUTO_OFFSET_RESEWT_CONFIG :当Server端没有对应的Offset时如何处理。 可选项:

  • earliest: 自动设置为当前最早的offset

  • latest:自动设置为当前最晚的offset

  • none: 如果消费者组对应的offset找不到,就向Consumer抛异常。

  • 其他选项: 向Consumer抛异常。

客户端:

  • 异步提交:消费者在处理业务的同时,异步向Broker提交Offset。效率高,但是容易丢数据(处理失败但是offset提交了)

  • 同步提交:消费者保证处理完所有业务后,再提交Offset。消息不会因为offset丢失。业务处理失败不提交Offset,还可以重试。坏处是处理变慢,另外还可能重复消费(生产者推送消息给组内的其他消费者)

生产者拦截器机制

允许客户端在生产者消息发送到Kafka集群之前,对消息进行拦截,甚至可以修改消息内容

涉及到Producer中指定的一个参数:INTERCEPTOR_CLASSES_CONFIG

public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
public static final String INTERCEPTOR_CLASSES_DOC = "A list of classes to use as interceptors. Implementing the <code>org.apache.kafka.clients.producer.ProducerInterceptor</code> interface allows you to intercept (and possibly mutate) the records received by the producer before they are published to the Kafka cluster. By default, there are no interceptors.";

按照说明,定义一个自己的拦截器实现类:

public class MyInterceptor implements ProducerInterceptor
{
    // 发送消息时触发
    @Override
    public ProducerRecord onSend(ProducerRecord producerRecord)
    {
        System.out.println("prudocerRecord : " + producerRecord.toString());
        return producerRecord;
    }
    
    // 收到服务端响应时触发
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e)
    {
        System.out.println("acknowledgement recordMetadata:" + recordMetadata.toString());
    }
    
    // 连接关闭时触发
    @Override
    public void close()
    {
        System.out.println("producer closed");
    }
    
    // 整理配置项
    @Override
    public void configure(Map<String, ?> map)
    {
        System.out.println("=====config start======");
        for (Map.Entry<String, ?> entry : map.entrySet())
        {
            System.out.println("entry.key:" + entry.getKey() + " === entry.value: " + entry.getValue());
        }
        System.out.println("=====config end======");
    }
}

然后在生产者中指定拦截器类(多个拦截器类,用逗号隔开)

props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.gao.kfk.basic.MyInterceptor");

拦截器机制一般用得比较少,主要用在一些统一添加时间等类似的业务场景。比如,用Kafka传递一些POJO,就可以用拦截器统一添加时间属性。但是我们平常用Kafka传递的都是String类型的消息,POJO类型的消息,Kafka可以传吗?这就要用到下面的消息序列化机制。

消息序列化机制

Producer指定了两个属性KEY_SERIALIZER_CLASS_CONFIG和VALUE_SERIALIZER_CLASS_CONFIG,对于这两个属性,在ProducerConfig中都有配套的说明属性。

public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
public static final String KEY_SERIALIZER_CLASS_DOC = "Serializer class for key that implements the <code>org.apache.kafka.common.serialization.Serializer</code> interface.";
public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
public static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the <code>org.apache.kafka.common.serialization.Serializer</code> interface.";

通过这两个参数,可以指定消息生产者如何将消息的key和value序列化成二进制数据。

  • key是用来进行分区的可选项。Kafka通过key来判断消息要分发到哪个Partition。

    如果没有填写key,那么Kafka会使Round-robin轮询的方式,自动选择Partition。

    如果填写了key,那么会通过声明的Serializer序列化接口,将key转换成一个byte[]数组,然后对key进行hash,选择Partition。这样可以保证key相同的消息会分配到相同的Partition中。

  • Value是业务上比较关心的消息。Kafka同样需要将Value对象通过Serializer序列化接口,将Key转换成byte[]数组,这样才能比较好的在网络上传输Value信息,以及将Value信息落盘到操作系统的文件当中。

生产者要对消息进行序列化,那么消费者拉取消息时,自然需要进行反序列化。所以,在Consumer中,也有反序列化的两个配置

public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer";
public static final String KEY_DESERIALIZER_CLASS_DOC = "Deserializer class for key that implements the <code>org.apache.kafka.common.serialization.Deserializer</code> interface.";
public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";
public static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer class for value that implements the <code>org.apache.kafka.common.serialization.Deserializer</code> interface.";

在Kafka中,对于常用的一些基础数据类型,都已经提供了对应的实现类。但是,如果需要使用一些自定义的消息格式,比如自己定制的POJO,就需要定制具体的实现类了。

在自己进行序列化机制时,需要考虑的是如何用二进制来描述业务数据:

一种类型是定长的基础类型,比如Integer,Long,Double等。这些基础类型转化成二进制数组都是定长的。这类属性可以直接转成序列化数组,在反序列化时,只要按照定长去读取二进制数据就可以反序列化了。

另一种是不定长的浮动类型,比如String,或者基于String的JSON类型等。这种浮动类型的基础数据转化成二进制数组,长度都是不一定的。对于这类数据,通常的处理方式都是先往二进制数组中写入一个定长的数据的长度数据(Integer或者Long类型),然后再继续写入数据本身。这样,反序列化时,就可以先读取一个定长的长度,再按照这个长度去读取对应长度的二进制数据,这样就能读取到数据的完整二进制内容。

序列化机制是在高并发场景中非常重要的一个优化机制。高效的序列化实现能够极大的提升分布式系统的网络传输以及数据落盘的能力。例如对于一个User对象,即可以使用JSON字符串这种简单粗暴的序列化方式,也可以选择按照各个字段进行组合序列化的方式。但是显然后者的占用空间比较小,序列化速度也会比较快。而Kafka在文件落盘时,也设计了非常高效的数据序列化实现,这也是Kafka高效运行的一大支撑。

在很多其他业务场景中,也需要我们提供更高效的序列化实现。例如使用MapReduce框架时,就需要自行定义数据的序列化方式。使用Netty框架进行网络调用时,为了防止粘包,也需要定制数据的序列化机制

消息分区路由机制

消息如何进行路由?

  • Producer会根据消息的key选择Partition,具体如何通过key找Partition呢?

  • 一个消费者组会共同消费一个Topic下的多个Partition中的同一套消息副本,那Consumer节点是不是可以决定自己消费哪些Partition的消息呢?

首先,在Producer中,可以指定一个Partitioner来对消息进行分配。

public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";
private static final String PARTITIONER_CLASS_DOC = "A class to use to determine which partition to be send to when produce the records. Available options are:" +
    "<ul>" +
    "<li>If not set, the default partitioning logic is used. " +
    "This strategy will try sticking to a partition until at least " + BATCH_SIZE_CONFIG + " bytes is produced to the partition. It works with the strategy:" +
    "<ul>" +
    "<li>If no partition is specified but a key is present, choose a partition based on a hash of the key</li>" +
    "<li>If no partition or key is present, choose the sticky partition that changes when at least " + BATCH_SIZE_CONFIG + " bytes are produced to the partition.</li>" +
    "</ul>" +
    "</li>" +
    "<li><code>org.apache.kafka.clients.producer.RoundRobinPartitioner</code>: This partitioning strategy is that " +
    "each record in a series of consecutive records will be sent to a different partition(no matter if the 'key' is provided or not), " +
    "until we run out of partitions and start over again. Note: There's a known issue that will cause uneven distribution when new batch is created. " +
    "Please check KAFKA-9965 for more detail." +
    "</li>" +
    "</ul>" +
    "<p>Implementing the <code>org.apache.kafka.clients.producer.Partitioner</code> interface allows you to plug in a custom partitioner.";

这里就说明了Kafka是通过一个Partitioner接口的具体实现来决定一个消息如何根据Key分配到对应的Partition上的。可以很简单的实现一个自己的分配策略。

在之前的3.2.0版本,Kafka提供了三种默认的Partitioner实现类,RoundRobinPartitioner,DefaultPartitioner和UniformStickyPartitioner。目前后面两种实现已经标记为过期,被替换成了默认的实现机制。

对于生产者,默认的Sticky策略在给一个生产者分配了一个分区后,会尽可能一直使用这个分区。等待该分区的batch.size(默认16K)已满,或者这个分区的消息已完成 linger.ms(默认0毫秒,表示如果batch.size迟迟没有满后的等待时间)。RoundRobinPartitioner是在各个Partition中进行轮询发送,这种方式没有考虑到消息大小以及各个Broker性能差异,用得比较少。

另外可以自行指定一个Partitioner实现类,定制分区逻辑。在Partitioner接口中,核心要实现的就是partition方法。根据相关信息,选择一个Partition。比如用key对partition的个数取模之类的。而Topic下的所有Partition信息都在cluster参数中。

//获取所有的Partition信息。
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

然后,在Consumer中,可以指定一个PARTITION_ASSIGNMENT_STRATEGY分区分配策略,决定如何在多个Consumer实例和多个Partitioner之间建立关联关系。

public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = "partition.assignment.strategy";
private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "A list of class names or class types, " +
    "ordered by preference, of supported partition assignment strategies that the client will use to distribute " +
    "partition ownership amongst consumer instances when group management is used. Available options are:" +
    "<ul>" +
    "<li><code>org.apache.kafka.clients.consumer.RangeAssignor</code>: Assigns partitions on a per-topic basis.</li>" +
    "<li><code>org.apache.kafka.clients.consumer.RoundRobinAssignor</code>: Assigns partitions to consumers in a round-robin fashion.</li>" +
    "<li><code>org.apache.kafka.clients.consumer.StickyAssignor</code>: Guarantees an assignment that is " +
    "maximally balanced while preserving as many existing partition assignments as possible.</li>" +
    "<li><code>org.apache.kafka.clients.consumer.CooperativeStickyAssignor</code>: Follows the same StickyAssignor " +
    "logic, but allows for cooperative rebalancing.</li>" +
    "</ul>" +
    "<p>The default assignor is [RangeAssignor, CooperativeStickyAssignor], which will use the RangeAssignor by default, " +
    "but allows upgrading to the CooperativeStickyAssignor with just a single rolling bounce that removes the RangeAssignor from the list.</p>" +
    "<p>Implementing the <code>org.apache.kafka.clients.consumer.ConsumerPartitionAssignor</code> " +
    "interface allows you to plug in a custom assignment strategy.</p>";

同样,Kafka内置了一些实现方式,在通常情况下也都是最优的选择。可以实现自己的分配策略。

从上面介绍可以看到Kafka默认提供了三种消费者的分区分配策略

  • range策略: 比如一个Topic有10个Partiton(partition 0~9) 一个消费者组下有三个Consumer(consumer1~3)。Range策略就会将分区0~3分给一个Consumer,4~6给一个Consumer,7~9给一个Consumer。

  • round-robin策略:轮询分配策略,可以理解为在Consumer中一个一个轮流分配分区。比如0,3,6,9分区给一个Consumer,1,4,7分区给一个Consumer,然后2,5,8给一个Consumer

  • sticky策略:粘性策略。这个策略有两个原则:

    • 在开始分区时,尽量保持分区的分配均匀。比如按照Range策略分(这一步实际上是随机的)。

    • 分区的分配尽可能的与上一次分配的保持一致。比如在range分区的情况下,第三个Consumer的服务宕机了,那么按照sticky策略,就会保持consumer1和consumer2原有的分区分配情况。然后将consumer3分配的7~9分区尽量平均的分配到另外两个consumer上。这种粘性策略可以很好的保持Consumer的数据稳定性。

另外可以通过继承AbstractPartitionAssignor抽象类自定义消费者的订阅方式。

官方默认提供的生产者端的默认分区器以及消费者端的RangeAssignor+CooperativeStickyAssignor分配策略,在大部分场景下都是非常高效的算法。深入理解这些算法,对于你深入理解MQ场景,以及借此去横向对比理解其他的MQ产品,都是非常有帮助的。

那么在哪些场景下我们可以自己来定义分区器呢?例如如果在部署消费者时,如果我们的服务器配置不一样,就可以通过定制消费者分区器,让性能更好的服务器上的消费者消费较多的消息,而其他服务器上的消费者消费较少的消息,这样就能更合理的运用上消费者端的服务器性能,提升消费者的整体消费速度。

生产者消息缓存机制

Kafka生产者为了避免高并发请求对服务端造成过大压力,每次发消息时并不是一条一条发往服务端,而是增加了一个高速缓存,将消息集中到缓存后,批量进行发送。这种缓存机制也是高并发处理时非常常用的一种机制。

Kafka的消息缓存机制涉及到KafkaProducer中的两个关键组件: accumulator 和 sender

//1.记录累加器
int batchSize = Math.max(1, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG));
this.accumulator = new RecordAccumulator(logContext,batchSize,this.compressionType,lingerMs(config),retryBackoffMs,deliveryTimeoutMs, partitionerConfig,metrics,PRODUCER_METRIC_GROUP_NAME,time,apiVersions,transactionManager,new BufferPool(this.totalMemorySize, batchSize, metrics, time, PRODUCER_METRIC_GROUP_NAME));
//2. 数据发送线程
this.sender = newSender(logContext, kafkaClient, this.metadata);

其中RecordAccumulator,就是Kafka生产者的消息累加器。KafkaProducer要发送的消息都会在ReocrdAccumulator中缓存起来,然后再分批发送给kafka broker。

在RecordAccumulator中,会针对每一个Partition,维护一个Deque双端队列,这些Dequeue队列基本上是和Kafka服务端的Topic下的Partition对应的。每个Dequeue里会放入若干个ProducerBatch数据。KafkaProducer每次发送的消息,都会根据key分配到对应的Deque队列中。然后每个消息都会保存在这些队列中的某一个ProducerBatch中。而消息分发的规则,就是由上面的Partitioner组件完成的。

这里主要涉及到两个参数

//RecordAccumulator缓冲区大小
public static final String BUFFER_MEMORY_CONFIG = "buffer.memory";
private static final String BUFFER_MEMORY_DOC = "The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are "
                                                    + "sent faster than they can be delivered to the server the producer will block for <code>" + MAX_BLOCK_MS_CONFIG + "</code> after which it will throw an exception."
                                                    + "<p>"
                                                    + "This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since "
                                                    + "not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if "
                                                    + "compression is enabled) as well as for maintaining in-flight requests.";

//缓冲区每一个batch的大小
public static final String BATCH_SIZE_CONFIG = "batch.size";
private static final String BATCH_SIZE_DOC = "The producer will attempt to batch records together into fewer requests whenever multiple records are being sent"
                                                 + " to the same partition. This helps performance on both the client and the server. This configuration controls the "
                                                 + "default batch size in bytes. "
                                                 + "<p>"
                                                 + "No attempt will be made to batch records larger than this size. "
                                                 + "<p>"
                                                 + "Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent. "
                                                 + "<p>"
                                                 + "A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable "
                                                 + "batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a "
                                                 + "buffer of the specified batch size in anticipation of additional records."
                                                 + "<p>"
                                                 + "Note: This setting gives the upper bound of the batch size to be sent. If we have fewer than this many bytes accumulated "
                                                 + "for this partition, we will 'linger' for the <code>linger.ms</code> time waiting for more records to show up. "
                                                 + "This <code>linger.ms</code> setting defaults to 0, which means we'll immediately send out a record even the accumulated "
                                                 + "batch size is under this <code>batch.size</code> setting.";

这里面也提到了几个其他的参数,比如 MAX_BLOCK_MS_CONFIG ,默认60秒

接下来,sender就是KafkaProducer中用来发送消息的一个单独的线程。从这里可以看到,每个KafkaProducer对象都对应一个sender线程。他会负责将RecordAccumulator中的消息发送给Kafka。

Sender也并不是一次就把RecordAccumulator中缓存的所有消息都发送出去,而是每次只拿一部分消息。他只获取RecordAccumulator中缓存内容达到BATCH_SIZE_CONFIG大小的ProducerBatch消息。当然,如果消息比较少,ProducerBatch中的消息大小长期达不到BATCH_SIZE_CONFIG的话,Sender也不会一直等待。最多等待LINGER_MS_CONFIG时长。然后就会将ProducerBatch中的消息读取出来。LINGER_MS_CONFIG默认值是0。

然后,Sender对读取出来的消息,会以Broker为key,缓存到一个对应的队列当中。这些队列当中的消息就称为InflightRequest。接下来这些Inflight就会一一发往Kafka对应的Broker中,直到收到Broker的响应,才会从队列中移除。这些队列也并不会无限缓存,最多缓存MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION(默认值为5)个请求。

生产者缓存机制的主要目的是将消息打包,减少网络IO频率。所以,在Sender的InflightRequest队列中,消息也不是一条一条发送给Broker的,而是一批消息一起往Broker发送。而这就意味着这一批消息是没有固定的先后顺序的。

其中涉及到的几个主要参数如下:

public static final String LINGER_MS_CONFIG = "linger.ms";
private static final String LINGER_MS_DOC = "The producer groups together any records that arrive in between request transmissions into a single batched request. "
    + "Normally this occurs only under load when records arrive faster than they can be sent out. However in some circumstances the client may want to "
    + "reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount "
    + "of artificial delay&mdash;that is, rather than immediately sending out a record, the producer will wait for up to "
    + "the given delay to allow other records to be sent so that the sends can be batched together. This can be thought "
    + "of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once "
    + "we get <code>" + BATCH_SIZE_CONFIG + "</code> worth of records for a partition it will be sent immediately regardless of this "
    + "setting, however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the "
    + "specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting <code>" + LINGER_MS_CONFIG + "=5</code>, "
    + "for example, would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absence of load.";



public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection";
private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking."
    + " Note that if this configuration is set to be greater than 1 and <code>enable.idempotence</code> is set to false, there is a risk of"
    + " message reordering after a failed send due to retries (i.e., if retries are enabled); "
    + " if retries are disabled or if <code>enable.idempotence</code> is set to true, ordering will be preserved."
    + " Additionally, enabling idempotence requires the value of this configuration to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE + "."
    + " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled. ";

最后,Sender会通过其中的一个Selector组件完成与Kafka的IO请求,并接收Kafka的响应。

//org.apache.kafka.clients.producer.KafkaProducer#doSend
if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), appendCallbacks.getPartition());
                this.sender.wakeup();
            }

Kafka的生产者缓存机制是Kafka面对海量消息时非常重要的优化机制。合理优化这些参数,对于Kafka集群性能提升是非常重要的。比如如果你的消息体比较大,那么应该考虑加大batch.size,尽量提升batch的缓存效率。而如果Producer要发送的消息确实非常多,那么就需要考虑加大total.memory参数,尽量避免缓存不够造成的阻塞。如果发现生产者发送消息比较慢,那么可以考虑提升max.in.flight.requests.per.connection参数,这样能加大消息发送的吞吐量。

发送应答机制

生产者消息幂等性

生产者消息事务

文章来源:https://blog.csdn.net/weixin_58482311/article/details/134938989
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。