【Skynet 入门实战练习】事件模块 | 批处理模块 | GM 指令 | 模糊搜索

2023-12-22 17:16:19

前言

本节完善了项目,实现了事件、批处理、模糊搜索模块、GM 指令模块。

事件模块

什么是事件模块?事件模块是用来在各系统之间传递事件消息的。

为什么需要事件模块?主要目的是用来减少模块之间的耦合。

事件模块使用场景:

  • 常见的有网游中的任务系统,监听角色的升级事件,分派相应的任务
  • 监听登录登出事件,做相应的资源分配与销毁
  • 监听玩家的属性变化,更新其他模块缓存玩家的数据

事件模块实现

实现三个基本接口:

  • 加入监听列表:add_listener(event_type, func)
  • 从监听列表删除:del_listener(id)
  • 触发一个事件:fire_event(event_type, ...)

lualib/event.lua

local _M = {}
local handler_inc_id = 1
local dispatchs = {}  -- event type: { id: func }
local handlers = {}   -- id: event type

function _M.add_listener(event_type, func)
    local cbs = dispatchs[event_type]
    if not cbs then 
        cbs = {} 
        dispatchs[event_type] = cbs
    end 

    handler_inc_id = handler_inc_id + 1
    local id = handler_inc_id
    cbs[id] = func 
    handlers[id] = event_type

    return id 
end 

function _M.del_listener(id)
    local event_type = handlers[id]
    if not event_type then return end 

    handlers[id] = nil 
    local cbs = dispatchs[event_type] 
    if not cbs then return end 
    cbs[id] = nil 
end 

function _M.fire_event(event_type, ...)
    local cbs = dispatchs[event_type]
    if not cbs or not next(cbs) then return end 

    local res = true
    for id, func in pairs(cbs) do 
        local ok, err = xpcall(func, debug.traceback, ...)
        if not ok then 
            logger.error("[event]", "fire event error", "event type:", event_type, "handle id:", id, "err:", err)
            res = false 
        end 
    end 
    return res 
end 

return _M 
  • handler_inc_id:处理函数的对应自增 ID
  • dispatchs:记录事件类型对应的回调函数列表
  • handlers:记录处理函数属于哪个事件

add_listenerdel_listener 两个函数就是维护上述两个表。fire_event 即对要触发的事件类型的所有回调函数进行执行,采用 xpcall 保证每个触发逻辑互不影响,某个处理报错,其余逻辑仍会正常执行。


测试事件模块

测试玩家升级接口触发的升级事件

添加一个事件常量表,lualib/event_type.lua

local _M = {}
_M.EVENT_TYPE_UPLEVEL = "UPLEVEL" -- 玩家升级事件
return _M 

添加一个经验值常量表,data/lvexp.lua

return {
    [1] = {
        ["exp"] = 0
    },
    [2] = {
        ["exp"] = 300
    },
    [3] = {
        ["exp"] = 700
    },
    [4] = {
        ["exp"] = 1700
    },
    [5] = {
        ["exp"] = 4450
    },
    [6] = {
        ["exp"] = 10950
    },
    [7] = {
        ["exp"] = 28000
    },
    [8] = {
        ["exp"] = 58000
    },
    [9] = {
        ["exp"] = 83000
    },
    [10] = {
        ["exp"] = 104000
    },
    [11] = {
        ["exp"] = 153500
    },
    [12] = {
        ["exp"] = 222500
    },
    [13] = {
        ["exp"] = 357500
    }
}

类似的数值表一般由策划给定,通过导表工具,将 excel 表导出为可供使用的 lua 表结构,如上述。

添加经验值增加接口实现,监听事件模块实现,并通过 GM 指令进行升级测试

module/cached/user.lua

local function get_next_lv(lv)
    local next_lv = lv + 1
    local cfg = data_lvexp[next_lv]
    if not cfg then 
        return false
    end 

    return true, next_lv, cfg.exp
end 

function CMD.add_exp(uid, cache, exp)
    _M.add_exp(uid, cache, exp)
    return cache.lv, cache.exp
end 

