flask中生成token,校验token,token装饰器

2023-12-14 04:59:41
1.生成token
??????? 注意点:将用户的id,用户名,密码传入,实际开始时可随时更改;添加了过期时间这个时间与redis中token的过期时间是独立的
import functools
from datetime import datetime

import jwt
from flask import current_app, g, request
from jwt import PyJWTError

from manger import redis_store
from manger.models import User
from utils.message import JsonResponse


def encode_token(uid, username, password):
    """
    生成token
    :param username:
    :param password:
    :return:
    """
    with current_app.app_context():
        key = current_app.config['SECRET_KEY']
        algorithm = 'HS256'
        headers = {
            'typ': 'jwt',
            'alg': 'HS256'
        }
        import datetime
        payload = {
            'id': uid,
            'username': username,
            'password': password,
            'exp': datetime.datetime.utcnow() + datetime.timedelta(seconds=10)  # 超时时间
        }
        token = jwt.encode(payload=payload, key=key, algorithm=algorithm, headers=headers)

    return token
2. 验证token
def decode_token(token):
    """
    验证token
    :param token:
    :return:
    """
    with current_app.app_context():
        key = current_app.config['SECRET_KEY']
        try:
            # 返回之前生成token的时候的字典,字典种包含id和exp
            data = jwt.decode(token, key=key, algorithms='HS256')
            current_app.logger.info(data)
            user = User.query.filter(User.id == data['id']).first()
            if user:  # 如果用户存在,并且没有锁定
                data = {
                    "id": user.id,
                    "name": user.name,
                }
                return JsonResponse(code=200, data=data, msg='token验证成功!')
            else:
                return JsonResponse(code=20000, data=data, msg='token中用户信息不存在!')
        except PyJWTError as e:
            current_app.logger.error(e)
    return JsonResponse(code=20000, data=data, msg='token验证失败!')
3.装饰器
??????? 注意点:在此处需要验证redis中的token是否已经过期,倘若已经过期直接抛出错误,如果redis中没有过期那么在JWT中内部的时间过期了也会通过PyJWTError抛出错误
def login_required(view_func):
    @functools.wraps(view_func)
    def verify_token(*args, **kwargs):
        with current_app.app_context():
            key = current_app.config['SECRET_KEY']
            try:
                authorization_header = request.headers.get('Authorization')
                if authorization_header and authorization_header.startswith('Bearer '):
                    token = authorization_header.split(' ')[1]
                else:
                    return JsonResponse(code=20001, msg="未获取到token").to_response()
            except Exception:
                return JsonResponse(code=20001, msg="未获取到token").to_response()

            # 检查Redis中的token是否过期
            if redis_store.ttl('token') < 0:
                return JsonResponse(code=20002, msg="redis中Token已过期").to_response()
            try:
                data = jwt.decode(token, key=key, algorithms='HS256')
                user = User.query.filter(User.id == data['id']).first()
                if not user:
                    return JsonResponse(code=20003, msg="该用户不存在!").to_response()
                return view_func(*args, **kwargs)
            except PyJWTError as e:
                current_app.logger.error(e)
                return JsonResponse(code=20004, msg="JWT中token已过期!").to_response()

    return verify_token

文章来源:https://blog.csdn.net/qq_44906497/article/details/134850180
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。