FastAPI 双 token 机制登录实现
一、整体架构
二、代码实现
from datetime import datetime, timedelta, timezone
import uuid

from fastapi import HTTPException, Depends, FastAPI, Response, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import jwt, JWTError
from pydantic import BaseModel
# NOTE: this binds the Redis async client module to the name ``asyncio``,
# shadowing the standard-library module of the same name inside this file.
from redis import asyncio
from tortoise.contrib.fastapi import register_tortoise

from app.config.model_conf.settings import TORTOISE_ORM
from app.models.users import Users

api_login = FastAPI()


class user_login(BaseModel):
    """Request body carrying the credentials for /login and /register."""

    username: str
    password: str


class user_login_success(BaseModel):
    """Response body returned by /login."""

    token: str
    token_type: str = "Bearer"


ACCESS_TOKEN_EXPIRE_MINUTES = 15
REFRESH_TOKEN_EXPIRE_DAYS = 7
# NOTE(review): placeholder signing secret; replace it before deploying.
SECRET_KEY = "CHANGE_ME_TO_32RANDOM_STRING"
ALGORITHM = "HS256"
REDIS_URL = "redis://192.168.88.14:6379/0"
BLK_PREFIX = "black:jti:"  # Redis key prefix for blacklisted access-token ids
REF_PREFIX = "refresh:"  # Redis key prefix for stored refresh tokens

# Lifetime of a refresh token, used both as Redis TTL and cookie max-age.
_REFRESH_TTL_SECONDS = REFRESH_TOKEN_EXPIRE_DAYS * 60 * 60 * 24

# Shared Redis client, created on first use by create_redis().
_redis_client = None


def create_access_token(username: str, jti: str) -> str:
    """Return a signed JWT access token.

    The token carries ``username``, ``jti`` and an ``exp`` claim set
    ACCESS_TOKEN_EXPIRE_MINUTES from now (UTC), and is signed with
    SECRET_KEY using ALGORITHM.
    """
    expire = datetime.now(timezone.utc) + timedelta(
        minutes=ACCESS_TOKEN_EXPIRE_MINUTES
    )
    claims = {"username": username, "jti": jti, "exp": expire}
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)


async def create_redis():
    """FastAPI dependency that returns the Redis client for REDIS_URL.

    The client (and its connection pool) is created once and reused;
    building a fresh 100-connection pool per request would never release
    the connections it opened.
    """
    global _redis_client
    if _redis_client is None:
        _redis_client = asyncio.from_url(
            REDIS_URL,
            decode_responses=True,
            max_connections=100,
        )
    return _redis_client


def _set_refresh_cookie(response: Response, refresh_token: str) -> None:
    """Attach *refresh_token* to *response* as the ``refresh_token`` cookie."""
    response.set_cookie(
        key="refresh_token",
        value=refresh_token,
        max_age=_REFRESH_TTL_SECONDS,
        httponly=True,
        samesite="lax",
        secure=False,  # set to True in production
    )


@api_login.post("/login", response_model=user_login_success)
async def login(user: user_login, response: Response, redis=Depends(create_redis)):
    """Check the credentials and issue an access token plus a refresh token.

    The access token is returned in the body; the refresh token is stored
    in Redis (mapped to the username) and sent as an HTTP-only cookie.

    Raises:
        HTTPException: 401 when no user matches the username/password.
    """
    # The query must be awaited: an un-awaited query object is always
    # truthy, which would accept any credentials.
    # NOTE(review): the password is compared as stored, i.e. in plain text;
    # consider storing a salted hash instead.
    credentials_ok = await Users.filter(
        username=user.username, password=user.password
    ).exists()
    if not credentials_ok:
        raise HTTPException(status_code=401, detail="用户名或密码错误")

    # Access token, identified by a fresh jti so it can be blacklisted later.
    jti = str(uuid.uuid4())
    access_token = create_access_token(user.username, jti)

    # Refresh token: opaque value kept in Redis and in the cookie.
    refresh_token = str(uuid.uuid4())
    await redis.setex(
        REF_PREFIX + refresh_token, _REFRESH_TTL_SECONDS, user.username
    )
    _set_refresh_cookie(response, refresh_token)
    return {"token": access_token}


def decode_token(token: str):
    """Decode and verify *token*; return its claims, or None if it is invalid."""
    try:
        return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        return None


