Redis哨兵源码分析

2023-12-28 13:35:30

在Redis server启动过程中,实现了实例化和初始化

1、哨兵实例化过程

? ? ? ? 1-1、checkForSentinelMode(int argc, char **argv)函数

????????采用redis sentinel指令实例化还是redis server下的参数实例化--sentinel。

// 检查服务器是否以 Sentinel 模式启动
server.sentinel_mode = checkForSentinelMode(argc,argv);

/* Returns 1 if there is --sentinel among the arguments or if
 * argv[0] is exactly "redis-sentinel". */
int checkForSentinelMode(int argc, char **argv) {
    int j;

    if (strstr(argv[0],"redis-sentinel") != NULL) return 1;
    for (j = 1; j < argc; j++)
        if (!strcmp(argv[j],"--sentinel")) return 1;
    return 0;
}

2、如果Redis实例以哨兵模式启动,执行初始化

    // 如果服务器以 Sentinel 模式启动。需要执行以下两个初始化操作。
    if (server.sentinel_mode) {
        initSentinelConfig();
        initSentinel();
    }

? ? ? ? 2-1、initSentinelConfig函数

????????????????负责初始化哨兵的配置端口号,供全局使用。

// 这个函数会用 Sentinel 所属的属性覆盖服务器默认的属性
void initSentinelConfig(void) {
    server.port = REDIS_SENTINEL_PORT;
}

? ? ? ? ?2-2、initSentinel函数

????????????????基本就是预分配一些配置需要的空间并且初始化默认值。????????

  • 初始化server哨兵命令列表。
  • 初始化主服务器信息字典。
  • 初始化 TILT 模式的相关选项。
  • 初始化脚本相关选项。
/* Perform the Sentinel mode initialization. */
// 以 Sentinel 模式初始化服务器
void initSentinel(void) {
    int j;

    /* Remove usual Redis commands from the command table, then just add
     * the SENTINEL command. */

    // 清空 Redis 服务器的命令表(该表用于普通模式)
    dictEmpty(server.commands,NULL);
    // 将 SENTINEL 模式所用的命令添加进命令表
    for (j = 0; j < sizeof(sentinelcmds)/sizeof(sentinelcmds[0]); j++) {
        int retval;
        struct redisCommand *cmd = sentinelcmds+j;

        retval = dictAdd(server.commands, sdsnew(cmd->name), cmd);
        redisAssert(retval == DICT_OK);
    }

    /* Initialize various data structures. */
    /* 初始化 Sentinel 的状态 */
    // 初始化纪元
    sentinel.current_epoch = 0;

    // 初始化保存主服务器信息的字典
    sentinel.masters = dictCreate(&instancesDictType,NULL);

    // 初始化 TILT 模式的相关选项
    sentinel.tilt = 0;
    sentinel.tilt_start_time = 0;
    sentinel.previous_time = mstime();

    // 初始化脚本相关选项
    sentinel.running_scripts = 0;
    sentinel.scripts_queue = listCreate();
}

?3、Redis初始化sentinel.conf配置文件过程

loadServerConfig(configfile,options);

loadServerConfigFromString(config);

sentinelHandleConfiguration(argv+1,argc-1);

