celery是封装记录

2023-12-21 12:45:03

目录

入口主程序

celery配置文件

config/config.conf(可取消)

redis 配置

nanny 保姆文件


入口主程序

celery启动时调用的主程序

tasks.py

#! /usr/bin/env python                                                                                                                                                                   
# -*- coding: utf-8 -*-

from celery import Celery

from config.config import CeleryConfig
from share.log_config import setup_log

# 创建 Celery 应用
setup_log()

aaa_app = Celery('attack_app',
    include=[
        'nanny',
    ]
)
aaa_app.config_from_object(CeleryConfig)


if __name__ == '__main__':
    aaa_app.worker_main()

celery配置文件

设置celery 参数 与 redis 连接

config/config.py

#! /usr/bin/env python
# -*- coding: utf-8 -*-

import os
import redis

from share.app_settings import AppSetting


class RedisConfig():
    flag = AppSetting().get_config_value("log", "debug")
    DEBUG = True if flag != "False" else False
    ENV_REDIS_HOST = os.getenv('REDIS_URL')
    #host = ENV_REDIS_HOST if ENV_REDIS_HOST else ('192.168.0.156' if DEBUG else '192.168.154.247')
    host = ENV_REDIS_HOST if ENV_REDIS_HOST else ('127.0.0.1' if DEBUG else '127.0.0.1')
    port = 6379
    password = '123456' if DEBUG else '123456'
    cache_db = 2
    borker = 0
    backend = 1
    rc = redis.StrictRedis(
            host=host, 
            port=port, 
            db=cache_db,
            password=password
    )

class CeleryConfig(object):
    BROKER_URL = f"redis://:{RedisConfig.password}@{RedisConfig.host}:{RedisConfig.port}/{RedisConfig.borker}"    # borker
    CELERY_RESULT_BACKEND = f"redis://:{RedisConfig.password}@{RedisConfig.host}:{RedisConfig.port}/{RedisConfig.backend}"   # backend
    CELERY_TASK_SERIALIZER = 'json'  # " json从4.0版本开始默认json,早期默认为pickle(可以传二进制对象)
    CELERY_RESULT_SERIALIZER = 'json'
    CELERY_ACCEPT_CONTENT = ['json']
    CELERY_ENABLE_UTC = True  # 启用UTC时区设置
    CELERY_TIMEZONE = 'Asia/Shanghai'  # 上海时区
    CELERYD_MAX_TASKS_PER_CHILD = 1  # 每个进程最多执行1个任务后释放进程(再有任务,新建进程执行,解决内存泄漏)
    WORKER_HIJACK_ROOT_LOGGER = False

config/config.conf(可取消)

[log]
debug = False
path  = logs/celery.tasks.log

redis 配置

主要设置密码,举例

config/redis.conf

requirepass 123456

nanny 保姆文件

在入口文件中 include 中

用于异步动态调用模块使用

这里注意一处注释的代码

@aaa_app.task(base=CallbackTask, ignore_result=True)

ignore_result参数:如果使用该参数,则当调用 load_and_run_plugin.delay(module_path, data, task_meta) 函数时,返回值将无法获取

res = load_and_run_plugin.delay(module_path, data, task_meta)
result = res.get()

头文件中的?sys.path.append("./") 必不可缺,否则动态调用模块无法获取正确的路径

nanay.py

#! /usr/bin/env python                                                                                                                                                                   
# -*- coding: utf-8 -*-

import os
import sys
import json
import logging
import traceback
import importlib
from celery import Task

from tasks import aaa_app

sys.path.append("./")
logger = logging.getLogger("log")


class CallbackTask(Task):
    """
    exc     : 失败时的错误的类型
    task_id : 任务的id;
    args    : 任务函数的参数
    kwargs  : 键值对参数
    einfo   : 失败或重试时的异常详细信息
    retval  : 任务成功执行的返回值
    """

    def on_success(self, retval, task_id, args, kwargs):
        pass
        """
        aaa_app.send_task(
            "task_manager.load_worker_task_result",
            args=(
                {"retval": retval, "task_id": task_id, "args": args, "kwargs": kwargs}, "success"
            ),
            queue="queue_task_manager"
        )
        """

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        pass
        """
        attack_app.send_task(
            "task_manager.load_worker_task_result",
            args=(
                {"exc": exc, "task_id": task_id, "args": args, "kwargs": kwargs, "einfo": einfo}, "failure"
            ),
            queue="queue_task_manager"
        )
        """

#@aaa_app.task(base=CallbackTask, ignore_result=True)
@aaa_app.task(base=CallbackTask)
def load_and_run_plugin(module, data, task_meta, func="run"):
    model_obj = importlib.import_module(module)
    print(f'开始获取{module}插件的{func}方法')

    _func = getattr(model_obj, func, None)
    if _func:
        try:
            print(f'开始运行{module}.{func}方法,入参:{data}')
            print(f"task_meta:{task_meta}")
            module_result = dict()
            module_result["result"] = _func(data, task_meta)
        except Exception  as err:
            errortrace = traceback.format_exc()
            module_result["status"] = False
            module_result["errinfo"] = f"{module}.{func}出错,错误信息:{errortrace}"
            print(f"{module}.{func}出错,错误信息:{errortrace}")
            logger.error(f"{module}.{func}出错,错误信息:{errortrace}")
    else:
        module_result["status"] = False
        module_result["errinfo"] = f"{module}不存在方法{func},请检查插件"
        logger.error(f"{module}不存在方法{func},请检查插件")
    return module_result

文章来源:https://blog.csdn.net/u012206617/article/details/135104147
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。