http_bearer = HTTPBearer(auto_error=True)


async def get_current_user(
    cred: HTTPAuthorizationCredentials = Depends(http_bearer),
    redis=Depends(create_redis),
):
    """FastAPI dependency that resolves the bearer token to a username.

    Raises:
        HTTPException: 401 when the token cannot be decoded, lacks the
            ``username`` or ``jti`` claim, or its jti has been blacklisted.
    """
    claims = decode_token(cred.credentials)
    if not claims:
        raise HTTPException(status_code=401, detail="token无效")
    if not claims.get("username") or not claims.get("jti"):
        raise HTTPException(status_code=401, detail="token无效")
    # A blacklisted jti means the user has already logged out with this token.
    if await redis.exists(BLK_PREFIX + claims.get("jti")):
        raise HTTPException(status_code=401, detail="token已失效")
    return claims.get("username")


@api_login.post("/refresh")
async def refresh(
    request: Request,
    response: Response,
    redis=Depends(create_redis),
):
    """Exchange the refresh-token cookie for a new access/refresh token pair.

    Only the refresh token is required, so the endpoint also works once the
    access token has expired. The username is taken from the Redis entry
    written when the refresh token was issued, and the old refresh token is
    deleted so it cannot be reused.

    Raises:
        HTTPException: 401 when the cookie is missing or the refresh token
            is not (or no longer) present in Redis.
    """
    refresh_token = request.cookies.get("refresh_token")
    if not refresh_token:
        raise HTTPException(status_code=401, detail="缺少 refresh_token")

    old_key = REF_PREFIX + refresh_token
    username = await redis.get(old_key)
    if not username:
        raise HTTPException(status_code=401, detail="refresh_token不存在")
    # delete() reports how many keys it removed; 0 means a concurrent request
    # already consumed this refresh token.
    if not await redis.delete(old_key):
        raise HTTPException(status_code=401, detail="refresh_token不存在")

    new_token = str(uuid.uuid4())
    await redis.setex(REF_PREFIX + new_token, _REFRESH_TTL_SECONDS, username)
    _set_refresh_cookie(response, new_token)

    new_jti = str(uuid.uuid4())
    access_token = create_access_token(username, new_jti)
    return {"access_token": access_token}


@api_login.post("/logout")
async def logout(
    request: Request,
    redis=Depends(create_redis),
    username_jti: str = Depends(get_current_user),
    cred: HTTPAuthorizationCredentials = Depends(http_bearer),
):
    """Log out: blacklist the presented access token and drop the refresh token.

    ``username_jti`` is unused in the body; the dependency is what enforces
    that the caller presents a valid, non-blacklisted access token.

    Raises:
        HTTPException: 401 when the refresh-token cookie is missing. The
            access token has already been blacklisted by that point.
    """
    # Blacklist the jti of the token from the Authorization header for one
    # full access-token lifetime.
    header_token = decode_token(cred.credentials)
    await redis.setex(
        BLK_PREFIX + header_token.get("jti"), ACCESS_TOKEN_EXPIRE_MINUTES * 60, 1
    )

    # Remove the refresh token named by the cookie from Redis.
    refresh_token = request.cookies.get("refresh_token")
    if not refresh_token:
        raise HTTPException(status_code=401, detail="缺少refresh_token")
    await redis.delete(REF_PREFIX + refresh_token)
    return {"msg": "注销成功"}


class user_register(BaseModel):
    """Response body returned by /register."""

    msg: str


@api_login.post("/register", response_model=user_register)
async def register(user: user_login):
    """Create a user with the given username and password.

    Raises:
        HTTPException: 409 when the username is already taken.
    """
    # Reject duplicate usernames up front.
    if await Users.filter(username=user.username).exists():
        raise HTTPException(status_code=409, detail="用户已存在")
    # NOTE(review): the password is stored as received (plain text).
    if await Users.create(username=user.username, password=user.password):
        return {"msg": "注册成功,跳转到登录界面"}
    # NOTE(review): presumably unreachable, since create() returns the new
    # model instance; verify against the Users model.
    return {"msg": "注册失败"}


# Database configuration.
register_tortoise(
    api_login,
    config=TORTOISE_ORM,
)

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(api_login, host="127.0.0.1", port=8000)