function _M.add_exp(uid, cache, exp)
    cache.exp = cache.exp + exp 

    local lvchanged = false 
    while true do 
        local lv = cache.lv 
        local cur_exp = cache.exp
        local ok, next_lv, next_exp = get_next_lv(lv)

        if ok and cur_exp >= next_exp then 
            cur_exp = cur_exp - next_exp
            cache.exp = cur_exp
            cache.lv = next_lv
            lvchanged = true 
        else
            break  
        end 
    end 

    if lvchanged then 
        event.fire_event(event_type.EVENT_TYPE_UPLEVEL, uid, cache.lv)
    end 
end 

在缓存模块下相应的用户模块,添加增加经验值的接口,CMD.add_exp 供其他服务调用。get_next_lv 获取升到下一级所需经验值,_M.add_exp 为实际经验值增加逻辑实现,判断如果升级则需要对监听升级事件的相关逻辑进行触发。

完整代码:module/cached/user.lua

到此我们还需要添加一个监听升级事件的模块,物品模块,用来验证升级后能否正确触发逻辑。

module/cached/item.lua

local function init_cb(uid, cache)
    if not cache.items then 
        cache.items = {}
    end 
end

local function on_uplevel(uid, lv)
    logger.debug("item", "on_uplevel", "uid:", uid, "lv:", lv)
end 

function _M.init()
    mng.register_cmd("user", "item", CMD)
    mng.register_init_cb("user", "item", init_cb)
    event.add_listener(event_type.EVENT_TYPE_UPLEVEL, on_uplevel)
end
  • on_uplevel 即为监听到升级事件后需要触发的模块,简单打印用户和等级

通过 event.add_listener 主动将这个模块加到事件模块的升级事件中,对用该事件触发的回调函数即为 on_uplevel

不要忘了模块的初始化,service/cached.lua

local item = require "cached.item"

skynet.start(function()
    item.init()
end)

通过 GM 指令(后续讨论),增加经验值后,可以看到能正常执行 on_uplevel
在这里插入图片描述

上述就是事件模块的简单使用,设计一个监听某事件的模块,事件类型写在常量表 event_type 中,然后在初始化 init 时,主动添加 add_listener 进事件模块中。

在其余服务中,如果产生了相应的事件,则主动调用 fire_event,触发事件即可。

实现完了事件模块,可以发现事件模块的使用有什么好处?

如果没有事件模块的解耦,那么每个监听玩家等级变化的模块,都需要在等级模块插入一行代码,不利于维护,模块之间的直接调用代码非常丑且容易漏调用。


批处理模块

什么是批处理模块?批处理模块用于自动化批量执行任务的。

为什么需要批处理模块?分批次处理任务,避免某个任务长时间占用资源。

批处理模块的应用场景:

  • 系统维护,批量踢出用户
  • 系统广播,批量给在线玩家广播数据
  • 排行榜结算,批量给玩家发放奖励

批处理模块实现

实现两个基本接口:

  • new_batch_task(tid, interval, step, list, cb, ...),创建一个批处理任务
  • remove_batch_task(tid),删除一个批处理任务

lualib/batch.lua

local all_batch_tasks = {} -- taskid: taskinfo
local all_batch_tasks_cnt = 0 -- 待处理任务数

-- 创建新任务
local function new_empty_batch_task(tid)
    local info = {}
    all_batch_tasks[tid] = info 
    all_batch_tasks_cnt = all_batch_tasks_cnt + 1
    return info 
end 

function _M.new_batch_task(tid, interval, step, list, cb, ...)
    local info = new_empty_batch_task(tid) 
    info.timer = timer.timeout(interval, batch_task_heartbeat, tid) 
    info.deal_idx = 0 -- 已处理数量
    info.list = list 
    info.interval = interval
    info.step = step 
    info.func = cb 
    info.args = pack(...)
    return true 
end 

-- 删除任务
function _M.remove_batch_task(tid)
    if all_batch_tasks[tid] and all_batch_tasks[tid].timer then 
        timer.cancel(all_batch_tasks[tid].timer)
    end 
    all_batch_tasks[tid] = nil 
    all_batch_tasks_cnt = all_batch_tasks_cnt - 1
