3M/cef_3M/auth.py

45 lines
1.4 KiB
Python

import time
import jwt
from fastapi.security import HTTPBearer
import config
from fastapi import Request, HTTPException
# Key material handed to ``jwt.decode`` for signature verification, read once
# at import time from the path in ``config.SECRETKEY``.  The ``with`` block
# closes the file handle deterministically instead of leaving it to the GC.
# NOTE(review): the name and RS256 suggest this is a PEM public key even
# though the config attribute is called SECRETKEY -- confirm.
with open(config.SECRETKEY) as _key_file:
    JWT_PUBKEY = _key_file.read()

# The only signature algorithm accepted when decoding tokens.
JWT_ALGORITHM = "RS256"
def decodeJWT(token: str) -> dict:
    """Decode ``token`` and return its claims, or ``{}`` if it is unusable.

    The token is decoded with ``JWT_PUBKEY`` and restricted to
    ``JWT_ALGORITHM``.  Its ``exp`` claim must be present and must not lie
    in the past relative to ``time.time()``.

    Args:
        token: The encoded JWT string.

    Returns:
        The decoded claims dict when the token decodes and is unexpired;
        otherwise an empty dict.  The result is always a dict, so callers
        can rely on plain truthiness to tell success from failure.
    """
    try:
        claims = jwt.decode(token, JWT_PUBKEY, algorithms=[JWT_ALGORITHM])
        still_valid = claims["exp"] >= time.time()
    except Exception:
        # Any failure (decode error, missing or non-numeric "exp") means the
        # token cannot be trusted.  ``Exception`` rather than a bare
        # ``except`` so KeyboardInterrupt / SystemExit still propagate.
        return {}
    # Expired tokens yield the same empty dict as undecodable ones, matching
    # the declared ``dict`` return type.
    return claims if still_valid else {}
class JWTBearer(HTTPBearer):
    """Bearer-token dependency that only admits requests carrying a valid JWT.

    On success the decoded claims are stored on ``request.state.jwt`` and the
    raw token string is returned.  Every rejection is raised as an
    ``HTTPException`` with status 403.
    """

    def __init__(self, auto_error: bool = True):
        """Initialise the parent ``HTTPBearer``, forwarding ``auto_error``."""
        super().__init__(auto_error=auto_error)

    async def __call__(self, request: Request) -> str:
        """Authenticate ``request`` and return its bearer token.

        Args:
            request: The incoming request; ``request.state.jwt`` is set to
                the decoded claims when authentication succeeds.

        Returns:
            The raw JWT string taken from the credentials.

        Raises:
            HTTPException: 403 when the parent dependency yields no
                credentials, the scheme is not ``"Bearer"``, or the token
                is invalid or expired.
        """
        credentials = await super().__call__(request)
        if not credentials:
            raise HTTPException(status_code=403, detail="Invalid authorization code.")
        if credentials.scheme != "Bearer":
            raise HTTPException(status_code=403, detail="Invalid authentication scheme.")

        token = credentials.credentials
        if not self.verify_jwt(token):
            raise HTTPException(status_code=403, detail="Invalid or expired token.")

        # The token is decoded a second time to obtain the claims.  It may
        # have expired since ``verify_jwt`` ran, so an empty result here is
        # rejected rather than stored on an accepted request.
        payload = decodeJWT(token)
        if not payload:
            raise HTTPException(status_code=403, detail="Invalid or expired token.")
        request.state.jwt = payload
        return token

    def verify_jwt(self, jwtoken: str) -> bool:
        """Return ``True`` when ``jwtoken`` decodes to a non-empty payload.

        Args:
            jwtoken: The encoded JWT string.

        Returns:
            ``True`` if ``decodeJWT`` returns a truthy payload; ``False`` if
            it returns an empty/``None`` payload or raises.
        """
        try:
            return bool(decodeJWT(jwtoken))
        except Exception:
            # A decoder failure is treated as an invalid token, not a crash.
            return False