import base64 import random from typing import Annotated from pydantic import BaseModel, StringConstraints from . import router from starlette.responses import JSONResponse from fastapi import Request, HTTPException, Depends from ..sql import SessionMaker, Users, Sessions from ..sql_generated import Tokens from ..util import privilegedIps from ..auth import JWTBearer import nacl.pwhash import nacl.utils import string class PasswordChange(BaseModel): currentPassword: str newPassword: str newPasswordAgain: str class RememberMe(BaseModel): username: str password: str class TokenVerify(BaseModel): username: str token: str class TokenCreate(BaseModel): label: Annotated[str, StringConstraints(max_length=64)] class TokenDelete(BaseModel): id: int def passwordVerify(hash: str, password: str) -> bool: try: nacl.pwhash.scrypt.verify(hash.encode("utf8"), password.encode("utf8")) return True except: return False @router.get("/account/exists/{name}") async def exists(name: str): with SessionMaker() as session: check = session.query(Users).filter(Users.username == str(name)) first = check.first() if first is None: return JSONResponse({ "exists": False, "temporary": False }) else: return JSONResponse({ "exists": True, "temporary": bool(first.temporary) }) @router.post("/account/remember") async def rememberLogin(request: Request, loginData: RememberMe): with SessionMaker() as session: check = session.query(Users).filter(Users.username == str(loginData.username)) first: Users = check.first() if not first or not passwordVerify(first.password, loginData.password): return JSONResponse({ "success": False, "error": "Incorrect password" }) token = base64.b64encode(nacl.utils.random(32)) sess = Sessions(username=first.username, hash=nacl.pwhash.scrypt.str(token)) session.add(sess) session.commit() return JSONResponse({ "success": True, "token": token.decode("utf8") }) @router.get("/account/invite", dependencies=[Depends(JWTBearer())]) async def getInvite(request: Request): username = request.state.jwt["account"] with SessionMaker() as session: user = session.query(Users).filter(Users.username == username).first() return JSONResponse({ "code": user.invite_code }) @router.post("/account/invite/regenerate", dependencies=[Depends(JWTBearer())]) async def regenInvite(request: Request): username = request.state.jwt["account"] code = "" for _ in range(8): code += random.choice(string.ascii_uppercase) with SessionMaker() as session: user = session.query(Users).filter(Users.username == username).first() user.invite_code = code session.commit() return JSONResponse({ "code": code }) @router.post("/account/password", dependencies=[Depends(JWTBearer(False))]) async def changePassword(request: Request, passwordData: PasswordChange): if passwordData.newPassword != passwordData.newPasswordAgain: raise HTTPException(status_code=400, detail="Passwords don't match") if len(passwordData.newPassword) <= 5: raise HTTPException(status_code=400, detail="Come on, at least longer than 5 characters") whoami = request.state.jwt username = whoami["account"].lower() with SessionMaker() as session: user = session.query(Users).filter(Users.username == username).first() if not passwordVerify(user.password, passwordData.currentPassword): raise HTTPException(status_code=403, detail="Invalid original password") bPass = passwordData.newPassword.encode("utf8") user.password = nacl.pwhash.scrypt.str(bPass) user.temporary = False # clear sessions and tokens session.query(Sessions).filter(Sessions.username == user.username).delete() session.query(Tokens).filter(Tokens.username == user.username).delete() session.commit() return JSONResponse({ "success": True }) @router.post("/account/tokenCheck") async def tokenVerify(request: Request, tokenData: TokenVerify): with SessionMaker() as session: tokens = session.query(Tokens).filter(Tokens.username == str(tokenData.username)) for token in tokens.all(): if passwordVerify(token.hash, tokenData.token): return JSONResponse({ "success": True, }) return JSONResponse({ "success": False, }) @router.get("/account/tokens", dependencies=[Depends(JWTBearer())]) async def getTokens(request: Request): username = request.state.jwt["account"] with SessionMaker() as session: tokens = session.query(Tokens).filter(Tokens.username == str(username)) return JSONResponse({ "tokens": [ { "id": x.id, "name": x.name, "created_at": x.created_at.isoformat() } for x in tokens.all() ] }) @router.post("/account/token/create", dependencies=[Depends(JWTBearer())]) async def createToken(request: Request, info: TokenCreate): username = request.state.jwt["account"] with SessionMaker() as session: value = f"{username}-{base64.b64encode(nacl.utils.random(24)).decode('utf8')}" token = Tokens(username=username, hash=nacl.pwhash.scrypt.str(value.encode("utf8")), name=info.label) session.add(token) session.commit() return JSONResponse({ "token": { "id": token.id, "name": token.name, "created_at": token.created_at.isoformat(), "token": value } }) @router.post("/account/token/delete", dependencies=[Depends(JWTBearer())]) async def deleteToken(request: Request, info: TokenDelete): username = request.state.jwt["account"] with SessionMaker() as session: session.query(Tokens).filter(Tokens.username == username, Tokens.id == info.id).delete() session.commit() return JSONResponse({ "success": True }) @router.post("/account/verify", include_in_schema=False) async def verify(request: Request): if request.client.host not in privilegedIps: return False body = await request.json() bPass = body.get("passphrase", "").encode("utf8") with SessionMaker() as session: check = session.query(Users).filter(Users.username == str(body["accountName"])) first = check.first() if first: # Not too happy with this approach but it should work sessions = session.query(Sessions).filter(Users.username == str(body["accountName"])) for sess in sessions.all(): if passwordVerify(sess.hash, body.get("passphrase", "")): return JSONResponse({ "success": True, }) if passwordVerify(first.password, body.get("passphrase", "")): return JSONResponse({ "success": True, }) else: return JSONResponse({ "success": False, "error": "Incorrect password" }) else: # create account split = bPass.split(b"|") if len(split) != 2: return JSONResponse({ "success": False, "error": "No invite code" }) code, password = split firstUser = False if session.query(Users).count() == 0: firstUser = True inviteFrom = session.query(Users).filter(Users.invite_code == code.decode("utf8")).first() if not inviteFrom and not firstUser: return JSONResponse({ "success": False, "error": "Bad invite code" }) account = Users(username=body["accountName"], password=nacl.pwhash.scrypt.str(password), temporary=True) session.add(account) session.commit() return JSONResponse({ "success": True, })