end 
  • all_batch_tasks_cnt:待处理的批处理任务数量

  • all_batch_tasks:存放所有待处理任务,一个任务 ID 对应一个批处理任务,批处理任务包含下面几个字段

    • timer:定时器 ID,定时器用于定时分批处理任务,并且方便随时中断批处理
    • deal_idx:表示已经处理到第几个逻辑
    • list:需要处理逻辑的数组,每次执行处理函数时,它每个值都作为第一个参数传入
    • interval:每次处理事件间隔,单位为秒
    • step:每次处理步长,即一次消化多少个
    • func:处理逻辑函数
    • args:处理逻辑函数的其他参数

创建和删除批处理任务如上述逻辑,维护 all_batch_tasks 表,初始化创建相应的批处理任务字段。

定时处理的函数逻辑,batch_task_heartbeat 任务心跳循环:

local function batch_task_heartbeat(tid)
    local info = all_batch_tasks[tid]

    local list_cnt = #info.list 
    local start_idx = info.deal_idx + 1
    if info.deal_idx > list_cnt then 
        _M.remove_batch_task(tid)
        return 
    end 

    local end_idx = start_idx + info.step - 1
    if end_idx > list_cnt then 
        end_idx = list_cnt
        _M.remove_batch_task(tid)
    else 
        -- 这批次还没处理完,开启定时器等下次再处理
        info.deal_idx = end_idx
        info.timer = timer.timeout(info.interval, batch_task_heartbeat, tid)
    end 

    -- 处理本批次
    for i = start_idx, end_idx do 
        local ok, err = xpcall(info.func, traceback, info.list[i], unpack(info.args, 1, info.args.n))
    end 
end 

通过 deal_idxstep 两个字段,计算出当前批次要处理的逻辑的起始 start_idx 和结尾 end_idx

如果本批次能处理完该任务,则删除,否则继续创建该任务的下一个定时器。

最后执行当前批次所有逻辑,通过 xpcall 保护环境进行调用。

完整代码:lualib/batch.lua

批处理实现广播消息,通过 GM 指令测试

ws_gate.lua

-- 发送消息接口
local function send_msg(fd, msg)
    if connection[fd] then 
        websocket.write(fd, msg)
    end 
end 

-- 广播消息接口
function CMD.broadcast(source, msg)
    local fds = utils_table.klist(connection)
    -- 调用批处理接口
    local ok, err = batch.new_batch_task({"broadcast", source, msg}, 1, 100, fds, send_msg, msg)
end 

在网关服务中,实现广播接口 broadcast,消息通过批处理模块调用 send_msg 处理函数,回发给每个客户端。

utils/table.lua 模块实现了 klist 接口,将 lua 表以 key 值存放为一个 array 类型的 table 结构:

function _M.klist(t)
    local klist = {}
    local idx = 1
    for k, _ in pairs(t) do 
        klist[idx] = k
        idx = idx + 1
    end 
    return klist
end 

还可以发现,我们创建批处理任务,传入的 tid 是一个表结构。这样每次广播消息时,都是用的新任务 ID。

但有些逻辑需要防止重入,比如定时批量保存玩家数据,上一次保存逻辑没有处理完毕时,下一次批处理需要忽略,直接延迟到下下一次即可。这时候只需要传入一个字符串作为任务 ID 即可。

之前实现的缓存模块中,定时保存玩家数据可以修改为批处理任务执行,这里就不做演示了。

我们修改一下参数 step = 1 参数,这里开启三个客户端,并且一个客户端广播消息:
在这里插入图片描述
从上图可以看出,每秒执行一次消息广播,三个客户端先后收到消息。


GM 指令模块

前面两个模块测试时都使用了 GM 指令,当然通过之前的通信的方法也可以实现,module/cached/user 模块和 test/cmds/ws 模块下写指令接口即可。

那为什么还需要 GM 指令呢?

GM(Game Manager) 指令在维基百科上是这样解释的:

