import time import jwt from fastapi.security import HTTPBearer import config from fastapi import Request, HTTPException JWT_PUBKEY = open(config.SECRETKEY).read() JWT_ALGORITHM = "RS256" def decodeJWT(token: str) -> dict: try: decoded_token = jwt.decode(token, JWT_PUBKEY, algorithms=[JWT_ALGORITHM]) return decoded_token if decoded_token["exp"] >= time.time() else None except Exception as e: return {} class JWTBearer(HTTPBearer): def __init__(self, auto_error: bool = True): super(JWTBearer, self).__init__(auto_error=auto_error) async def __call__(self, request: Request): credentials = await super(JWTBearer, self).__call__(request) if credentials: if not credentials.scheme == "Bearer": raise HTTPException(status_code=403, detail="Invalid authentication scheme.") if not self.verify_jwt(credentials.credentials): raise HTTPException(status_code=403, detail="Invalid or expired token.") request.state.jwt = decodeJWT(credentials.credentials) return credentials.credentials else: raise HTTPException(status_code=403, detail="Invalid authorization code.") def verify_jwt(self, jwtoken: str) -> bool: isTokenValid: bool = False try: payload = decodeJWT(jwtoken) except: payload = None if payload: isTokenValid = True return isTokenValid