kafka生产者设置ack、消费者设置自动提交实例

2024-01-07 17:19:39

生产者

在 Kafka 中,可以通过设置 acks 参数为 “all” 来确保生产者在成功写入所有副本后才认为消息发送成功。下面是一个简单的 Java 示例,演示了如何在 Kafka 生产者中设置 acks=all:

java

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class Producer {
    private static final String TOPIC_NAME = "my_topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {
        // 创建配置对象
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // 设置 acks 为 "all"

        // 创建生产者实例
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // 发送消息
        try {
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key", "value");
            producer.send(record).get(); // 使用 get() 方法阻塞等待消息发送完成
            System.out.println("Message sent successfully.");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}

上述代码中,首先创建了一个 Properties 对象,并进行了相应的配置。其中,通过设置 ACKS_CONFIG 参数为 “all”,将生产者的确认模式设置为 “all”,表示只有当所有副本都成功写入消息后,生产者才认为消息发送成功。

然后,使用这些配置创建了一个 KafkaProducer 实例。在发送消息时,使用 send() 方法发送消息,并通过 get() 方法阻塞等待消息发送完成。这样可以确保在返回之前,所有副本已经完成写入操作。

需要注意的是,send() 方法返回一个 Future 对象,可以使用 get() 方法获取发送结果。如果发送过程中发生异常,可以通过捕获异常来处理。在示例中,我们简单地打印出成功发送消息的提示。

消费者

在 Kafka 中,可以通过手动提交消费位移来控制消费者的位移位置。下面是一个简单的 Java 示例,演示了如何在 Kafka 中使用手动提交消费位移:

java

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class Consumer {
    private static final String TOPIC_NAME = "my_topic";
    private static final String GROUP_ID = "my_group";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final int MAX_POLL_RECORDS = 100;

    public static void main(String[] args) {
        // 创建配置对象
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁用自动提交位移
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        // 创建消费者实例
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 订阅主题
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        // 消费消息
        try {
      
  while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            if (!records.isEmpty()) {
                // 手动提交位移
                List<TopicPartition> partitions = new ArrayList<>();
                for (ConsumerRecord<String, String> record : records) {
                    partitions.add(new TopicPartition(record.topic(), record.partition()));
                }
                consumer.commitSync(partitions);
            }
        }
    } finally {
        consumer.close();
    }
}

}
上述代码中,首先创建了一个 Properties 对象,并进行了相应的配置。注意,通过设置 ENABLE_AUTO_COMMIT_CONFIG 参数为 false,禁用了消费者的自动提交位移功能。

然后,使用这些配置创建了一个 KafkaConsumer 实例,并订阅了指定的主题。在消费消息时,使用 poll() 方法从 Kafka 集群拉取一批消息,并进行手动提交位移操作(通过 commitSync() 方法)。

需要注意的是,手动提交位移时,需要指定每个分区的位移信息。在上述示例中,我们通过遍历消息列表,构建了一个分区列表,并将其传递给 commitSync() 方法。

文章来源:https://blog.csdn.net/wang121213145/article/details/135432692
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。