在游戏正式发布之前,游戏公司通常会组织专人对游戏内容进行全面测试,而为了方便测试,游戏程序员在开发时就将大量专供测试和操作游戏内容用的专用命令写入。这些开放给 GM 使用的命令就是 GM 指令。

指令一般会涵盖游戏的全部功能,这些指令包括对服务器操作类(服务器重启,刷新,关闭等)、操作角色类(修改角色属性,角色位置,角色状态等)、广播类(发送全服消息,发布游戏活动消息),亦有方便 GM 活动的 GM 隐身,无敌等指令。例如:魔兽世界新的资料片巫妖王之怒开放的 GM 指令就包括直接到达 80 级等

由于 GM 指令功能多样,一些私服为了吸引玩家,也有将 GM 指令开放给普通玩家的。

GM 指令在游戏发开中有两个用途:

  • 游戏开发期间用指令制造测试环境
  • 游戏上线期间用指令修复玩家数据

GM 指令模块实现

指定客户端输入格式:gm user setname cauchy(指令名称、模块、函数、参数)。

上述表示 gm 指令,在用户 user 模块下,执行 setname 函数,参数是 cauchy

客户端实现 gm 模块,test/cmds/gm.lua

local cjson = require "cjson"
local websocket = require "http.websocket"

local _M = {}

------------  CMD -----------------

-- 执行指令
function _M.run_command(ws_id, ...)
    local cmd = table.concat({...}, " ")
    local req = {
        pid = "c2s_gm_run_cmd",
        cmd = cmd,
    }
    websocket.write(ws_id, cjson.encode(req))
end 

return _M 

指令上行协议统一为 c2s_gm_run_cmd,在服务端做逻辑分解。

服务器端 gm 模块:

| gm
-- |- main.lua
-- |- user.lua

ws_agent/gm/main.lua

local _M = {}
local RPC = {}
local gm_cmds = {} -- 指令模块

-- 执行对应模块下的 CMD 中对应的指令 cmd
function _M.do_cmd(CMD, uid, cmd, ...)
    local cb = CMD[cmd]

    local func = cb.func 
    local args_format = cb.args 
    local ok, n, args = parse_cmd_args(uid, args_format, ...)

    return func(table.unpack(args, 1, n))
end 

-- req.cmd: "user" "setname" "cauchy" 
-- GM 指令:   模块、指令、参数
function RPC.c2s_gm_run_cmd(req, fd, uid)
    local iter = string.gmatch(trim(req.cmd), "[^ ,]+")
    local mod = iter() -- 获取第一个参数:cmd
    local args = {}
    for v in iter do 
        table.insert(args, v)
    end 

    local ok = false
    local msg 
    -- 获取对应模块
    local m = gm_cmds[mod]
    if m then 
        ok, msg = _M.do_cmd(m.CMD, uid, table.unpack(args))
    else 
        msg = "invalid cmd!"
    end 

    local res = {
        pid = "c2s_gm_run_cmd",
        ok = ok, 
        msg = msg,
    }
    return res
end 

function _M.init() 
    gm_cmds.user = require "ws_agent.gm.user"
end 

_M.RPC = RPC

return _M 

gm/main.lua 进行封装接口,接受上行的消息,c2s_gm_run_cmd 即对 user setname cauchy 这种格式的消息进行参数提取,然后执行对应 user 模块的 setname 函数。


gm/user.lua 的实现:

local _M = {}

local function set_name(uid, name)
    local ret = mng.set_username(uid, name)
    return true, "set name success"
end 

_M.CMD = {
    setname = {
        func = set_name,
        args = { "uid", "string" },
    },
}

return _M 

到此 gm 指令执行流程就一目了然了。

客户端输入的消息指令存在 cmd 中,通过协议 c2s_gm_run_cmd 上行到 gm/main.luaRPC.c2s_gm_run_cmd 中处理,分解指令后,通过其他模块注册的 CMD 函数表,找到对应模块 do_cmd 执行的相应方法,并且该方法需要的参数 args 以字符串列表的形式指定,然后自定义解析参数 parse_cmd_args,传入函数并执行。

