adjust config for bypassing nginx add communication to ergo add cachebusting + fixing icons
46 lines
1.5 KiB
Python
46 lines
1.5 KiB
Python
import time
|
|
import jwt
|
|
from fastapi.security import HTTPBearer
|
|
|
|
import config
|
|
from fastapi import Request, HTTPException
|
|
|
|
|
|
# Key material used to verify token signatures, read once at import time from
# the path in config.SECRETKEY. The context manager closes the file handle
# right after the read instead of leaving it to the garbage collector.
# NOTE(review): the setting is named SECRETKEY but the value is used as an
# RS256 verification (public) key -- confirm the file holds the public key.
with open(config.SECRETKEY) as _key_file:
    JWT_PUBKEY = _key_file.read()

# Only this signing algorithm is accepted when decoding tokens.
JWT_ALGORITHM = "RS256"
|
|
|
|
|
|
def decodeJWT(token: str) -> dict:
    """Decode *token* and return its claims, or an empty dict if unusable.

    The token is decoded with ``JWT_PUBKEY`` and restricted to
    ``JWT_ALGORITHM``. The claims are returned only when their ``exp`` value
    is not in the past relative to ``time.time()``.

    Args:
        token: The encoded JWT string.

    Returns:
        The decoded claims, or ``{}`` when decoding fails, the ``exp`` claim
        is missing, or the token has expired. Every failure yields the same
        falsy ``{}`` so the result always matches the ``dict`` annotation.
    """
    try:
        claims = jwt.decode(token, JWT_PUBKEY, algorithms=[JWT_ALGORITHM])
        # A missing "exp" raises KeyError here and is treated as invalid.
        # NOTE(review): jwt.decode may already reject expired tokens on its
        # own; the explicit comparison is kept so expiry is always enforced.
        is_current = claims["exp"] >= time.time()
    except Exception:
        # Fail closed: any problem with the token means "no claims" rather
        # than an unhandled error inside the authentication path.
        return {}
    return claims if is_current else {}
|
|
|
|
|
|
class JWTBearer(HTTPBearer):
    """Bearer-token dependency that only admits requests with a valid JWT.

    When called with a request it checks the credentials extracted by
    ``HTTPBearer``, validates the token through ``verify_jwt``, stores the
    decoded claims on ``request.state.jwt`` and returns the raw token string.
    Every rejection is an ``HTTPException`` with status code 403.
    """

    def __init__(self, auto_error: bool = True):
        """Initialise the parent ``HTTPBearer``.

        Args:
            auto_error: Passed through unchanged to ``HTTPBearer.__init__``.
        """
        super().__init__(auto_error=auto_error)

    async def __call__(self, request: Request):
        """Authenticate *request* and return its raw bearer token.

        Args:
            request: The incoming request; on success its ``state.jwt`` is
                set to the decoded token claims.

        Returns:
            The encoded token string taken from the credentials.

        Raises:
            HTTPException: 403 when the parent returns no credentials, the
                scheme is not ``"Bearer"``, or the token is invalid/expired.
        """
        credentials = await super().__call__(request)
        if not credentials:
            raise HTTPException(status_code=403, detail="Invalid authorization code.")
        if credentials.scheme != "Bearer":
            raise HTTPException(status_code=403, detail="Invalid authentication scheme.")

        token = credentials.credentials
        if not self.verify_jwt(token):
            raise HTTPException(status_code=403, detail="Invalid or expired token.")

        # verify_jwt only reports a boolean, so the claims are decoded again
        # here. The token can expire between the two calls, so the second
        # result is checked too; otherwise the request would be accepted
        # with empty claims on request.state.jwt.
        claims = decodeJWT(token)
        if not claims:
            raise HTTPException(status_code=403, detail="Invalid or expired token.")

        request.state.jwt = claims
        return token

    def verify_jwt(self, jwtoken: str) -> bool:
        """Return True when *jwtoken* decodes to a non-empty set of claims.

        Args:
            jwtoken: The encoded JWT string to check.

        Returns:
            ``True`` if ``decodeJWT`` yields a truthy payload, ``False`` if
            it yields a falsy one or raises.
        """
        try:
            payload = decodeJWT(jwtoken)
        except Exception:
            # Any decoding error means the token is not valid. Unlike a bare
            # ``except``, this lets KeyboardInterrupt/SystemExit and task
            # cancellation propagate.
            return False
        return bool(payload)
|