dcf写入机制

2023-12-15 17:41:33

dcf写入机制

写入

dcf提供如下两个写入接口:

  • dcf_write

    int dcf_write(unsigned int stream_id, const char* buffer, unsigned int length, unsigned long long key, unsigned long long *index);
    

    仅在leader节点调用。

  • dcf_universal_write

    int dcf_universal_write(unsigned int stream_id, const char* buffer, unsigned int length, unsigned long long key, unsigned long long *index);
    

    可以在任意节点调用。

确认

  • dcf_register_after_writer

    用于注册leader节点写入成功回调函数。

    int dcf_register_after_writer(usr_cb_after_writer_t cb_func)
    {
        return rep_register_after_writer(ENTRY_TYPE_LOG, cb_func);
    }
    

    最终将回调函数注册到全局变量

    int rep_register_after_writer(entry_type_t type, usr_cb_after_writer_t cb_func)
    {
        g_cb_after_writer[type] = cb_func;
        return CM_SUCCESS;
    }
    

    调用流程如下:

启动个线程来执行apply
rep_common_init
cm_create_thread
rep_apply_thread_entry
rep_apply_proc
g_cb_after_writer

线程在等待条件变量释放,并执行apply。

// 先查出有多少条流
if (md_get_stream_list(streams, &stream_count) != CM_SUCCESS) {
        LOG_DEBUG_ERR("[REP]md_get_stream_list failed");
        return;
    }
while (!thread->closed) { // 若线程没有关闭则循环等待
        if (!exists_log) { // 首先判断是否存在日志,不存在就休眠等待唤醒
            (void)cm_event_timedwait(&g_apply_cond, CM_SLEEP_500_FIXED);
        }
        LOG_TRACE(g_rep_tracekey, "apply_thread work");
        exists_log = CM_FALSE;
        for (uint32 i = 0; i < stream_count; i++) { // 遍历每一条流
            uint32 stream_id = streams[i];
            bool8 stream_exists_log = CM_FALSE;
            LOG_TIME_BEGIN(rep_apply_proc);
            // 执行apply
            if (rep_apply_proc(stream_id, &stream_exists_log) != CM_SUCCESS) {
                LOG_DEBUG_ERR_EX("[REP]rep_apply_proc failed.");
            }
            LOG_TIME_END(rep_apply_proc);

            exists_log = (exists_log || stream_exists_log);
        }
    }

其中等待的条件变量为,在如下的地方唤醒。

void rep_apply_trigger()
{
  LOG_DEBUG_INF("[REP]rep_apply_trigger");
  LOG_TRACE(g_rep_tracekey, "common:rep_apply_trigger.");
  cm_event_notify(&g_apply_cond);
}

rep_apply_trigger的调用栈如下;

leader
follower
rep_acceptlog_proc
rep_leader_acceptlog_proc
rep_try_commit_log
rep_apply_trigger
rep_accept_thread_entry
rep_follower_acceptlog_proc

经过上面调用流程可以看到,apply线程是由accept线程唤醒的。accept线程与apply线程类似,同样是等待条件变量将线程唤醒。

if (md_get_stream_list(streams, &stream_count) != CM_SUCCESS) {
        LOG_DEBUG_ERR("[REP]md_get_stream_list failed");
        return;
    }

    while (!thread->closed) {
        if (!exists_log) {
            LOG_TRACE(g_rep_tracekey, "accept_thread wait.");
            (void)cm_event_timedwait(&g_accept_cond, CM_SLEEP_500_FIXED);
        }
        LOG_TRACE(g_rep_tracekey, "accept_thread work.");

        exists_log = CM_FALSE;
        for (uint32 i = 0; i < stream_count; i++) {
            uint32 stream_id = streams[i];
            date_t now = g_timer()->now;
            exists_log = (exists_log || g_common_state[stream_id].accept_log);
            if (g_common_state[stream_id].accept_log ||
                now - g_common_state[stream_id].last_accept_time > CM_DEFAULT_HB_INTERVAL*MICROSECS_PER_MILLISEC) {
                LOG_TRACE(g_rep_tracekey, "accept_thread do work.");
                g_common_state[stream_id].accept_log = CM_FALSE;
                g_common_state[stream_id].last_accept_time = now;
                if (rep_acceptlog_proc(stream_id) != CM_SUCCESS) {
                    LOG_DEBUG_ERR("[REP]rep_acceptlog_proc failed.");
                }
            } else {
                LOG_TRACE(g_rep_tracekey, "accept_thread no work.");
            }
        }
    }

accept线程等待的条件变量为g_accept_cond,其唤醒流程如下:

void rep_set_accept_flag(uint32 stream_id)
{
    LOG_DEBUG_INF("rep_set_accept_flag.");
    g_common_state[stream_id].accept_log = CM_TRUE;
    cm_event_notify(&g_accept_cond);
}

rep_set_accept_flag调用堆栈如下:

stg_register_cb
rep_accepted_trigger

可以看已看到accept线程的唤醒是通过注册回调函数来触发的。

    if (stg_register_cb(ENTRY_TYPE_LOG, rep_accepted_trigger) != CM_SUCCESS) {
        LOG_DEBUG_ERR("[REP]rep register stg callback failed");
        return CM_ERROR;
    }
status_t stg_register_cb(entry_type_t type, void *func)
{
    switch (type) {
        case ENTRY_TYPE_CONF:
            g_write_conf_func = (write_conf_func_t)func;
            break;
        case ENTRY_TYPE_LOG:
            g_notify_rep_func = (notify_rep_func_t)func;
            break;
        default:
            LOG_RUN_ERR("[STG]Register callback failed");
            return CM_ERROR;
    }
    return CM_SUCCESS;
}

可以看到回调函数最终被注册到了g_notify_rep_func。

append线程
disk_thread_entry
process_append_action
stream_append_entry_impl
callback_rep_func
g_notify_rep_func
stream_batcher_flush

可以看到其中一个回调函数是由append线程触发的,append线程会等待stream->disk_event条件变量被唤醒。stream->disk_event在stream_append_entry中被唤醒。

rep_appendlog_req_proc
rep_follower_process
rep_follower_appendlog
stg_append_entry
stream_append_entry
rep_write
register_msg_process(MEC_CMD_APPEND_LOG_RPC_REQ, rep_appendlog_req_proc, PRIV_LOW);

MEC_CMD_APPEND_LOG_RPC_REQ在rep_appendlog_node中发送。

rep_appendlog_thread_entry
rep_appendlog_stream
rep_appendlog_node

rep_appendlog_thread_entry有条件变量g_appendlog_cond唤醒。g_appendlog_cond又通过rep_appendlog_trigger唤醒。

rep_write
rep_appendlog_trigger
rep_appendlog_ack_proc
rep_rematch_proc

rep_appendlog_trigger由MEC_CMD_APPEND_LOG_RPC_ACK消息触发。

register_msg_process(MEC_CMD_APPEND_LOG_RPC_ACK, rep_appendlog_ack_proc, PRIV_LOW);

dd

  • dcf_register_consensus_notify

    用于注册follower节点写入数据成功的回调函数。

文章来源:https://blog.csdn.net/github_38294679/article/details/135021499
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。