// Sentinel 配置文件分析器
char *sentinelHandleConfiguration(char **argv, int argc) {
    sentinelRedisInstance *ri;

    // SENTINEL monitor 选项
    if (!strcasecmp(argv[0],"monitor") && argc == 5) {
        /* monitor <name> <host> <port> <quorum> */

        // 读入 quorum 参数
        int quorum = atoi(argv[4]);

        // 检查 quorum 参数必须大于 0
        if (quorum <= 0) return "Quorum must be 1 or greater.";

        // 创建主服务器实例
        if (createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2],
                                        atoi(argv[3]),quorum,NULL) == NULL)
        {
            switch(errno) {
            case EBUSY: return "Duplicated master name.";
            case ENOENT: return "Can't resolve master instance hostname.";
            case EINVAL: return "Invalid port number";
            }
        }

    // SENTINEL down-after-milliseconds 选项
    } else if (!strcasecmp(argv[0],"down-after-milliseconds") && argc == 3) {

        /* down-after-milliseconds <name> <milliseconds> */

        // 查找主服务器
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";

        // 设置选项
        ri->down_after_period = atoi(argv[2]);
        if (ri->down_after_period <= 0)
            return "negative or zero time parameter.";

        sentinelPropagateDownAfterPeriod(ri);

    // SENTINEL failover-timeout 选项
    } else if (!strcasecmp(argv[0],"failover-timeout") && argc == 3) {

        /* failover-timeout <name> <milliseconds> */

        // 查找主服务器
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";

        // 设置选项
        ri->failover_timeout = atoi(argv[2]);
        if (ri->failover_timeout <= 0)
            return "negative or zero time parameter.";

   // Sentinel parallel-syncs 选项
   } else if (!strcasecmp(argv[0],"parallel-syncs") && argc == 3) {

        /* parallel-syncs <name> <milliseconds> */

        // 查找主服务器
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";

        // 设置选项
        ri->parallel_syncs = atoi(argv[2]);

    // SENTINEL notification-script 选项
   } else if (!strcasecmp(argv[0],"notification-script") && argc == 3) {

        /* notification-script <name> <path> */
        
        // 查找主服务器
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";

        // 检查给定路径所指向的文件是否存在,以及是否可执行
        if (access(argv[2],X_OK) == -1)
            return "Notification script seems non existing or non executable.";

        // 设置选项
        ri->notification_script = sdsnew(argv[2]);

    // SENTINEL client-reconfig-script 选项
   } else if (!strcasecmp(argv[0],"client-reconfig-script") && argc == 3) {

        /* client-reconfig-script <name> <path> */

        // 查找主服务器
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        // 检查给定路径所指向的文件是否存在,以及是否可执行
        if (access(argv[2],X_OK) == -1)
            return "Client reconfiguration script seems non existing or "
                   "non executable.";

        // 设置选项
        ri->client_reconfig_script = sdsnew(argv[2]);

    // 设置 SENTINEL auth-pass 选项
   } else if (!strcasecmp(argv[0],"auth-pass") && argc == 3) {

        /* auth-pass <name> <password> */

        // 查找主服务器
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";

        // 设置选项
        ri->auth_pass = sdsnew(argv[2]);

    } else if (!strcasecmp(argv[0],"current-epoch") && argc == 2) {
        /* current-epoch <epoch> */
        unsigned long long current_epoch = strtoull(argv[1],NULL,10);
        if (current_epoch > sentinel.current_epoch)
            sentinel.current_epoch = current_epoch;

    // SENTINEL config-epoch 选项
    } else if (!strcasecmp(argv[0],"config-epoch") && argc == 3) {

        /* config-epoch <name> <epoch> */

        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";

        ri->config_epoch = strtoull(argv[2],NULL,10);
        /* The following update of current_epoch is not really useful as
         * now the current epoch is persisted on the config file, but
         * we leave this check here for redundancy. */
        if (ri->config_epoch > sentinel.current_epoch)
            sentinel.current_epoch = ri->config_epoch;

    } else if (!strcasecmp(argv[0],"leader-epoch") && argc == 3) {
        /* leader-epoch <name> <epoch> */
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        ri->leader_epoch = strtoull(argv[2],NULL,10);

    // SENTINEL known-slave 选项
    } else if (!strcasecmp(argv[0],"known-slave") && argc == 4) {
        sentinelRedisInstance *slave;

        /* known-slave <name> <ip> <port> */

        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,argv[2],
                    atoi(argv[3]), ri->quorum, ri)) == NULL)
        {
            return "Wrong hostname or port for slave.";
        }

    // SENTINEL known-sentinel 选项
    } else if (!strcasecmp(argv[0],"known-sentinel") &&
               (argc == 4 || argc == 5)) {
        sentinelRedisInstance *si;

        /* known-sentinel <name> <ip> <port> [runid] */

        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        if ((si = createSentinelRedisInstance(NULL,SRI_SENTINEL,argv[2],
                    atoi(argv[3]), ri->quorum, ri)) == NULL)
        {
            return "Wrong hostname or port for sentinel.";
        }
        if (argc == 5) si->runid = sdsnew(argv[4]);

    } else {
        return "Unrecognized sentinel configuration statement.";
    }
    return NULL;
}

4、启动哨兵实例。

sentinelIsRunning函数-->sentinelGenerateInitialMonitorEvents函数-->sentinelEvent函数-->

pubsubPublishMessage函数

? ? ?4-1、pubsubPublishMessage函数

? ? ? 当前Redis实例发布消息给订阅了该频道的实例(master、salve、sentinel)或者redis客户端。? ?

/* Publish a message 
 *
 * 将 message 发送到所有订阅频道 channel 的客户端,
 * 以及所有订阅了和 channel 频道匹配的模式的客户端。
 */
int pubsubPublishMessage(robj *channel, robj *message) {
    int receivers = 0;
    dictEntry *de;
    listNode *ln;
    listIter li;

    /* Send to clients listening for that channel */
    // 取出包含所有订阅频道 channel 的客户端的链表
    // 并将消息发送给它们
    de = dictFind(server.pubsub_channels,channel);
    if (de) {
        list *list = dictGetVal(de);
        listNode *ln;
        listIter li;

        // 遍历客户端链表,将 message 发送给它们
        listRewind(list,&li);
        while ((ln = listNext(&li)) != NULL) {
            redisClient *c = ln->value;

            // 回复客户端。
            // 示例:
            // 1) "message"
            // 2) "xxx"
            // 3) "hello"
            addReply(c,shared.mbulkhdr[3]);
            // "message" 字符串
            addReply(c,shared.messagebulk);
            // 消息的来源频道
            addReplyBulk(c,channel);
            // 消息内容
            addReplyBulk(c,message);

            // 接收客户端计数
            receivers++;
        }
    }

    /* Send to clients listening to matching channels */
    // 将消息也发送给那些和频道匹配的模式
    if (listLength(server.pubsub_patterns)) {

        // 遍历模式链表
        listRewind(server.pubsub_patterns,&li);
        channel = getDecodedObject(channel);
        while ((ln = listNext(&li)) != NULL) {

            // 取出 pubsubPattern
            pubsubPattern *pat = ln->value;

            // 如果 channel 和 pattern 匹配
            // 就给所有订阅该 pattern 的客户端发送消息
            if (stringmatchlen((char*)pat->pattern->ptr,
                                sdslen(pat->pattern->ptr),
                                (char*)channel->ptr,
                                sdslen(channel->ptr),0)) {

                // 回复客户端
                // 示例:
                // 1) "pmessage"
                // 2) "*"
                // 3) "xxx"
                // 4) "hello"
                addReply(pat->client,shared.mbulkhdr[4]);
                addReply(pat->client,shared.pmessagebulk);
                addReplyBulk(pat->client,pat->pattern);
                addReplyBulk(pat->client,channel);
                addReplyBulk(pat->client,message);

                // 对接收消息的客户端进行计数
                receivers++;
            }
        }

        decrRefCount(channel);
    }

    // 返回计数
    return receivers;
}

文章来源:https://blog.csdn.net/lyyCSDNBLOG/article/details/135091785
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。