RocketMQ源码 Broker-ConsumerManager 消费者管理组件源码分析

2023-12-14 23:53:48

前言

ConsumerManager消费者管理组件,主要负责对Consumer消费者连接进行管理,内存中维护了所有消费者相关的消费组-topic订阅关系,并且在网络连接发生变化(注册、下线、关闭、消费者弹性伸缩)时,会同步维护内存信息,且通知到消费组内所有消费者。


源码版本:4.9.3

源码架构图

核心数据结构

先来看一下ConsumerManager的核心数据结构:

Map<consumerGroupName, ConsumerGroupInfo> consumerTable 消费组名称与消费组信息映射关系

// 消费者管理组件
public class ConsumerManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    // 长连接过期时间 120秒
    private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
    // 核心数据结构,Map<groupName, ConsumerGroupInfo>
    private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =
        new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);
    private final ConsumerIdsChangeListener consumerIdsChangeListener;
}

深入看下ConsumerGroupInfo的数据结构,比较重要的是:

  • Map<topic, SubscriptionData> topic与topic订阅关系映射元数据
  • Map<长连接, 客户端长连接信息> 网络通信长连接元数据
public class ConsumerGroupInfo {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    // 消费组名称
    private final String groupName;
    // 订阅关系,Map<topic, SubscriptionData>>
    private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
        new ConcurrentHashMap<String, SubscriptionData>();
    // 客户端长连接映射关系
    private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
        new ConcurrentHashMap<Channel, ClientChannelInfo>(16);
    // 消费者拉取消息的类型 pull or push
    private volatile ConsumeType consumeType;

    // 消费者拉取消息的模式 cluster or broadcast
    private volatile MessageModel messageModel;
    // 消费者从哪里拉取消息,从哪个位点开始拉取消息
    private volatile ConsumeFromWhere consumeFromWhere;
    // 最后更新时间
    private volatile long lastUpdateTimestamp = System.currentTimeMillis();
}

在深入看下SubscriptionData数据结构:包含了所有订阅相关的信息,包括订阅表达式、订阅tag。

public class SubscriptionData implements Comparable<SubscriptionData> {
    // *代表订阅所有消息
    public final static String SUB_ALL = "*";
    // 过滤模式
    private boolean classFilterMode = false;
    // 订阅的topic
    private String topic;
    // 订阅的表达式
    private String subString;
    // 订阅的tag
    private Set<String> tagsSet = new HashSet<String>();
    // 订阅的code
    private Set<Integer> codeSet = new HashSet<Integer>();
    // 订阅版本
    private long subVersion = System.currentTimeMillis();
    // 表达式类型
    private String expressionType = ExpressionType.TAG;

    @JSONField(serialize = false)
    private String filterClassSource;

同时也深入看下 ClientChannelInfo 客户端长连接信息的数据结构:包含了客户端id、版本、网络通信长连接等网络通信元数据信息。

public class ClientChannelInfo {
    // 长连接
    private final Channel channel;
    // 客户端id
    private final String clientId;
    // 客户端语言
    private final LanguageCode language;
    // 客户端版本
    private final int version;
    // 最后更新时间
    private volatile long lastUpdateTimestamp = System.currentTimeMillis();
}

?

核心数据行为

ConsumerManager所有行为方法及注释,如下:大部分都是对上面提到的内存数据结构的维护,同时也会调用ConsumerIdsChangeListener 监听器,通知相关组件变更事件。

// 消费者管理组件
public class ConsumerManager {
  

    public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener) {
        this.consumerIdsChangeListener = consumerIdsChangeListener;
    }

    // 查询指定消费组指定客户端id的网络长连接
    public ClientChannelInfo findChannel(final String group, final String clientId) {
        ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
        if (consumerGroupInfo != null) {
            return consumerGroupInfo.findChannel(clientId);
        }
        return null;
    }

    // 查询指定消费组指定主题的订阅信息
    public SubscriptionData findSubscriptionData(final String group, final String topic) {
        ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group);
        if (consumerGroupInfo != null) {
            return consumerGroupInfo.findSubscriptionData(topic);
        }