不要忘记将 gm 指令模块的 RPC 注册到 ws_agent/mng.lua 中,这样上行的消息才能正确找到并执行。

ws_agent/mng.lua

local gm = require "ws_agent.gm.main"

function _M.register_rpc(rpc)
    for k, v in pairs(rpc) do 
        RPC[k] = v
    end 
end
 
function _M.init(gate, watchdog)
    gm.init()
    _M.register_rpc(gm.RPC)
end 

完整代码:ws_agent/gm/main.luaws_agent/gm/user.lua


模糊搜索

模糊搜索模块使用场景:

  • 加好友时搜索玩家昵称
  • 购买物品时搜索物品名称
  • 加帮会时搜索帮会名称

模糊搜索模块实现

实现一个缓存,在玩家搜索时,如果缓存命中直接返回,否则调用数据库接口,实现模糊匹配,并记入缓存。

以搜索玩家昵称为例,这里先在数据库模块中,提供接口 find_by_name,使用 mongodb 自带的模糊匹配,并且忽略大小写返回匹配结果。

ws_agent/db.lua

-- 根据 name 名字查找,忽略大小写
function _M.find_by_name(name, limit) 
    -- 查询语法
    local query = {
        name = {
            ['$regex'] = name,
            ["$options"] = 'i',
        }
    }
    -- 映射集
    local proj = {
        ["_id"] = 0,
        ["uid"] = 1,
        ["name"] = 1,
    }

    local ret = mongo_col:find(query, proj):limit(limit)
    local ret_list = {}
    while ret:hasNext() do 
        local data = ret:next()
        table.insert(ret_list, {
            uid = data.uid,
            name = data.name,
        })
    end 
    return ret_list
end 

模糊搜索模块 ws_agent/search.lua

local skynet = require "skynet"
local lru = require "lru"
local db = require "ws_agent.db"

local _M = {}
local lru_cache_data 

local limit = tonumber(skynet.getenv("search_limit")) or 10
local expire = tonumber(skynet.getenv("search_expire")) or 10
local cache_max_cnt = tonumber(skynet.getenv("search_max_cache")) or 100

function _M.search(name)
    local now = skynet.time()
    local cache_ret = lru_cache_data:get(name)
    if cache_ret and cache_ret.expire > now and cache_ret. search_list then 
        return cache_ret.search_list
    end 

    local search_list = db.find_by_name(name, limit)
    lru_cache_data:set(name, {
        expire = now + expire,
        search_list = search_list,
    })
    return search_list
end 

function _M.init()
    lru_cache_data = lru.new(cache_max_cnt)
end 

return _M 
  • lru_cache_data 用来缓存历史查询结果,并且在 LRU 基础上加了超时机制。即使在缓存中找出了历史查询的结果,如果时间超出了设定时间,也从数据库里重新查询。从数据库里查询到结果后,把结果放入缓存中。

模块需要在 ws_agent/mng.lua 中初始化

function _M.init(gate, watchdog)
    search.init()
end 

最后

本节完善了项目最后的几个小功能,提供了几个简单的 gm 指令接口进行测试,这里不在做演示。基本的项目框架构建完成了,只是相应的业务逻辑不够丰富,感兴趣的读者可以自己新增模块,新增接口去完善。

完整代码参考项目地址:https://gitee.com/Cauchy_AQ/skynet_practice/tree/skynet


路漫漫其修远兮,学习游戏服务器开发的路途才刚开始,skyent 作为入门级首选框架,到此也才算入门。只是能简单使用这个框架,要学习的东西还很多,继续努力!!!

之后还会继续学习 skynet 相关的项目,比如:@huahua132 大佬的项目 skynet_fly@hanxi 大佬的项目 skynet-demo

万国觉醒的源码还未尝研究,等有能力了在考虑研读一下这份源码,听说质量也是参差不齐,但总归是个大项目,能学不少知识。

未来也可能会尝试阅读一下 skynet 的源码,深入的理解底层机制。一起加油!

文章来源:https://blog.csdn.net/qq_52678569/article/details/135078813
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。