如何在 FastAPI 中设置定时任务:完全指南
2023-12-20 21:40:27
Web 应用程序开发中,及时高效处理常规任务至关重要,包括定时收集数据或管理任务计划。针对强大且性能卓越的?FastAPI?框架,我们可以通过几种策略来管理这些必要的定时任务。
实现 FastAPI 中的定时任务
本指南将探讨在 FastAPI 环境中管理定时任务的三种实用方法:使用 APScheduler,利用 Celery 任务队列的力量,以及利用内置的 asyncio 进行调度。
1. 利用 APScheduler
APScheduler 是 Python 调度库,以其灵活性和易于集成而著称。以下是如何在?FastAPI?中使用它:
安装
pip install APScheduler
集成与初始化
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
定义定时函数
def tick():
print('Tick! The time is: %s' % datetime.now())
在 FastAPI 初始化后启动 APScheduler
from fastapi import FastAPI
app = FastAPI()
@app.on_event("startup")
async def app_start():
scheduler.add_job(execute_periodic_function, 'interval', seconds=3)
scheduler.start()
2. 使用 Celery
Celery 是一个高效的分布式任务队列系统,可与 FastAPI 无缝集成。
设置 Celery
pip install celery
定义 Celery 应用与任务
from celery import Celery
celery_app = Celery('my_fastapi_app')
@celery_app.task
def celery_periodic_task():
print('执行了 Celery 任务')
在 FastAPI 启动时安排任务
from celery.schedules import crontab
@app.on_event("startup")
async def app_start():
celery_app.conf.beat_schedule = {
'每半分钟执行': {
'task': 'celery_periodic_task',
'schedule': 30.0,
},
}
3. 使用 Asyncio 进行定时任务
Python 的原生异步库 asyncio 也可用于调度定时任务。
Asyncio 定时任务示例
import asyncio
@app.on_event("startup")
async def app_start():
asyncio.create_task(async_cron())
async def async_cron():
while True:
print('执行 Async 定时任务')
await asyncio.sleep(10)
实践示例:使用 APScheduler
以下是完整的使用 APScheduler 管理定时任务的 FastAPI 应用示例:
from fastapi import FastAPI
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
app = FastAPI()
scheduler = BackgroundScheduler()
def periodic_function():
print(f'定时执行的操作时间:{datetime.now()}')
@app.on_event("startup")
async def app_start():
scheduler.add_job(periodic_function, 'interval', seconds=3)
scheduler.start()
@app.get("/")
async def welcome():
return {"message": "欢迎访问定时任务演示"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=8000)
提示、技巧及注意事项
- 使用 Celery 时,请确保已安装并配置 Redis(或其他消息代理)。
- 注意任务执行时长,避免对应用性能产生负面影响。
- 使用 try-except 模块处理定时任务中的异常,并执行适当的错误处理。
使用?Apifox?这样的工具可以简化 API 测试,这是 Postman 等竞品的更强大的替代品。Apifox 将 Postman、Swagger、Mock?和 JMeter 的功能整合在一起,简化了对各种协议 API 的调试,提高了项目投产效率。
结论
无论选择 APScheduler、Celery 还是 asyncio,FastAPI 都为实现定时任务提供了强大的解决方案。每种方法都有其优点,APScheduler 使用友好,asyncio 与 FastAPI 的异步特性相契合。根据您的具体需求和场景选择最合适的方法。
知识扩展:
参考链接:
- APScheduler Documentation:?https://apscheduler.readthedocs.io/en/latest/index.html
- Celery Documentation:?https://docs.celeryq.dev/en/stable/index.html
- asyncio Documentation:?https://docs.python.org/3/library/asyncio.html
文章来源:https://blog.csdn.net/m0_71808387/article/details/135113118
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!