FastAPI使用异步Redis
2023-12-29 22:35:05
1. 安装依赖
pip install fastapi redis uvicorn
2. 主要代码
#!/usr/bin/env python
import os
import sys
from contextlib import AbstractAsyncContextManager, asynccontextmanager
from datetime import timedelta
from pathlib import Path
from fastapi import FastAPI, Request
from pydantic import BaseModel
from redis import asyncio as aioredis
class RegisterRedis(AbstractAsyncContextManager):
def __init__(self, app: FastAPI, *args, **kw) -> None:
app.state.redis = self._redis = aioredis.Redis(*args, **kw)
async def __aenter__(self) -> "RegisterRedis":
await self._redis.ping()
return self
async def __aexit__(self, *args, **kw):
await self._redis.aclose()
@staticmethod
def get_client(request: Request) -> aioredis.Redis:
return request.app.state.redis
@asynccontextmanager
async def lifespan(app: FastAPI):
async with RegisterRedis(app):
yield
app = FastAPI(lifespan=lifespan)
@app.get("/")
async def get_all_redis_keys(request: Request) -> list[str]:
redis = RegisterRedis.get_client(request)
return await redis.keys()
class Item(BaseModel):
key: str
value: str | int | float | bytes
expire: int | timedelta | None = None
@app.post("/")
async def set_key_value(request: Request, item: Item) -> dict[str, str]:
redis = RegisterRedis.get_client(request)
await redis.set(item.key, item.value, item.expire or None)
value = await redis.get(item.key)
return {item.key: value.decode()}
def runserver() -> None:
"""This is for debug mode to start server. For prod, use supervisor+gunicorn instead."""
import uvicorn # type:ignore
root_app = Path(__file__).stem + ":app"
auto_reload = "PYCHARM_HOSTED" not in os.environ
host = "0.0.0.0"
port = 8000
if sys.argv[1:]:
port = int(sys.argv[1])
if sys.platform == "darwin" or sys.platform.lower().startswith("win"):
tool = "open" if Path("/usr/bin/open").exists() else "explorer"
os.system(f"{tool} http://127.0.0.1:{port}") # Auto open browser
uvicorn.run(root_app, host=host, port=port, reload=auto_reload)
if __name__ == "__main__":
runserver()
文章来源:https://blog.csdn.net/jaket5219999/article/details/135299109
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!