用户登出、修改密码或重置密码后,token的删除(flask)
项目已经实现了用户登录,并在登录之后,接口返回token,在后续的访问中,将token放入请求头里
用户登出、修改密码或重置密码后,需要删除携带的token,用户重新登录
实现token删除有两种方法:
一是写入黑名单文件,后续验证时读取文件里是否有这个token
二是用redis来实现
写黑名单文件的方式
import os# 写入黑名单文件
def save_blacklist(token):# 文件目录为函数所在的文件的父级目录(token_blacklist.txt与该文件在同一目录下)basedir = os.path.dirname(os.path.abspath(__file__))file_path = os.path.join(basedir, "token_blacklist.txt")timestamp = int(time.time())with open(file_path, "a") as file:file.write(f"{token}\n")# 读取黑名单文件
def load_blacklist(token):basedir = os.path.dirname(os.path.abspath(__file__))file_path = os.path.join(basedir, "token_blacklist.txt")try:with open(file_path, "r") as file:lines = file.read().splitlines()# 提取tokenlines = [line.strip() for line in lines]return token in blacklistexcept FileNotFoundError:return False# 验证token
def token_verify():token = request.headers.get("token")if not token:return error_response("token缺失!", 401)if load_blacklist(token):return error_response("请重新登录!", 401)try:# 这里我用的加密算法是RS256,需要提前生成密钥对jwt.decode(token, public_key, algorithms=["RS256"])# 也可以试试SH256算法,对称加密算法,安全性低一点# jwt.decode(token, "123456", algorithms=["SH256"])return Trueexcept InvalidSignatureError:return error_response("token不合法!", 401)except ExpiredSignatureError:return error_response("token过期!", 401)except Exception as e:# 捕获其他异常return logger.error(f"Token 验证失败: {e}")# 验证token功能封装成装饰器
def login_required(f):@wraps(f)def decorated(*args, **kwargs):result = token_verify()# 如果 token_verify 返回的是 Response 对象if isinstance(result, Response): return resultreturn f(*args, **kwargs)return decorated
如果怕文件会越来越大,可以加一个定时任务,每天固定时间去清理过期的token,当然这需要在存token的时候也同时把时间戳一起存上。
# 定期清理token,需要存token的格式为token 时间戳
# file.write(f"{token} {timestamp}\n")
# 获取token
# blacklist = [line.split()[0] for line in lines if line.strip()]# 清理三天前的token
def clean_old_tokens():basedir = os.path.dirname(os.path.abspath(__file__))file_path = os.path.join(basedir, "token_blacklist.txt")# 获取当前时间current_time = time.time()# 三天前的时间戳three_days_ago = current_time - 3 * 24 * 60 * 60if not os.path.exists(file_path):# 文件不存在,无需清理returnwith open(file_path, "r") as file:lines = file.readlines()valid_tokens = []for line in lines:token, timestamp = line.strip().split()timestamp = float(timestamp)if timestamp >= three_days_ago:valid_tokens.append((token, timestamp))# 将有效的token写回文件with open(file_path, "w") as file:for token, timestamp in valid_tokens:file.write(f"{token} {timestamp}\n")print(f"已清理 {len(lines) - len(valid_tokens)} 个过期 token。")
在项目启动文件:
import schedule
# 每天凌晨0点执行清理任务
schedule.every().day.at("00:00").do(clean_old_tokens)
if __name__ == "__main__":tz_app.run(host=tz_app.config['HOST'],port=tz_app.config['PORT'],debug=tz_app.config['DEBUG'])while True:schedule.run_pending()time.sleep(1)
用文件存储的方式还有一点不好,就是在高并发场景下的性能较低。
使用redis
使用redis会比用文件的方式好很多,不需要担心文件太大,redis的存储形式是键值对的形式,并且会自动清理过期的数据。读写速度快,适合高频访问的场景。只需要对redis进行维护。
import redis
redis_client = redis.Redis(host='192.168.190.110', port=6379, db=0, password='123456')# token写入黑名单
def save_blacklist(token):# 这里设置token过期时间,到期redis会自动将token删除expires_in = 60 * 60 * 24 * 3try:redis_client.setex(token, expires_in, 1)logger.error("原token移入黑名单")return Trueexcept ConnectionError:logger.error("redis连接失败")return Falseexcept Exception as e:logger.error("token移入黑名单失败:", e)return False# 验证token
def token_verify():token = request.headers.get("token")if not token:return error_response("token缺失!", 401)# if load_blacklist(token):if redis_client.exists(token):return error_response("请重新登录!", 401)try:key = current_app.config.get("SECRET_KEY")jwt.decode(token, public_key, algorithms=["RS256"])return Trueexcept InvalidSignatureError:return error_response("token不合法!", 401)except ExpiredSignatureError:return error_response("token过期!", 401)except Exception as e:# 捕获其他异常return logger.error(f"Token 验证失败: {e}")from functools import wraps
from flask import Responsedef login_required(f):@wraps(f)def decorated(*args, **kwargs):result = token_verify()if isinstance(result, Response): # 如果 token_verify 返回的是 Response 对象return resultreturn f(*args, **kwargs)return decorated
用户登出等接口调用上面的函数就行
执行之后可以检查一下token是否已经存在黑名单里
import redis
redis_client = redis.Redis(host='192.168.190.110', port=6379, db=0, password='123456')
if redis_client.exists("token123"):print("Token 在黑名单中")
else:print("Token 不在黑名单中")