深入浅出分析kafka客户端程序设计 ----- 消费者篇----万字总结
1.?Kafka?消费者的逻辑
- 配置消费者客户端参数。
- 创建相应的消费者实例。
- 订阅主题。
- 拉取消息并消费;
- 提交消息位移;
- 关闭消费者实例;
2 Kafka 的C++ API
2.1 RdKafka::Conf
见生成者实现文章。
2.2 RdKafka::Event
见生成者实现文章。
2.3 RdKafka::EventCb
见生成者实现文章。
2.4 RdKafka::TopicPartition
static TopicPartition * create(const std::string &topic, int partition);
//创建一个TopicPartition对象。
static TopicPartition *create (const std::string &topic, int partition,int64_t offset);
//创建TopicPartition对象。
static void destroy (std::vector<TopicPartition*> &partitions);
//销毁所有TopicPartition对象。
const std::string & topic () const;
//返回Topic名称。
int partition ();
//返回分区号。
int64_t offset();
//返回位移。
void set_offset(int64_t offset);
//设置位移。
ErrorCode err();
//返回错误码。
2.5 RdKafka::RebalanceCb
virtual void rebalance_cb(RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err, std::vector< TopicPartition * >
&partitions)=0;
用于RdKafka::KafkaConsunmer的组再平衡回调函数;注册rebalance_cb回调函数会关闭rdkafka的自动分区赋值和再分配并替换应用程序的rebalance_cb回调函数
?再平衡回调函数负责对基于RdKafka::ERR_ASSIGN_PARTITIONS和RdKafka::ERR_REVOKE_PARTITIONS事件更新rdkafka的分区分配,也能处理任意前两者错误除外其它再平衡失败错误。对于RdKafka::ERR_ASSIGN_PARTITIONS和RdKafka::ERR_REVOKE_PARTITIONS事件之外的其它再平衡失败错误,必须调用unassign()同步状态。
没有再平衡回调函数,rdkafka也能自动完成再平衡过程,但注册一个再平衡回调函数可以使应用程序在执行其它操作时拥有更大的灵活性,例如从指定位置获取位移或手动提交位移。
C++封装API:
// 定义了一个名为ConsumerRebalanceCb的类,继承自RdKafka::RebalanceCb
class ConsumerRebalanceCb : public RdKafka::RebalanceCb
{
private:
// 定义了一个静态方法printTopicPartition,用于打印当前获取的分区
static void printTopicPartition(const std::vector<RdKafka::TopicPartition*>& partitions)
{
// 循环遍历传入的分区集合,打印每个分区的主题和分区号
for (unsigned int i = 0; i < partitions.size(); i++)
std::cerr << partitions[i]->topic() <<
"[" << partitions[i]->partition() << "], ";
std::cerr << "\n";
}
public:
// 定义了一个再平衡回调函数rebalance_cb,处理消费者组再平衡时的事件
void rebalance_cb(RdKafka::KafkaConsumer* consumer, RdKafka::ErrorCode err, std::vector<RdKafka::TopicPartition*>& partitions)
{
// 打印再平衡回调事件的错误码和涉及的分区信息
std::cerr << "RebalanceCb: " << RdKafka::err2str(err) << ": ";
printTopicPartition(partitions);
// 根据再平衡的错误码进行处理
if (err == RdKafka::ERR__ASSIGN_PARTITIONS)
{
// 如果是分配分区的事件,将分区分配给消费者,并记录分区数量
consumer->assign(partitions);
partition_count = (int)partitions.size();
}
else
{
// 如果是分区撤销的事件,取消消费者已分配的所有分区,并将分区数量置为0
consumer->unassign();
partition_count = 0;
}
}
private:
int partition_count; // 记录分区数量的私有成员变量
};
2.6 RdKafka::Message
见生成者实现文章。
2.7 RdKafka::KafkaConsumer(核心)
KafkaConsumer是高级API,要求Kafka 0.9.0以上版本,当前支持range和roundrobin分区分配策略。
static KafkaConsumer * create(Conf *conf, std::string &errstr);
创建KafkaConsumer对象,conf对象必须配置Consumer要加入的消费者组。使用KafkaConsumer::close()进行关闭。
ErrorCode assignment(std::vector< RdKafka::TopicPartition * > &partitions);
返回由RdKafka::KafkaConsumer::assign() 设置的当前分区。
ErrorCode subscription(std::vector< std::string > &topics);
返回由RdKafka::KafkaConsumer::subscribe() 设置的当前订阅Topic。
ErrorCode subscribe(const std::vector< std::string > &topics);
更新订阅Topic分区。
ErrorCode unsubscribe();
将当前订阅Topic取消订阅分区。
ErrorCode assign(const std::vector< TopicPartition * > &partitions);
将分配分区更新为partitions。
ErrorCode unassign();
停止消费并删除当前分配的分区。
Message * consume(int timeout_ms);
消费消息或获取错误事件,触发回调函数,会自动调用注册的回调函数,包括RebalanceCb、EventCb、OffsetCommitCb等。需要使用delete释放消息。应用程序必须确保consume在指定时间间隔内调用,为了执行等待调用的回调函数,即使没有消息。当RebalanceCb被注册时,在需要调用和适当处理内部Consumer同步状态时,确保consume在指定时间间隔内调用极为重要。应用程序必须禁止对KafkaConsumer对象调用poll函数。
如果RdKafka::Message::err()是ERR_NO_ERROR,则返回正常的消息;如果RdKafka::Message::err()是ERR_NO_ERRO,返回错误事件;如果RdKafka::Message::err()是ERR_TIMED_OUT,则超时。
ErrorCode commitSync();
提交当前分配分区的位移,同步操作,会阻塞直到位移被提交或提交失败。如果注册了RdKafka::OffsetCommitCb回调函数,其会在KafkaConsumer::consume()函数内调用并提交位移。
ErrorCode commitAsync();
异步提交位移。
ErrorCode commitSync(Message *message);
基于消息对单个topic+partition对象同步提交位移。
virtual ErrorCode commitSync (std::vector<TopicPartition*> &offsets) = 0;
对指定多个TopicPartition同步提交位移。
ErrorCode commitAsync(Message *message);
基于消息对单个TopicPartition异步提交位移。
virtual ErrorCode commitAsync (const std::vector<TopicPartition*> &offsets) = 0;
对多个TopicPartition异步提交位移。
ErrorCode close();
正常关闭,会阻塞直到四个操作完成(触发避免当前分区分配的局部再平衡,停止当前赋值消费,提交位移,离开分组)
virtual ConsumerGroupMetadata *groupMetadata () = 0;
返回本Consumer实例的Consumer Group的元数据。
ErrorCode position (std::vector<TopicPartition*> &partitions)
获取TopicPartition对象中当前位移,会别填充TopicPartition对象的offset字段。
ErrorCode seek (const TopicPartition &partition, int timeout_ms)
定位TopicPartition的Consumer到位移。timeout_ms为0,会开始Seek并立即返回;timeout_ms非0,Seek会等待timeout_ms时间。
ErrorCode offsets_store (std::vector<TopicPartition*> &offsets)
为TopicPartition存储位移,位移会在auto.commit.interval.ms时提交或是被手动提交。enable.auto.offset.store属性必须设置为fasle。
3 Kafka 消费者客户端开发
3.1 必要的参数配置(bootstrap.servers)
在创建消费者的时候以下以下三个选项是必选的:
bootstrap.servers:指定 broker (kafka服务器)的地址清单,清单里不需要包含所有的 broker(kafka) 地址,生产者会从给定的 broker 里查找 broker 的信息。不过建议至少要提供两个 broker 的信息作为容错。
group.id:consumer group 是 kafka 提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以有多个消费者或消费者实例(consumer instance),它们共享一个公共的 ID,即group ID。组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。
auto.offset.reset:这个参数是针对新的 groupid 中的消费者而言的,当有新 groupid 的消费者来
这三个参数在创建Kafka消费者时是必选的,它们分别为:
-
bootstrap.servers:
- 用途:指定 Kafka 集群中的 broker 地址清单。
- 作用:消费者在启动时需要知道至少一个 broker 的地址,以便获取集群中的元数据信息,如主题分区的分布情况、leader 信息等。消费者将会从提供的 broker 中获取这些信息,然后根据负载均衡策略决定从哪个分区拉取消息。
- 注意:建议至少提供两个 broker 的地址,以增加容错性,防止某个 broker 不可用时消费者无法正常工作。
-
- 用途:指定消费者所属的消费者组的唯一标识。
- 作用:Kafka 提供了分组机制,即将消费者组织成逻辑上的组,组内的每个消费者协调消费不同的分区,以实现水平扩展和容错性。所有消费者实例在同一组内必须共享相同的 group ID,这样它们就可以协同工作,确保每个分区只有一个消费者消费。
- 注意:组内的消费者将协同处理订阅主题的所有分区。
-
auto.offset.reset:
- 用途:指定消费者在发现没有存储偏移量或偏移量无效的情况下该如何处理。
- 作用:对于一个新的消费者组(group.id是新的)来说,如果没有存储的偏移量信息,或者存储的偏移量无效,该参数就决定了消费者从哪里开始消费消息。可能的取值包括:
- earliest:从最早的偏移量开始消费。
- latest:从最新的偏移量开始消费。
- none:如果没有发现消费者组的偏移量,就抛出异常。
- 注意:这个参数主要用于处理新的消费者组,已有偏移量的消费者组不受此参数影响。
std::string errorStr;
RdKafka::Conf::ConfResult errorCode;
m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
m_event_cb = new ConsumerEventCb;
errorCode = m_config->set("event_cb", m_event_cb, errorStr);
if(errorCode != RdKafka::Conf::CONF_OK)
{
std::cout << "Conf set failed: " << errorStr << std::endl;
}
m_rebalance_cb = new ConsumerRebalanceCb;
errorCode = m_config->set("rebalance_cb", m_rebalance_cb, errorStr);
if(errorCode != RdKafka::Conf::CONF_OK)
{
std::cout << "Conf set failed: " << errorStr << std::endl;
}
errorCode = m_config->set("enable.partition.eof", "false", errorStr);
if(errorCode != RdKafka::Conf::CONF_OK)
{
std::cout << "Conf set failed: " << errorStr << std::endl;
}
errorCode = m_config->set("group.id", m_groupID, errorStr);
if(errorCode != RdKafka::Conf::CONF_OK)
{
std::cout << "Conf set failed: " << errorStr << std::endl;
}
errorCode = m_config->set("bootstrap.servers", m_brokers, errorStr);
if(errorCode != RdKafka::Conf::CONF_OK)
{
std::cout << "Conf set failed: " << errorStr << std::endl;
}
errorCode = m_config->set("max.partition.fetch.bytes", "1024000", errorStr);
if(errorCode != RdKafka::Conf::CONF_OK)
{
std::cout << "Conf set failed: " << errorStr << std::endl;
}
?
3.2 订阅主题和分区
订阅主题,可以订阅多个。
也可以通过正则表达式方式一次订阅多个主题,比如 “topic-.*”, 则前缀为“topic-.”的主题都被订阅。
m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
// 获取最新的消息数据
errorCode = m_topicConfig->set("auto.offset.reset", "latest", errorStr);
if (errorCode != RdKafka::Conf::CONF_OK) {
std::cout << "Topic Conf set failed: " << errorStr << std::endl;
}
errorCode = m_config->set("default_topic_conf", m_topicConfig, errorStr);
if (errorCode != RdKafka::Conf::CONF_OK) {
std::cout << "Conf set failed: " << errorStr << std::endl;
}
m_consumer = RdKafka::KafkaConsumer::create(m_config, errorStr);
if (m_consumer == NULL) {
std::cout << "Create KafkaConsumer failed: " << errorStr << std::endl;
}
std::cout << "Created consumer " << m_consumer->name() << std::endl;
// 订阅主题, 可以订阅多个主题
RdKafka::ErrorCode errorCode = m_consumer->subscribe(m_topicVector);
3.3 消息消费
void msg_consume(RdKafka::Message* msg, void* opaque)
{
switch (msg->err())
{
case RdKafka::ERR__TIMED_OUT:
std::cerr << "Consumer error: " << msg->errstr() << std::endl; // 超时
break;
case RdKafka::ERR_NO_ERROR: // 有消息进来
std::cout << " Message in-> topic:" << msg->topic_name() << "partition:["
<< msg->partition() << "] at offset " << msg->offset()
<< " key: " << msg->key() << " payload: "
<< (char*)msg->payload() << std::endl;
break;
default:
std::cerr << "Consumer error: " << msg->errstr() << std::endl;
break;
}
}
void KafkaConsumer::pullMessage()
{
// 订阅Topic
RdKafka::ErrorCode errorCode = m_consumer->subscribe(m_topicVector);
if (errorCode != RdKafka::ERR_NO_ERROR)
{
std::cout << "subscribe failed: " << RdKafka::err2str(errorCode) << std::endl;
}
// 消费消息
while(true)
{
RdKafka::Message *msg = m_consumer->consume(1000);
msg_consume(msg, NULL);
delete msg;
}
}
?
3.4 完整示例代码
KafkaConsumer.h
#ifndef KAFKACONSUMER_H
#define KAFKACONSUMER_H
// 防止头文件多次包含的保护机制
#pragma once
// 另一种头文件包含保护的方式
#include <string>
#include <iostream>
#include <vector>
#include <stdio.h>
#include "librdkafka/rdkafkacpp.h"
// 包含必要的C++标准库和Rdkafka C++库的头文件
class KafkaConsumer
{
public:
// 构造函数
explicit KafkaConsumer(const std::string& brokers, const std::string& groupID,
const std::vector<std::string>& topics, int partition);
// 拉取消息的函数声明
void pullMessage();
// 析构函数声明
~KafkaConsumer();
protected:
// 成员变量
std::string m_brokers; // Kafka集群的broker地址
std::string m_groupID; // 消费者组的唯一标识符
std::vector<std::string> m_topicVector; // 要消费的主题列表
int m_partition; // 消息分区
RdKafka::Conf* m_config; // Kafka配置
RdKafka::Conf* m_topicConfig; // 主题配置
RdKafka::KafkaConsumer* m_consumer; // Kafka消费者
RdKafka::EventCb* m_event_cb; // 事件回调
RdKafka::RebalanceCb* m_rebalance_cb; // 重新平衡回调
};
#endif // KAFKACONSUMER_H
// 结束头文件包含保护
KafkaConsumer.cpp
#include "KafkaConsumer.h"
class ConsumerEventCb : public RdKafka::EventCb
{
public:
void event_cb (RdKafka::Event &event)
{
switch (event.type())
{
case RdKafka::Event::EVENT_ERROR:
if (event.fatal())
{
std::cerr << "FATAL ";
}
std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
event.str() << std::endl;
break;
case RdKafka::Event::EVENT_STATS:
std::cerr << "\"STATS\": " << event.str() << std::endl;
break;
case RdKafka::Event::EVENT_LOG:
fprintf(stderr, "LOG-%i-%s: %s\n",
event.severity(), event.fac().c_str(), event.str().c_str());
break;
case RdKafka::Event::EVENT_THROTTLE:
std::cerr << "THROTTLED: " << event.throttle_time() << "ms by " <<
event.broker_name() << " id " << (int)event.broker_id() << std::endl;
break;
default:
std::cerr << "EVENT " << event.type() <<
" (" << RdKafka::err2str(event.err()) << "): " <<
event.str() << std::endl;
break;
}
}
};
class ConsumerRebalanceCb : public RdKafka::RebalanceCb
{
private:
static void printTopicPartition (const std::vector<RdKafka::TopicPartition*>&partitions) // 打印当前获取的分区
{
for (unsigned int i = 0 ; i < partitions.size() ; i++)
std::cerr << partitions[i]->topic() <<
"[" << partitions[i]->partition() << "], ";
std::cerr << "\n";
}
public:
void rebalance_cb (RdKafka::KafkaConsumer *consumer,
RdKafka::ErrorCode err,
std::vector<RdKafka::TopicPartition*> &partitions)
{
std::cerr << "RebalanceCb: " << RdKafka::err2str(err) << ": ";
printTopicPartition(partitions);
if (err == RdKafka::ERR__ASSIGN_PARTITIONS)
{
consumer->assign(partitions);
partition_count = (int)partitions.size();
}
else
{
consumer->unassign();
partition_count = 0;
}
}
private:
int partition_count;
};
KafkaConsumer::KafkaConsumer(const std::string& brokers, const std::string& groupID,
const std::vector<std::string>& topics, int partition)
{
m_brokers = brokers;
m_groupID = groupID;
m_topicVector = topics;
m_partition = partition;
std::string errorStr;
RdKafka::Conf::ConfResult errorCode;
m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
m_event_cb = new ConsumerEventCb;
errorCode = m_config->set("event_cb", m_event_cb, errorStr);
if(errorCode != RdKafka::Conf::CONF_OK)
{
std::cout << "Conf set failed: " << errorStr << std::endl;
}
m_rebalance_cb = new ConsumerRebalanceCb;
errorCode = m_config->set("rebalance_cb", m_rebalance_cb, errorStr);
if(errorCode != RdKafka::Conf::CONF_OK)
{
std::cout << "Conf set failed: " << errorStr << std::endl;
}
errorCode = m_config->set("enable.partition.eof", "false", errorStr);
if(errorCode != RdKafka::Conf::CONF_OK)
{
std::cout << "Conf set failed: " << errorStr << std::endl;
}
errorCode = m_config->set("group.id", m_groupID, errorStr);
if(errorCode != RdKafka::Conf::CONF_OK)
{
std::cout << "Conf set failed: " << errorStr << std::endl;
}
errorCode = m_config->set("bootstrap.servers", m_brokers, errorStr);
if(errorCode != RdKafka::Conf::CONF_OK)
{
std::cout << "Conf set failed: " << errorStr << std::endl;
}
errorCode = m_config->set("max.partition.fetch.bytes", "1024000", errorStr);
if(errorCode != RdKafka::Conf::CONF_OK)
{
std::cout << "Conf set failed: " << errorStr << std::endl;
}
// partition.assignment.strategy range,roundrobin
m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
// 获取最新的消息数据
errorCode = m_topicConfig->set("auto.offset.reset", "latest", errorStr);
if(errorCode != RdKafka::Conf::CONF_OK)
{
std::cout << "Topic Conf set failed: " << errorStr << std::endl;
}
errorCode = m_config->set("default_topic_conf", m_topicConfig, errorStr);
if(errorCode != RdKafka::Conf::CONF_OK)
{
std::cout << "Conf set failed: " << errorStr << std::endl;
}
m_consumer = RdKafka::KafkaConsumer::create(m_config, errorStr);
if(m_consumer == NULL)
{
std::cout << "Create KafkaConsumer failed: " << errorStr << std::endl;
}
std::cout << "Created consumer " << m_consumer->name() << std::endl;
}
void msg_consume(RdKafka::Message* msg, void* opaque)
{
switch (msg->err())
{
case RdKafka::ERR__TIMED_OUT:
std::cerr << "Consumer error: " << msg->errstr() << std::endl; // 超时
break;
case RdKafka::ERR_NO_ERROR: // 有消息进来
std::cout << " Message in-> topic:" << msg->topic_name() << "partition:["
<< msg->partition() << "] at offset " << msg->offset()
<< " key: " << msg->key() << " payload: "
<< (char*)msg->payload() << std::endl;
break;
default:
std::cerr << "Consumer error: " << msg->errstr() << std::endl;
break;
}
}
void KafkaConsumer::pullMessage()
{
// 订阅Topic
RdKafka::ErrorCode errorCode = m_consumer->subscribe(m_topicVector);
if (errorCode != RdKafka::ERR_NO_ERROR)
{
std::cout << "subscribe failed: " << RdKafka::err2str(errorCode) << std::endl;
}
// 消费消息
while(true)
{
RdKafka::Message *msg = m_consumer->consume(1000);
msg_consume(msg, NULL);
delete msg;
}
}
KafkaConsumer::~KafkaConsumer()
{
m_consumer->close();
delete m_config;
delete m_topicConfig;
delete m_consumer;
delete m_event_cb;
delete m_rebalance_cb;
}
class ConsumerEventCb : public RdKafka::EventCb
{
public:
? ? void event_cb (RdKafka::Event &event)
? ? {
? ? ? ? switch (event.type())
? ? ? ? {
? ? ? ? case RdKafka::Event::EVENT_ERROR:
? ? ? ? ? ? if (event.fatal())
? ? ? ? ? ? {
? ? ? ? ? ? ? ? std::cerr << "FATAL ";
? ? ? ? ? ? }
? ? ? ? ? ? std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
? ? ? ? ? ? ? ? ? ? ? event.str() << std::endl;
? ? ? ? ? ? break;? ? ? ? case RdKafka::Event::EVENT_STATS:
? ? ? ? ? ? std::cerr << "\"STATS\": " << event.str() << std::endl;
? ? ? ? ? ? break;? ? ? ? case RdKafka::Event::EVENT_LOG:
? ? ? ? ? ? fprintf(stderr, "LOG-%i-%s: %s\n",
? ? ? ? ? ? ? ? ? ? event.severity(), event.fac().c_str(), event.str().c_str());
? ? ? ? ? ? break;? ? ? ? case RdKafka::Event::EVENT_THROTTLE:
? ? ? ? ? ? std::cerr << "THROTTLED: " << event.throttle_time() << "ms by " <<
? ? ? ? ? ? ? ? ? ? ? event.broker_name() << " id " << (int)event.broker_id() << std::endl;
? ? ? ? ? ? break;? ? ? ? default:
? ? ? ? ? ? std::cerr << "EVENT " << event.type() <<
? ? ? ? ? ? ? ? ? ? ? " (" << RdKafka::err2str(event.err()) << "): " <<
? ? ? ? ? ? ? ? ? ? ? event.str() << std::endl;
? ? ? ? ? ? break;
? ? ? ? }
? ? }
};
这段代码定义了一个名为 ConsumerEventCb
的类,其主要作用是定义了一个回调函数 event_cb
,用于处理不同类型的 Kafka 事件。在这个类中,通过继承 RdKafka::EventCb
,实现了 Kafka 事件的回调处理。
当 Kafka 消费者使用这个事件回调时,会在特定情况下触发这些事件。事件的触发是由 Kafka broker 和 Kafka consumer 之间的交互而引起的。
当 Kafka broker 发送事件通知时(例如错误、统计、日志或节流等),Kafka consumer 将触发适当类型的事件。这时会调用 event_cb
函数,根据事件的类型执行相应的处理逻辑。
流程大致如下:
- Kafka Consumer 设置事件回调函数:?在使用 Kafka Consumer 时,可以将?
ConsumerEventCb
?的实例作为事件回调函数传递给 Kafka Consumer。 - Kafka Consumer 消费消息:?在 Kafka Consumer 消费消息的过程中,如果发生了特定类型的事件(例如错误、统计、日志或节流等),Kafka Consumer 会根据事件的性质调用相应的?
event_cb
?函数。 - 根据事件类型执行相应的处理逻辑:?在?
event_cb
?函数中,通过?switch
?语句根据不同类型的事件执行相应的逻辑处理。对于每种事件类型,都有不同的处理方式,例如输出错误信息、统计信息、日志信息或节流信息等。 - 处理完毕后继续消费:?处理完特定类型的事件后,Kafka Consumer 可能会继续消费消息或者执行其他操作,这取决于应用程序的逻辑。
class ConsumerRebalanceCb : public RdKafka::RebalanceCb
{
private:
? ? static void printTopicPartition (const std::vector<RdKafka::TopicPartition*>&partitions) ? ? ? ?// 打印当前获取的分区
? ? {
? ? ? ? for (unsigned int i = 0 ; i < partitions.size() ; i++)
? ? ? ? ? ? std::cerr << partitions[i]->topic() <<
? ? ? ? ? ? ? ? ? ? ? "[" << partitions[i]->partition() << "], ";
? ? ? ? std::cerr << "\n";
? ? }public:
? ? void rebalance_cb (RdKafka::KafkaConsumer *consumer,
? ? ? ? ? ? ? ? ? ? ? ?RdKafka::ErrorCode err,
? ? ? ? ? ? ? ? ? ? ? ?std::vector<RdKafka::TopicPartition*> &partitions)
? ? {
? ? ? ? std::cerr << "RebalanceCb: " << RdKafka::err2str(err) << ": ";
? ? ? ? printTopicPartition(partitions);
? ? ? ? if (err == RdKafka::ERR__ASSIGN_PARTITIONS)
? ? ? ? {
? ? ? ? ? ? consumer->assign(partitions);
? ? ? ? ? ? partition_count = (int)partitions.size();
? ? ? ? }
? ? ? ? else
? ? ? ? {
? ? ? ? ? ? consumer->unassign();
? ? ? ? ? ? partition_count = 0;
? ? ? ? }
? ? }
private:
? ? int partition_count;
};
?
-
定义静态成员函数
printTopicPartition
:- 该函数用于打印当前获取的分区信息。
- 遍历传入的?
partitions
?向量,输出每个分区的主题(topic)和分区号(partition)。
-
声明
ConsumerRebalanceCb
类:- 该类继承自?
RdKafka::RebalanceCb
,表明它是一个用于处理重新分配分区的回调的类。 - 包含一个私有成员函数?
printTopicPartition
?和一个私有整数成员?partition_count
。
- 该类继承自?
-
实现
rebalance_cb
函数:rebalance_cb
?函数是一个虚函数,用于处理分区重新分配事件。它接收三个参数:consumer
:指向?RdKafka::KafkaConsumer
?类型对象的指针,表示触发回调的消费者。err
:表示重新分配操作的错误码,使用?RdKafka::ErrorCode
?类型。partitions
:一个指向?RdKafka::TopicPartition
?对象的指针的向量,表示重新分配的分区信息。
- 在函数中,首先打印重新分配的动作和错误码。
- 调用前面定义的?
printTopicPartition
?函数打印重新分配的分区信息。 - 根据错误码的不同,执行不同的逻辑:
- 如果错误码是?
ERR__ASSIGN_PARTITIONS
,则表示需要分配分区,调用?consumer->assign(partitions)
?将分配的分区应用到消费者,并更新?partition_count
。 - 否则,调用?
consumer->unassign()
?取消分配的分区,将?partition_count
?设置为 0。
- 如果错误码是?
-
私有成员
partition_count
:- 用于追踪当前分配的分区数量。
KafkaConsumer::KafkaConsumer(const std::string& brokers, const std::string& groupID,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ?const std::vector<std::string>& topics, int partition)
{
? ? m_brokers = brokers;
? ? m_groupID = groupID;
? ? m_topicVector = topics;
? ? m_partition = partition;//属性赋值给消费者对象成员属性
? ? std::string errorStr;
? ? RdKafka::Conf::ConfResult errorCode;
? ? m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);//创建全局配置对象? ? m_event_cb = new ConsumerEventCb;
? ? errorCode = m_config->set("event_cb", m_event_cb, errorStr);//将消费者事件回调配置到全局配置对象当中
? ? if(errorCode != RdKafka::Conf::CONF_OK)
? ? {
? ? ? ? std::cout << "Conf set failed: " << errorStr << std::endl;
? ? }? ? m_rebalance_cb = new ConsumerRebalanceCb;//创建消费自平衡回调
? ? errorCode = m_config->set("rebalance_cb", m_rebalance_cb, errorStr);//将消费自平衡回调配置到全局配置对象当中??
? ? if(errorCode != RdKafka::Conf::CONF_OK)
? ? {
? ? ? ? std::cout << "Conf set failed: " << errorStr << std::endl;
? ? }? ? errorCode = m_config->set("enable.partition.eof", "false", errorStr);
? ? if(errorCode != RdKafka::Conf::CONF_OK)
? ? {
? ? ? ? std::cout << "Conf set failed: " << errorStr << std::endl;
? ? }? ? errorCode = m_config->set("group.id", m_groupID, errorStr);
? ? if(errorCode != RdKafka::Conf::CONF_OK)
? ? {
? ? ? ? std::cout << "Conf set failed: " << errorStr << std::endl;
? ? }
? ? errorCode = m_config->set("bootstrap.servers", m_brokers, errorStr);
? ? if(errorCode != RdKafka::Conf::CONF_OK)
? ? {
? ? ? ? std::cout << "Conf set failed: " << errorStr << std::endl;
? ? }
? ? errorCode = m_config->set("max.partition.fetch.bytes", "1024000", errorStr);
? ? if(errorCode != RdKafka::Conf::CONF_OK)
? ? {
? ? ? ? std::cout << "Conf set failed: " << errorStr << std::endl;
? ? }? ?//上述都是设置全局配置
??
? ? // partition.assignment.strategy ?range,roundrobin
? ? m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
? ? // 获取最新的消息数据
? ? errorCode = m_topicConfig->set("auto.offset.reset", "latest", errorStr);
? ? if(errorCode != RdKafka::Conf::CONF_OK)
? ? {
? ? ? ? std::cout << "Topic Conf set failed: " << errorStr << std::endl;
? ? }
? ? errorCode = m_config->set("default_topic_conf", m_topicConfig, errorStr);
? ? if(errorCode != RdKafka::Conf::CONF_OK)
? ? {
? ? ? ? std::cout << "Conf set failed: " << errorStr << std::endl;
? ? }
? ? m_consumer = RdKafka::KafkaConsumer::create(m_config, errorStr);? ?
? ? if(m_consumer == NULL)
? ? {
? ? ? ? std::cout << "Create KafkaConsumer failed: " << errorStr << std::endl;
? ? }
? ? std::cout << "Created consumer " << m_consumer->name() << std::endl;
}
?
-
成员变量初始化:
m_brokers
、m_groupID
、m_topicVector
?和?m_partition
?被设置为传入的参数值。errorStr
?用于存储错误信息,errorCode
?用于存储配置设置的结果。
-
创建全局配置对象
m_config
:- 使用?
RdKafka::Conf::create
?创建一个全局配置对象,用于配置 Kafka Consumer。 - 创建事件回调对象?
m_event_cb
?的实例,并将其设置为全局配置的事件回调。
- 使用?
-
设置全局配置参数:
- 使用?
set
?方法设置全局配置参数,例如设置事件回调、禁用分区 EOF(end-of-file)通知、设置消费者组 ID、设置 bootstrap 服务器地址等。 - 如果设置失败,输出错误信息。
- 使用?
-
创建主题配置对象
m_topicConfig
:- 使用?
RdKafka::Conf::create
?创建一个主题配置对象,用于配置特定主题的消费者行为。
- 使用?
-
设置主题配置参数:
- 使用?
set
?方法设置主题配置参数,例如设置自动偏移重置为 "latest",表示消费者将从最新的消息开始消费。 - 如果设置失败,输出错误信息。
- 使用?
-
检查主题配置设置是否成功:
- 使用?
errorCode
?和?errorStr
?来检查之前设置主题配置参数的操作是否成功。如果失败,输出错误信息。
- 使用?
-
设置默认主题配置参数:
- 使用?
set
?方法将之前创建的主题配置对象?m_topicConfig
?设置为默认主题配置参数。这样,消费者将使用这个主题配置对象来消费所有的主题。 - 如果设置失败,输出错误信息。
- 使用?
-
创建 Kafka 消费者对象:
- 使用?
RdKafka::KafkaConsumer::create
?创建 Kafka 消费者对象,并传入之前配置好的全局配置对象?m_config
。 - 如果创建失败,输出错误信息。
- 使用?
-
检查消费者对象创建是否成功:
- 使用条件语句检查消费者对象是否成功创建,如果为?
NULL
,则表示创建失败,输出错误信息。
- 使用条件语句检查消费者对象是否成功创建,如果为?
-
输出消费者对象名称:
- 如果消费者对象成功创建,使用?
m_consumer->name()
?获取消费者的名称,并输出到控制台。
- 如果消费者对象成功创建,使用?
?void KafkaConsumer::pullMessage()
{
? ? // 订阅Topic
? ? RdKafka::ErrorCode errorCode = m_consumer->subscribe(m_topicVector);
? ? if (errorCode != RdKafka::ERR_NO_ERROR)
? ? {
? ? ? ? std::cout << "subscribe failed: " << RdKafka::err2str(errorCode) << std::endl;
? ? }
? ? // 消费消息
? ? while(true)
? ? {
? ? ? ? RdKafka::Message *msg = m_consumer->consume(1000);
? ? ? ? msg_consume(msg, NULL);
? ? ? ? delete msg;
? ? }
}
?
在函数 KafkaConsumer::pullMessage()
中:
订阅 Topic:
RdKafka::ErrorCode errorCode = m_consumer->subscribe(m_topicVector);
这里使用了 librdkafka 提供的 subscribe
方法。m_consumer
是一个 Kafka 消费者对象,m_topicVector
是一个包含要订阅的 Topic 名称的向量(可能包含一个或多个 Topic)。
subscribe
方法用于订阅指定的 Topic,使消费者开始接收这些 Topic 中的消息。如果订阅成功,errorCode
将为 RdKafka::ERR_NO_ERROR
;否则,将根据发生的错误返回相应的错误码。
消费消息:?
while(true)
{
RdKafka::Message *msg = m_consumer->consume(1000);
msg_consume(msg, NULL);
delete msg;
}
-
- 这里使用了一个无限循环来持续消费消息。
m_consumer->consume(1000)
?表示从 Kafka 中拉取消息,等待最多 1000 毫秒(1秒)来获取消息。这个方法会阻塞至多指定的超时时间等待消息到达。一旦有消息到达或超时,它将返回一个?RdKafka::Message
?指针。msg_consume(msg, NULL)
?被调用来处理接收到的消息。这个函数处理了从 Kafka 消费者获取的消息对象,根据消息的内容进行相应的操作。delete msg
?释放了消费者获取的消息对象的内存,确保不会造成内存泄漏。
因此,整个过程是:首先通过 subscribe
方法订阅了指定的 Topic,然后通过 consume
方法持续拉取消息,并通过 msg_consume
函数处理接收到的消息。
CMakeLists.txt
cmake_minimum_required(VERSION 2.8)
project(KafkaConsumer)
set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_COMPILER "g++")
set(CMAKE_CXX_FLAGS "-std=c++11 ${CMAKE_CXX_FLAGS}")
set(CMAKE_INCLUDE_CURRENT_DIR ON)
# Kafka头文件路径
include_directories(/usr/local/include/librdkafka)
# Kafka库路径
link_directories(/usr/lib64)
aux_source_directory(. SOURCE)
#add_executable(${PROJECT_NAME} ${SOURCE})
#TARGET_LINK_LIBRARIES(${PROJECT_NAME} rdkafka++)
ADD_EXECUTABLE(${PROJECT_NAME} main.cpp KafkaConsumer.cpp)
TARGET_LINK_LIBRARIES(${PROJECT_NAME} rdkafka++)
编译:
mkdir build
cd build
cmake ..
make
4 位移提交
Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。因为 Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即 Consumer 需要为分配给它的每个分区提交各自的位移数据。
提交位移主要是为了表征 Consumer 的消费进度,这样当 Consumer 发生故障重启之后,就能够从Kafka 中读取之前提交的位移值,然后从相应的位移处继续消费,从而避免整个消费过程重来一遍。
从用户的角度来说,位移提交分为自动提交和手动提交;从 Consumer 端的角度来说,位移提交分为同步提交和异步提交。
这个还是太官方了,我们就举个例子来介绍一下:
假设有一个图书馆的场景:
-
位移数据就像书籍的页码:
- 当你在阅读一本厚厚的书时,你会记住自己读到的页码,这样下次再打开书时就能从上次停下的地方继续读。
-
提交位移就像你在书上标记页码:
- 当你确定自己已经读完了一页或者几页时,你可能会在书的页边标记一下,表示你已经阅读到这个位置。
-
多个分区就像多本书:
- 如果你同时在阅读多本书,每本书都有自己的页码,你需要分别标记每本书的页码。
-
Consumer 的消费进度就像你的阅读进度:
- 位移提交就是告诉图书馆,你已经读到了每本书的哪一页,这样即使你离开一段时间,再回来时仍能从之前的地方继续阅读。
-
故障重启就像你离开一段时间再回到图书馆:
- 如果你离开了一段时间,但之前标记的页码仍然保存在书上,你可以根据这些页码继续阅读,而不必从头开始。
-
自动提交和手动提交就像自动书签和手动书签:
- 自动提交就像书上自带的自动书签,每隔一段时间系统会自动帮你标记页码。
- 手动提交就像你主动使用书签标记页码,你可以在任何时候选择提交。
-
同步提交和异步提交就像同步和异步的书签标记过程:
- 同步提交就像你标记完一页后,告诉图书馆:“我标记好了,你可以记录了”。
- 异步提交就像你标记完一页后,不急着告诉图书馆,而是稍后再告诉他们。
在 Kafka 的语境下,Consumer 提交位移就是告诉 Kafka,它已经成功消费了某个分区的消息到哪个位置,以便在之后的消费中从正确的位置继续。自动提交是由 Consumer 在后台自动完成,而手动提交是由开发者在代码中显式调用。同步提交会等待提交的确认,而异步提交则允许 Consumer 在提交时继续处理其他任务,不必等待确认。
4.1 自动提交
自动提交默认全部为同步提交。
自动提交相关参数:
4.2 手动提交
手动提交可以自己选择是同步提交(commitSync)还是异步提交(commitAsync )。commitAsync 不能够替代 commitSync。commitAsync 的问题在于,出现问题时它不会自动重试,因为它是异步操作。
手动提交,需要将 commitSync 和 commitAsync 组合使用才能到达最理想的效果。可以利用 commitSync 的自动重试来规避那些瞬时错误,同时不希望程序总处于阻塞状态,影响 TPS。
同时使用 commitSync() 和 commitAsync():
对于常规性、阶段性的手动提交,我们调用 commitAsync() 避免程序阻塞,而在 Consumer 要关闭前,我们调用 commitSync() 方法执行同步阻塞式的位移提交,以确保Consumer 关闭前能够保存正确的位移数据。
将两者结合后,既实现了异步无阻塞式的位移管理,也确保了 Consumer 位移的正确性.
-
总结:
- 手动提交位移时,可以选择 commitSync(同步提交)或 commitAsync(异步提交)。
- commitAsync 不能完全替代 commitSync,因为它在出现问题时不会自动重试,而 commitSync 具有自动重试的机制。
- 通过同时使用 commitSync() 和 commitAsync(),可以实现异步无阻塞式的位移管理,同时确保在 Consumer 关闭前保存正确的位移数据。
-
相关函数说明:
-
commitSync():
- 同步提交,会阻塞当前线程,等待提交的确认。
- 具有自动重试机制,可以处理瞬时错误。
- 适合在程序关闭前执行,以确保最终一致性。
-
commitAsync():
- 异步提交,不会阻塞当前线程,允许程序继续执行。
- 不具备自动重试机制,需要开发者手动处理提交失败的情况。
- 适合常规性、阶段性的手动提交,避免阻塞程序,提高吞吐量(TPS)。
-
-
综合使用方式:
- 在正常的位移管理过程中,使用 commitAsync() 进行异步提交,以提高程序的响应性和吞吐量。
- 在 Consumer 即将关闭前,通过调用 commitSync() 方法执行同步阻塞式的位移提交,以确保 Consumer 关闭前能够保存正确的位移数据,保障最终一致性。
4.3 提交API?
ErrorCode commitSync();
?提交当前分配分区的位移,同步操作,会阻塞直到位移被提交或提交失败。如果注册了RdKafka::OffsetCommitCb回调函数,其会在KafkaConsumer::consume()函数内调用并提交位移。
ErrorCode commitAsync();
异步提交位移
5 消费Rebalance机制
当kafka遇到如下四种情况的时候,kafka会触发Rebalance机制:
1.? 消费组成员发生了变更,比如有新的消费者加入了消费组组或者有消费者宕机。
2.? 消费者无法在指定的时间之内完成消息的消费。
3.? 消费组订阅的Topic发生了变化。
4.? 订阅的Topic的partition发生了变化。
6 其他重要的消费者参数
在 KafkaConsumer 中,除了前面讲的必要的客户端参数,大部分的参数都有合理的默认值,一般我们
也不需要去修改它们。不过了解这些参数可以让我们更好地使用消费者客户端,其中还有一些重要的参
数涉及程序的可用性和性能,如果能够熟练掌握它们,也可以让我们在编写相关的程序时能够更好地进
行性能调优与故障排查。下面挑选一些重要的参数来做细致的讲解
6.1 fetch.min.bytes
该参数用来配置 Consumer 在一次拉取请求(调用 poll() 方法)中能从 Kafka 中拉取的最小数据量,默
认值为1(B)。Kafka 在收到 Consumer 的拉取请求时,如果返回给 Consumer 的数据量小于这个参
数所配置的值,那么它就需要进行等待,直到数据量满足这个参数的配置大小。可以适当调大这个参数
的值以提高一定的吞吐量,不过也会造成额外的延迟(latency),对于延迟敏感的应用可能就不可取
了。
6.2 fetch.max.bytes
该参数与 fetch.min.bytes 参数对应,它用来配置 Consumer 在一次拉取请求中从Kafka中拉取的最大
数据量,默认值为52428800(B),也就是50MB。
如果这个参数设置的值比任何一条写入 Kafka 中的消息要小,那么会不会造成无法消费呢?很多资料对
此参数的解读认为是无法消费的,比如一条消息的大小为10B,而这个参数的值是1(B),既然此参数
设定的值是一次拉取请求中所能拉取的最大数据量,那么显然1B<10B,所以无法拉取。这个观点是错误
的,该参数设定的不是绝对的最大值,如果在第一个非空分区中拉取的第一条消息大于该值,那么该消
息将仍然返回,以确保消费者继续工作。也就是说,上面问题的答案是可以正常消费。
与此相关的,Kafka 中所能接收的最大消息的大小通过服务端参数 message.max.bytes(对应于主题端
参数 max.message.bytes)来设置。
6.3 fetch.max.wait.ms
这个参数也和 fetch.min.bytes 参数有关,如果 Kafka 仅仅参考 fetch.min.bytes 参数的要求,那么有可
能会一直阻塞等待而无法发送响应给 Consumer,显然这是不合理的。fetch.max.wait.ms 参数用于指
定 Kafka 的等待时间,默认值为500(ms)。如果 Kafka 中没有足够多的消息而满足不了
fetch.min.bytes 参数的要求,那么最终会等待500ms。这个参数的设定和 Consumer 与 Kafka 之间的
延迟也有关系,如果业务应用对延迟敏感,那么可以适当调小这个参数。
6.4 max.partition.fetch.bytes
这个参数用来配置从每个分区里返回给 Consumer 的最大数据量,默认值为1048576(B),即1MB。
这个参数与 fetch.max.bytes 参数相似,只不过前者用来限制一次拉取中每个分区的消息大小,而后者
用来限制一次拉取中整体消息的大小。同样,如果这个参数设定的值比消息的大小要小,那么也不会造
成无法消费,Kafka 为了保持消费逻辑的正常运转不会对此做强硬的限制。
6.5 max.poll.records
这个参数用来配置 Consumer 在一次拉取请求中拉取的最大消息数,默认值为500(条)。如果消息的
大小都比较小,则可以适当调大这个参数值来提升一定的消费速度。
如果用户的消息处理逻辑很轻量,默认的 500 条消息通常不能满足实际的消息处理速度 。
6.6 connections.max.idle.ms
这个参数用来指定在多久之后关闭闲置的连接,默认值是540000(ms),即9分钟。
6.7 exclude.internal.topics
Kafka 中有两个内部的主题: consumer_offsets 和 transaction_state。exclude.internal.topics 用来指定 Kafka 中的内部主题是否可以向消费者公开,默认值为 true。如果设置为 true,那么只能使用
subscribe(Collection)的方式而不能使用 subscribe(Pattern)的方式来订阅内部主题,设置为 false 则没
有这个限制。
6.8 receive.buffer.bytes
同生产者。
6.9 send.buffer.bytes
同生产者。
6.10 request.timeout.ms
这个参数用来配置 Consumer 等待请求响应的最长时间,默认值为30000(ms)。
6.11 metadata.max.age.ms
这个参数用来配置元数据的过期时间,默认值为300000(ms),即5分钟。如果元数据在此参数所限定的时间范围内没有进行更新,则会被强制更新,即使没有任何分区变化或有新的 broker 加入。
6.12 reconnect.backoff.ms
这个参数用来配置尝试重新连接指定主机之前的等待时间(也称为退避时间),避免频繁地连接主机,默认值为50(ms)。这种机制适用于消费者向 broker 发送的所有请求。
6.13 retry.backoff.ms
这个参数用来配置尝试重新发送失败的请求到指定的主题分区之前的等待(退避)时间,避免在某些故障情况下频繁地重复发送,默认值为100(ms)。
6.14 isolation.level
这个参数用来配置消费者的事务隔离级别。字符串类型,有效值为“read_uncommitted”和
“read_committed”,表示消费者所消费到的位置,如果设置为“read_committed”,那么消费者就会忽略事务未提交的消息,即只能消费到LSO(LastStableOffset)的位置,默认情况下为
“read_uncommitted”,即可以消费到 HW(High Watermark)处的位置。
6.15 session.timeout.ms
非常重要的参数之一 !
session.timeout.ms 是 consumer group 检测组内成员发送崩溃的时间 。
假设你设置该参数为 5 分钟,那么当某个 group 成员突然崩攒了(比如被 kill -9 或岩机), 管理group 的 Kafka 组件(即消费者组协调者,也称 group coordinator)有可能需要 5 分钟才能感知到这个崩溃。显然我们想要缩短这个时间,让coordinator 能够更快地检测到 consumer 失败 。这个参数还有另外一重含义 :consumer 消息处理逻辑的最大时间。
倘若 consumer 两次 poll 之间的间隔超过了该参数所设置的阑值,那么 coordinator 就会认为这个consumer 己经追不上组内其他成员的消费进度了,因此会将该 consumer 实例“踢出”组,该consumer 负责的分区也会被分配给其他 consumer。在最好的情况下,这会导致不必要的 rebalance,因为 consumer 需要重新加入 group 。更糟的是,对于那些在被踢出 group 后处理的消息, consumer 都无法提交位移一一这就意味着这些消息在rebalance 之后会被重新消费一遍。如果一条消息或一组消息总是需要花费很长的时间处理,那么consumer 甚至无法执行任何消费,除非用户重新调整参数 。鉴于以上的“窘境”, Kafka 社区于 0.10.1.0 版本对该参数的含义进行了拆分 。 在该版本及以后的版本中, session.timeout.ms 参数被明确
为“ coordinator 检测失败的时间” 。因此在实际使用中,用户可以为该参数设置一个比较小的值,让 coordinator 能够更快地检测consumer 崩溃的情况,从而更快地开启 rebalance,避免造成更大的消费滞后( consumer lag ) 。目前该参数的默认值是 10 秒。
6.16 max.poll.interval.ms
Apache官网max.poll.interval.ms上的解释如下,消费者组中的一员在拉取消息时如果超过了设置的最大拉取时间,则会认为消费者消费消息失败,kafka会重新进行重新负载均衡,以便把消息分配给另一个消费组成员。在一个典型的 consumer 使用场景中,用户对于消息的处理可能需要花费很长时间。这个参数就是用于设置消息处理逻辑的最大时间的 。 假设用户的业务场景中消息处理逻辑是把消息、“落地”到远程数据库中,且这个过程平均处理时间是 2 分钟,那么用户仅需要将 max.poll.interval.ms 设置为稍稍大于 2 分钟的值即可,而不必为 session. neout.ms 也设置这么大的值。通过将该参数设置成实际的逻辑处理时间再结合较低的session.timeout.ms 参数值,consumer group既实现了快速的 consumer 崩溃检测,也保证了复杂的事件处理逻辑不会造成不必要的 rebalance 。
6.17 heartbeat.interval.ms
该参数和 request.timeout.ms 、max.poll.interval.ms 参数是最难理解的 consumer 参数 。 前面己经
讨论了后两个参数的含义,这里解析一下heartbeat.interval.ms的含义及用法 。
从表面上看,该参数似乎是心跳的问隔时间,但既然己经有了上面的 session.timeout.ms 用于设置超
时,为何还要引入这个参数呢?
这里的关键在于要搞清楚 consumer group 的其他成员,如何得知要开启新一轮 rebalance;当coordinator 决定开启新一轮 rebalance 时,它会将这个决定以 REBALANCE_IN_PROGRESS 异常的形
式“塞进” consumer 心跳请求的 response 中,这样其他成员拿到 response 后才能知道它需要重新加入group。显然这个过程越快越好,而heartbeat. interval.ms 就是用来做这件事情的 。比较推荐的做法是设置一个比较低的值,让 group 下的其他 consumer 成员能够更快地感知新一轮rebalance. 开启了。注意,该值必须小于 session.timeout.ms !这很容易理解,毕竟如果 consumer 在session.timeout.ms 这段时间内都不发送心跳, coordinator 就会认为它已经 dead,因此也就没有必要让它知晓 coordinator 的决定了。
7 broker参数详解
broker 端参数需要在 Kafka 目录下的 config/server.properties 文件中进行设置。当前对于绝大多数的
broker 端参数而言, Kafka 尚不支持动态修改一一这就是说,如果要新增、修改,抑或是删除某些
broker 参数的话,需要重启对应的 broker 服务器。
7.1 broker.id
Kafka 使用唯一的一 个整数来标识每个 broker ,这就是 broker.id ;该参数默认是-1 。如果不指定,
Kafka 会自动生成一个唯一值;总之,不管用户指定什么都必须保证该值在 Kafka 集群中是唯一的,不
能与其他 broker 冲突 。
在实际使用中,推荐使用从0开始的数字序列,如0、1、2……
?
7.2 log.dirs
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
该参数指定了 Kafka 持久化消息的目录;非常重要的参数!
若不设置该参数, Kafka 默认使用tmp/kafka-logs 作为消息保存的目录(商用一定要自己设置目录)。
把消息保存在 tmp 录下,生产环境中是不可取的。若待保存的消息数量非常多,那么最好确保该文件夹
下有充足的磁盘空间。
该参数可以设置多个目录,以逗号分隔,比如**/home/kafka1,/home/kafka2** 。在实际使用过程中,
指定多个目录的做法通常是被推荐的,因为这样 Kafka 可以把负载均匀地分配到多个目录下。
若用户机器上有N块物理硬盘,那么设置多?个目录(须挂载在不同磁盘上的目录)是一个很好的选择。N
个磁头可以同时执行写操作,极大地提升了吞吐量。
注意 ,这里的“均匀”是根据目录下的分区数进行比较的,而不是根据实际的磁盘空间。
7.3 zookeeper.connect
同样是非常重要的参数。主要是在zookeeper中,kafka的配置数据,有一个公共开始的根节点;该参数没有默认的值,如果不配置,则使用zookeeper的/目录;
例如:zkl :218l,zk2:2181,zk3:2181/kafka_clusterl;/kafka_cluster就是kafka的配置的目录;配置了,
可以起到很好的隔离效果。这样管理 Kafka 集群将变得更加容易。
?
多个 Zookeeper 实例
在 Kafka 集群中,可以配置多个 Zookeeper 实例来提高可用性和容错性。Zookeeper 主要用于协调和存储 Kafka 集群的元数据和配置信息。通过配置多个 Zookeeper 实例,可以防止单点故障,并提供更好的集群健壮性。
上述zkl :218l,zk2:2181,zk3:2181/kafka_clusterl 这个就是配置了三个zookeeper
7.4 listeners
broker 监听器的 csv (comma-separated values)列表,格式是[协议://:主机名:端口],[协议]:[// 主机
名:端口]。
该参数主要用于客户端连接 broker 使用,可以认为是 broker 端开放给 clients的监听端口 如果不指定
主机名,则表示绑定默认网卡:如果0.0.0.0,则表示绑定所有网卡。 Kafka 当前支持的协议类型包括
PLAINTEXT、SSL以及 SASL SSL 等。
-
listeners:
- 作用:
listeners
参数定义了 Kafka Broker 用于接受客户端连接的监听器和端口。 - 配置示例:
listeners=PLAINTEXT://:9092,SSL://:9093
- 解释:此配置示例指示 Kafka Broker 使用两个监听器端口:一个是9092端口,使用PLAINTEXT协议(非加密),另一个是9093端口,使用SSL协议(加密通信)。
- 作用:
假设您有一个 Kafka 集群部署在云环境中,每个 Broker 机器有私有IP和公有IP。您希望集群内的 Broker 使用不同的协议和端口与客户端通信,但客户端需要连接到公有IP。
listeners=PLAINTEXT://:9092,SSL://:9093
:Broker使用两个监听器端口,一个用于PLAINTEXT通信(9092端口),另一个用于SSL加密通信(9093端口)。
7.5 advertised.listeners
跟 listeners 类似,该参数也是用于发布给 clients 的监昕器;不过该参数主要用于 IaaS 环境,比如云上
的机器通常都配有多块网卡(私网网卡和公网网卡)。
对于这种机器,用户可以设置该参数绑定公网 IP 供外部 clients 使用,然后配置上面的 listeners 来绑定
私网 IP供broker间通信使用。
当然不设置该参数也是可以的,只是云上的机器很容易出现 clients 无法获取数据的问题,原因就是
listeners 绑定的是默认网卡,而默认网卡通常都是绑定私网的。
在实际使用场景中,对于配有多块网卡的机器而言,这个参数通常都是需要配置的。
-
advertised.listeners:
- 作用:
advertised.listeners
参数用于向客户端公布 Kafka Broker 的监听器信息。 - 配置示例:
advertised.listeners=PLAINTEXT://public_ip:9092,SSL://public_ip:9093
- 解释:在云环境或多网络接口的场景中,机器可能拥有私有和公有 IP。该参数的配置示例中,公布给客户端的是公有 IP 地址,这样客户端就可以通过公有 IP 正确访问 Kafka Broker。这对于确保在复杂网络环境中客户端与 Broker 的连接十分重要。
- 作用:
假设您有一个 Kafka 集群部署在云环境中,每个 Broker 机器有私有IP和公有IP。您希望集群内的 Broker 使用不同的协议和端口与客户端通信,但客户端需要连接到公有IP。
advertised.listeners=PLAINTEXT://public_ip:9092,SSL://public_ip:9093
:这里的public_ip
是您的机器的公有IP地址。客户端通过这些公开的地址访问 Broker,确保能够连接到正确的端口以进行通信。
7.6 unclean.leader.election.enable
是否开启 unclean leader 选举。Kafka 社区在1.0.0 版本才正式将该参数默认值调整为 false ;
即表明如果发生这种情况, Kafka 不允许从剩下存活的非 ISR 副本中选择一个当 leader。因为如果
true,这样做固然可以让 Kafka 继续提供服务给 clients ,但会造成消息数据的丢失,正式环境中,数据
不丢失是基本的业务需求;
番外:ISR解释
ISR 全称是in-sync replica ,翻译过来就是与 leader replica 保持同步的 replica 集合 ;一个partition 可以配置N个replica ,那么这是否就意味着该 partition可以容忍 N-1 replica 失效而不丢失数据呢?
答案是“否”!
Kafka partition 动态维护 一个replica 集合。该集合中的所有 replica 保存的消息日志都与leader
replica 保持同步状态。只有这个集合中的 replica 才能被选举为 leader ,也只有该集合中所有 replica
都接收到了同一条消息, Kafka 才会将该消息置于“己提交”状态,即认为这条消息发送成功。
Kafka 中只要这个集合中至少存在一个 replica ,那些“己提交”状态的消息就不会丢失;
这句话的两个关键点:
ISR 中至少存在一个“活着的”replica;replica “己提 ”消息 。Kafka 对于没有提交成功的消息不做任何保证,只保证在 ISR 存活的情况下“己提交”的消息不会丢失。正常情况下,partition 的所有 replica (含 leader replica )都应该与 leader replica 保持同步,即所有replica 都在 ISR 中。因为各种各样的原因,一小部分 replica 开始落后于 leader replica的进度 。当滞后 一定程度时, Kafka会将这些 replica “踢”出 ISR;相反地,当这replica 重新追上leader进度时候,kafka会再次把它们加回ISR中。
7.7 delete.topic.enable
是否允许 Kafka 删除 topic 。
默认情况下, Kafka 集群允许用户删除topic 及其数据。这样当用户发起删除 topic 操作时, broker
端会执行 topic 删除逻辑。
在实际生产环境中我们发现允许 Kafka 删除 opic 其实是一个很方便的功能,再加上自Kafka 0.0 新增的
ACL 权限特性,以往对于误操作和恶意操作的担心完全消失了,因此设置该参数为 true 是推荐的做法。
7.8 log.retention. {hours|minutes|ms }
这组参数控制了消息数据的留存时间;默认的留存时间是7天(168小时);即 Kafka 只会保存最近 7天的
数据,井自动删除 7天前的数据。
当前较新版本的Kafka 会根据消息的时间戳信息进行留存与否的判断;老版本消息格式没有时间戳,
Kafka 会根据日志文件的最近修改时间( last modified time )进行判断。
三个参数如果同时设置,优先选取ms的设置,minutes次之,hours最后。
7.9 log.retention.bytes
这个参数定义了空间维度上的留存策略;参数默认值是-1,表示 Kafka 永远不会根据消息日志文件总大
小来删除日志。
对于大小超过该参数的分区日志而言, Kafka 会自动清理该分区的过期日志段文件。
7.10 min.insync.replicas
该参数表示kafka存储的最小副本数,producer发送数据的时候,指定了 broker 端必须成功响应
clients 消息发送的最少副本数;
该参数其实是与 producer 端的 acks 参数配合使用的 ;并且min.insync.replicas 也只有在 acks=-1 时
才有意义;acks=-1 表示 producer端寻求最高等级的持久化保证;
假如 broker 端无法满足该条件,则 clients 的消息发送并不会被视为成功。它与 acks 配合使用可以令
Kafka集群达成最高等级的消息持久化;
举 个例子,假设某个topic 的每个分区的副本数是3 ,那么推荐设置该参数为 2,这样我们就能够容忍
一台broker 宕机而不影响服务;若设置参数为3 ,那么只要任何一台 broker 岩机,整个Kafka 集群将
无法继续提供服务。
7.11 num.network threads (fastdfs 网络数据读取)
一个 broker 在后台用于处理网络请求的线程数,默认是3 。
broker启动时会创建多个线程处理来自其他broker和clients 发送过来的各种请求。会将接收到的请求转
发到后面的处理线程中。在真实的环境中,用户需要不断地监控NetworkProcessorAvgldlePercent JMX
指标; 如果该指标持续低于0.3 ;建议适当增加该参数的值。
7.12 num.io.threads (处理我们读取到网络数据)
这个参数就是控制 broker 端实际处理网络请求的线程数,默认值是8;
Kafka broker 默认创建 8个线程以轮询方式不停地监昕转发过来的网络请求井进行实时处理。 Kafka 同
样也为请求处理提供了一个 JMX 监控指标 RequestHandler A vgldlePercent。如果发现该指标持续低于
0.3 ,则可以考虑适当增加该参数的值。
7.13 message.max. bytes
Kafka broker 能够接收的最大消息大小,默认是 977kB;还不到1MB ,可见是非常小的。
在实际使用场景中,突破 1MB 大小的消息十分常见,因此用户有必要综合考虑 Kafka 集群可能处理的
最大消息尺寸井设置该参数值
7.14 log.segment.bytes (分片的概念)
(默认: 1GB) – kafka数据文件的大小,确保这个数值大于一个消息的长度。一般说来使用默认值即可(一
般一个消息很难大于1G,因为这是一个消息系统,而不是文件系统)。
?
# The maximum size of a log segment file. When this size is reached a new log
segment will be created.
#log.segment.bytes=1073741824
# 配置成5M做测试
log.segment.bytes==5242880
8 总结
RdKafka提供了两种消费者API,低级API的Consumer和高级API的KafkaConsumer。
Kafka Consumer使用流程:
(1)创建Kafka配置实例。
(2)创建Topic配置实例。
(3)设置Kafka配置实例Broker属性。
(4)设置Topic配置实例属性。
(5)注册回调函数。
(6)创建Kafka Consumer客户端实例。
(7)创建Topic实例。
(8)订阅主题。
(9)消费消息。
(10)关闭消费者实例。
(11)销毁释放RdKafka资源。
?
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!