        return null;
    }

    // 查询指定消费组信息
    public ConsumerGroupInfo getConsumerGroupInfo(final String group) {
        return this.consumerTable.get(group);
    }

    // 查询指定消费组订阅信息数量
    public int findSubscriptionDataCount(final String group) {
        ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group);
        if (consumerGroupInfo != null) {
            return consumerGroupInfo.getSubscriptionTable().size();
        }

        return 0;
    }

    // 处理网络长连接关闭事件
    public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
        Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, ConsumerGroupInfo> next = it.next();
            ConsumerGroupInfo info = next.getValue();
            // 处理网络长连接关闭事件,删除内存中的长连接
            boolean removed = info.doChannelCloseEvent(remoteAddr, channel);
            if (removed) {
                if (info.getChannelInfoTable().isEmpty()) {
                    // 如果当前消费组没有任何网络长连接,则删除消费组
                    ConsumerGroupInfo remove = this.consumerTable.remove(next.getKey());
                    if (remove != null) {
                        log.info("unregister consumer ok, no any connection, and remove consumer group, {}",
                            next.getKey());
                        // 通知消费组信息变更
                        this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, next.getKey());
                    }
                }
                // 通知消费组信息变更
                this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, next.getKey(), info.getAllChannel());
            }
        }
    }

    // 注册消费者
    public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
        ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
        final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
        // 获取消费组信息
        ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
        if (null == consumerGroupInfo) {
            ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
            ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
            consumerGroupInfo = prev != null ? prev : tmp;
        }
        // 更新客户端长连接信息
        boolean r1 =
            consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
                consumeFromWhere);
        // 更新消费组订阅信息
        boolean r2 = consumerGroupInfo.updateSubscription(subList);

        if (r1 || r2) {
            if (isNotifyConsumerIdsChangedEnable) {
                // 通知其他消费者,消费者ids变更
                this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
            }
        }
        // 通知消费组信息变更,更新ConsumerFilterManager订阅信息
        this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);

        return r1 || r2;
    }

    // 取消注册,逻辑类似
    public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo,
        boolean isNotifyConsumerIdsChangedEnable) {
        ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
        if (null != consumerGroupInfo) {
            consumerGroupInfo.unregisterChannel(clientChannelInfo);
            if (consumerGroupInfo.getChannelInfoTable().isEmpty()) {
                ConsumerGroupInfo remove = this.consumerTable.remove(group);
                if (remove != null) {
                    log.info("unregister consumer ok, no any connection, and remove consumer group, {}", group);

                    this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, group);
                }
            }
            if (isNotifyConsumerIdsChangedEnable) {
                this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
            }
        }
    }

    // 扫描消费组中过期的网络长连接
    public void scanNotActiveChannel() {
        Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, ConsumerGroupInfo> next = it.next();
            String group = next.getKey();
            ConsumerGroupInfo consumerGroupInfo = next.getValue();
            ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
                consumerGroupInfo.getChannelInfoTable();

            Iterator<Entry<Channel, ClientChannelInfo>> itChannel = channelInfoTable.entrySet().iterator();
            while (itChannel.hasNext()) {
                Entry<Channel, ClientChannelInfo> nextChannel = itChannel.next();
                ClientChannelInfo clientChannelInfo = nextChannel.getValue();
                // 遍历所有消费组长连接信息,判断是否过期
                long diff = System.currentTimeMillis() - clientChannelInfo.getLastUpdateTimestamp();
                if (diff > CHANNEL_EXPIRED_TIMEOUT) {
                    log.warn(
                        "SCAN: remove expired channel from ConsumerManager consumerTable. channel={}, consumerGroup={}",
                        RemotingHelper.parseChannelRemoteAddr(clientChannelInfo.getChannel()), group);
                    // 如果长连接过期,就关闭长连接,然后删除内存中的引用
                    RemotingUtil.closeChannel(clientChannelInfo.getChannel());
                    itChannel.remove();
                }
            }
            // 如果消费组长连接为空,则删除消费组
            if (channelInfoTable.isEmpty()) {
                log.warn(
                    "SCAN: remove expired channel from ConsumerManager consumerTable, all clear, consumerGroup={}",
                    group);
                it.remove();
            }
        }
    }

    public HashSet<String> queryTopicConsumeByWho(final String topic) {
        HashSet<String> groups = new HashSet<>();
        Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, ConsumerGroupInfo> entry = it.next();
            ConcurrentMap<String, SubscriptionData> subscriptionTable =
                entry.getValue().getSubscriptionTable();
            if (subscriptionTable.containsKey(topic)) {
                groups.add(entry.getKey());
            }
        }
        return groups;
    }
}

文章来源:https://blog.csdn.net/hzwangmr/article/details/134959651
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。