当前位置: 首页 > news >正文

用户登出、修改密码或重置密码后,token的删除(flask)

项目已经实现了用户登录,并在登录之后,接口返回token,在后续的访问中,将token放入请求头里

用户登出、修改密码或重置密码后,需要删除携带的token,用户重新登录

实现token删除有两种方法:
一是写入黑名单文件,后续验证时读取文件里是否有这个token
二是用redis来实现

写黑名单文件的方式

import os# 写入黑名单文件
def save_blacklist(token):# 文件目录为函数所在的文件的父级目录(token_blacklist.txt与该文件在同一目录下)basedir = os.path.dirname(os.path.abspath(__file__))file_path = os.path.join(basedir, "token_blacklist.txt")timestamp = int(time.time())with open(file_path, "a") as file:file.write(f"{token}\n")# 读取黑名单文件
def load_blacklist(token):basedir = os.path.dirname(os.path.abspath(__file__))file_path = os.path.join(basedir, "token_blacklist.txt")try:with open(file_path, "r") as file:lines = file.read().splitlines()# 提取tokenlines = [line.strip() for line in lines]return token in blacklistexcept FileNotFoundError:return False# 验证token
def token_verify():token = request.headers.get("token")if not token:return error_response("token缺失!", 401)if load_blacklist(token):return error_response("请重新登录!", 401)try:# 这里我用的加密算法是RS256,需要提前生成密钥对jwt.decode(token, public_key, algorithms=["RS256"])# 也可以试试SH256算法,对称加密算法,安全性低一点# jwt.decode(token, "123456", algorithms=["SH256"])return Trueexcept InvalidSignatureError:return error_response("token不合法!", 401)except ExpiredSignatureError:return error_response("token过期!", 401)except Exception as e:# 捕获其他异常return logger.error(f"Token 验证失败: {e}")# 验证token功能封装成装饰器
def login_required(f):@wraps(f)def decorated(*args, **kwargs):result = token_verify()# 如果 token_verify 返回的是 Response 对象if isinstance(result, Response):  return resultreturn f(*args, **kwargs)return decorated

如果怕文件会越来越大,可以加一个定时任务,每天固定时间去清理过期的token,当然这需要在存token的时候也同时把时间戳一起存上。

# 定期清理token,需要存token的格式为token 时间戳
#     file.write(f"{token} {timestamp}\n")
# 获取token
#     blacklist = [line.split()[0] for line in lines if line.strip()]# 清理三天前的token
def clean_old_tokens():basedir = os.path.dirname(os.path.abspath(__file__))file_path = os.path.join(basedir, "token_blacklist.txt")# 获取当前时间current_time = time.time()# 三天前的时间戳three_days_ago = current_time - 3 * 24 * 60 * 60if not os.path.exists(file_path):# 文件不存在,无需清理returnwith open(file_path, "r") as file:lines = file.readlines()valid_tokens = []for line in lines:token, timestamp = line.strip().split()timestamp = float(timestamp)if timestamp >= three_days_ago:valid_tokens.append((token, timestamp))# 将有效的token写回文件with open(file_path, "w") as file:for token, timestamp in valid_tokens:file.write(f"{token} {timestamp}\n")print(f"已清理 {len(lines) - len(valid_tokens)} 个过期 token。")

在项目启动文件:

import schedule
# 每天凌晨0点执行清理任务
schedule.every().day.at("00:00").do(clean_old_tokens)
if __name__ == "__main__":tz_app.run(host=tz_app.config['HOST'],port=tz_app.config['PORT'],debug=tz_app.config['DEBUG'])while True:schedule.run_pending()time.sleep(1)

用文件存储的方式还有一点不好,就是在高并发场景下的性能较低。

使用redis

使用redis会比用文件的方式好很多,不需要担心文件太大,redis的存储形式是键值对的形式,并且会自动清理过期的数据。读写速度快,适合高频访问的场景。只需要对redis进行维护。

import redis
redis_client = redis.Redis(host='192.168.190.110', port=6379, db=0, password='123456')# token写入黑名单
def save_blacklist(token):# 这里设置token过期时间,到期redis会自动将token删除expires_in = 60 * 60 * 24 * 3try:redis_client.setex(token, expires_in, 1)logger.error("原token移入黑名单")return Trueexcept ConnectionError:logger.error("redis连接失败")return Falseexcept Exception as e:logger.error("token移入黑名单失败:", e)return False# 验证token
def token_verify():token = request.headers.get("token")if not token:return error_response("token缺失!", 401)# if load_blacklist(token):if redis_client.exists(token):return error_response("请重新登录!", 401)try:key = current_app.config.get("SECRET_KEY")jwt.decode(token, public_key, algorithms=["RS256"])return Trueexcept InvalidSignatureError:return error_response("token不合法!", 401)except ExpiredSignatureError:return error_response("token过期!", 401)except Exception as e:# 捕获其他异常return logger.error(f"Token 验证失败: {e}")from functools import wraps
from flask import Responsedef login_required(f):@wraps(f)def decorated(*args, **kwargs):result = token_verify()if isinstance(result, Response):  # 如果 token_verify 返回的是 Response 对象return resultreturn f(*args, **kwargs)return decorated

用户登出等接口调用上面的函数就行
执行之后可以检查一下token是否已经存在黑名单里

import redis
redis_client = redis.Redis(host='192.168.190.110', port=6379, db=0, password='123456')
if redis_client.exists("token123"):print("Token 在黑名单中")
else:print("Token 不在黑名单中")

在这里插入图片描述


http://www.mrgr.cn/news/94846.html

相关文章:

  • LeRobot源码剖析——对机器人各个动作策略的统一封装:包含ALOHA ACT、Diffusion Policy、VLA模型π0
  • 数据结构------线性表(链表)
  • Flask+Vue-Router+JWT实现登录验证
  • 项目实战系列:基于瑞萨RA6M5构建多节点OTA升级-系统设计<一>
  • 【WRF数据准备】 基于CDO/Python 拼接 grib 数据:如ERA5 气象数据
  • 设计模式之外观模式:原理、实现与应用
  • HarmonyOS三层架构实战
  • 【Linux内核系列】:进程板块与文件板块的综合
  • C# 一文读懂委托与事件
  • 【C++进阶一】STL和string
  • Python集合
  • 【MySQL基础-9】深入理解MySQL中的聚合函数
  • SpringCloud 学习笔记2(Nacos)
  • 数据结构篇——二叉树的存储与遍历
  • GaussDB备份数据常用命令
  • SSM框架——Spring面试题
  • 汇编基础知识
  • [HelloCTF]PHPinclude-labs超详细WP-Level 0
  • 解决git init 命令不显示.git
  • C++基础 [五] - String的模拟实现