RocketMQ源码 Broker-ConsumerManager 消费者管理组件源码分析
2023-12-14 23:53:48
前言
ConsumerManager消费者管理组件,主要负责对Consumer消费者连接进行管理,内存中维护了所有消费者相关的消费组-topic订阅关系,并且在网络连接发生变化(注册、下线、关闭、消费者弹性伸缩)时,会同步维护内存信息,且通知到消费组内所有消费者。
源码版本:4.9.3
源码架构图
核心数据结构
先来看一下ConsumerManager的核心数据结构:
Map<consumerGroupName, ConsumerGroupInfo> consumerTable 消费组名称与消费组信息映射关系
// 消费者管理组件
public class ConsumerManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
// 长连接过期时间 120秒
private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
// 核心数据结构,Map<groupName, ConsumerGroupInfo>
private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =
new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);
private final ConsumerIdsChangeListener consumerIdsChangeListener;
}
深入看下ConsumerGroupInfo的数据结构,比较重要的是:
- Map<topic, SubscriptionData> topic与topic订阅关系映射元数据
- Map<长连接, 客户端长连接信息> 网络通信长连接元数据
public class ConsumerGroupInfo {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
// 消费组名称
private final String groupName;
// 订阅关系,Map<topic, SubscriptionData>>
private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
new ConcurrentHashMap<String, SubscriptionData>();
// 客户端长连接映射关系
private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
new ConcurrentHashMap<Channel, ClientChannelInfo>(16);
// 消费者拉取消息的类型 pull or push
private volatile ConsumeType consumeType;
// 消费者拉取消息的模式 cluster or broadcast
private volatile MessageModel messageModel;
// 消费者从哪里拉取消息,从哪个位点开始拉取消息
private volatile ConsumeFromWhere consumeFromWhere;
// 最后更新时间
private volatile long lastUpdateTimestamp = System.currentTimeMillis();
}
在深入看下SubscriptionData数据结构:包含了所有订阅相关的信息,包括订阅表达式、订阅tag。
public class SubscriptionData implements Comparable<SubscriptionData> {
// *代表订阅所有消息
public final static String SUB_ALL = "*";
// 过滤模式
private boolean classFilterMode = false;
// 订阅的topic
private String topic;
// 订阅的表达式
private String subString;
// 订阅的tag
private Set<String> tagsSet = new HashSet<String>();
// 订阅的code
private Set<Integer> codeSet = new HashSet<Integer>();
// 订阅版本
private long subVersion = System.currentTimeMillis();
// 表达式类型
private String expressionType = ExpressionType.TAG;
@JSONField(serialize = false)
private String filterClassSource;
同时也深入看下 ClientChannelInfo 客户端长连接信息的数据结构:包含了客户端id、版本、网络通信长连接等网络通信元数据信息。
public class ClientChannelInfo {
// 长连接
private final Channel channel;
// 客户端id
private final String clientId;
// 客户端语言
private final LanguageCode language;
// 客户端版本
private final int version;
// 最后更新时间
private volatile long lastUpdateTimestamp = System.currentTimeMillis();
}
?
核心数据行为
ConsumerManager所有行为方法及注释,如下:大部分都是对上面提到的内存数据结构的维护,同时也会调用ConsumerIdsChangeListener 监听器,通知相关组件变更事件。
// 消费者管理组件
public class ConsumerManager {
public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener) {
this.consumerIdsChangeListener = consumerIdsChangeListener;
}
// 查询指定消费组指定客户端id的网络长连接
public ClientChannelInfo findChannel(final String group, final String clientId) {
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (consumerGroupInfo != null) {
return consumerGroupInfo.findChannel(clientId);
}
return null;
}
// 查询指定消费组指定主题的订阅信息
public SubscriptionData findSubscriptionData(final String group, final String topic) {
ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group);
if (consumerGroupInfo != null) {
return consumerGroupInfo.findSubscriptionData(topic);
}
return null;
}
// 查询指定消费组信息
public ConsumerGroupInfo getConsumerGroupInfo(final String group) {
return this.consumerTable.get(group);
}
// 查询指定消费组订阅信息数量
public int findSubscriptionDataCount(final String group) {
ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group);
if (consumerGroupInfo != null) {
return consumerGroupInfo.getSubscriptionTable().size();
}
return 0;
}
// 处理网络长连接关闭事件
public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, ConsumerGroupInfo> next = it.next();
ConsumerGroupInfo info = next.getValue();
// 处理网络长连接关闭事件,删除内存中的长连接
boolean removed = info.doChannelCloseEvent(remoteAddr, channel);
if (removed) {
if (info.getChannelInfoTable().isEmpty()) {
// 如果当前消费组没有任何网络长连接,则删除消费组
ConsumerGroupInfo remove = this.consumerTable.remove(next.getKey());
if (remove != null) {
log.info("unregister consumer ok, no any connection, and remove consumer group, {}",
next.getKey());
// 通知消费组信息变更
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, next.getKey());
}
}
// 通知消费组信息变更
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, next.getKey(), info.getAllChannel());
}
}
}
// 注册消费者
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
// 获取消费组信息
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null == consumerGroupInfo) {
ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
consumerGroupInfo = prev != null ? prev : tmp;
}
// 更新客户端长连接信息
boolean r1 =
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
consumeFromWhere);
// 更新消费组订阅信息
boolean r2 = consumerGroupInfo.updateSubscription(subList);
if (r1 || r2) {
if (isNotifyConsumerIdsChangedEnable) {
// 通知其他消费者,消费者ids变更
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
// 通知消费组信息变更,更新ConsumerFilterManager订阅信息
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
return r1 || r2;
}
// 取消注册,逻辑类似
public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo,
boolean isNotifyConsumerIdsChangedEnable) {
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null != consumerGroupInfo) {
consumerGroupInfo.unregisterChannel(clientChannelInfo);
if (consumerGroupInfo.getChannelInfoTable().isEmpty()) {
ConsumerGroupInfo remove = this.consumerTable.remove(group);
if (remove != null) {
log.info("unregister consumer ok, no any connection, and remove consumer group, {}", group);
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, group);
}
}
if (isNotifyConsumerIdsChangedEnable) {
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
}
// 扫描消费组中过期的网络长连接
public void scanNotActiveChannel() {
Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, ConsumerGroupInfo> next = it.next();
String group = next.getKey();
ConsumerGroupInfo consumerGroupInfo = next.getValue();
ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
consumerGroupInfo.getChannelInfoTable();
Iterator<Entry<Channel, ClientChannelInfo>> itChannel = channelInfoTable.entrySet().iterator();
while (itChannel.hasNext()) {
Entry<Channel, ClientChannelInfo> nextChannel = itChannel.next();
ClientChannelInfo clientChannelInfo = nextChannel.getValue();
// 遍历所有消费组长连接信息,判断是否过期
long diff = System.currentTimeMillis() - clientChannelInfo.getLastUpdateTimestamp();
if (diff > CHANNEL_EXPIRED_TIMEOUT) {
log.warn(
"SCAN: remove expired channel from ConsumerManager consumerTable. channel={}, consumerGroup={}",
RemotingHelper.parseChannelRemoteAddr(clientChannelInfo.getChannel()), group);
// 如果长连接过期,就关闭长连接,然后删除内存中的引用
RemotingUtil.closeChannel(clientChannelInfo.getChannel());
itChannel.remove();
}
}
// 如果消费组长连接为空,则删除消费组
if (channelInfoTable.isEmpty()) {
log.warn(
"SCAN: remove expired channel from ConsumerManager consumerTable, all clear, consumerGroup={}",
group);
it.remove();
}
}
}
public HashSet<String> queryTopicConsumeByWho(final String topic) {
HashSet<String> groups = new HashSet<>();
Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, ConsumerGroupInfo> entry = it.next();
ConcurrentMap<String, SubscriptionData> subscriptionTable =
entry.getValue().getSubscriptionTable();
if (subscriptionTable.containsKey(topic)) {
groups.add(entry.getKey());
}
}
return groups;
}
}
文章来源:https://blog.csdn.net/hzwangmr/article/details/134959651
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!