3M/cef_3M/endpoints/account.py
CEF Server ba2e896813 add authentication
add built-in cleanup
general refactor
2024-07-29 03:26:14 +00:00

132 lines
4.5 KiB
Python

import random
import secrets
import string

import nacl.exceptions
import nacl.pwhash
from fastapi import Request, HTTPException, Depends
from pydantic import BaseModel
from starlette.responses import JSONResponse

from . import router
from ..sql import SessionMaker, Users
from ..util import privilegedIps
from ..auth import JWTBearer
@router.get("/account/exists/{name}")
async def exists(name: str):
    # Report whether an account called `name` is present in the Users table
    # and, when it is, whether its `temporary` flag is set.
    #
    # Responds with {"exists": bool, "temporary": bool}; an unknown name is
    # reported as {"exists": False, "temporary": False}.
    with SessionMaker() as session:
        account = session.query(Users).filter(Users.username == str(name)).first()
        found = account is not None
        # Only read the flag when a row was actually returned.
        is_temporary = bool(account.temporary) if found else False
        return JSONResponse({
            "exists": found,
            "temporary": is_temporary,
        })
# Request body accepted by POST /account/password (see changePassword).
class PasswordChange(BaseModel):
    # Password the account currently has; changePassword checks it against
    # the stored scrypt hash before accepting the change.
    currentPassword: str

    # Replacement password; changePassword rejects values of 5 characters
    # or fewer.
    newPassword: str

    # Repeat of newPassword; changePassword rejects the request when the two
    # values differ.
    newPasswordAgain: str
@router.get("/account/invite", dependencies=[Depends(JWTBearer())])
async def getInvite(request: Request):
    # Return the invite code stored on the caller's account.
    #
    # The account name is taken from the "account" entry of
    # request.state.jwt (NOTE(review): presumably populated by the JWTBearer
    # dependency -- confirm in ..auth).
    #
    # Responds with {"code": <invite_code>}.
    # Raises HTTPException(404) when no Users row matches the account name.
    username = request.state.jwt["account"]
    with SessionMaker() as session:
        user = session.query(Users).filter(Users.username == username).first()
        if user is None:
            # A token can still be presented after its account row is gone;
            # answer with a clean 404 instead of failing on `None.invite_code`.
            raise HTTPException(status_code=404, detail="Account not found")
        return JSONResponse({
            "code": user.invite_code
        })
@router.post("/account/invite/regenerate", dependencies=[Depends(JWTBearer())])
async def regenInvite(request: Request):
    # Replace the caller's invite code with a fresh 8-letter uppercase code.
    #
    # The account name is taken from the "account" entry of
    # request.state.jwt (NOTE(review): presumably populated by the JWTBearer
    # dependency -- confirm in ..auth).
    #
    # Responds with {"code": <new code>} after the new code is committed.
    # Raises HTTPException(404) when no Users row matches the account name.
    username = request.state.jwt["account"]
    # The invite code is what /account/verify checks before creating an
    # account, so it must come from a CSPRNG rather than the predictable
    # `random` module.
    code = "".join(secrets.choice(string.ascii_uppercase) for _ in range(8))
    with SessionMaker() as session:
        user = session.query(Users).filter(Users.username == username).first()
        if user is None:
            # Avoid an AttributeError (HTTP 500) when the account row is gone.
            raise HTTPException(status_code=404, detail="Account not found")
        user.invite_code = code
        session.commit()
    return JSONResponse({
        "code": code
    })
@router.post("/account/password", dependencies=[Depends(JWTBearer(False))])
async def changePassword(request: Request, passwordData: PasswordChange):
    # Change the caller's password after re-checking the current one.
    #
    # The account name is the lower-cased "account" entry of
    # request.state.jwt. NOTE(review): the other endpoints in this file use
    # the name without lower-casing -- confirm which form is stored.
    # NOTE(review): the meaning of the `False` passed to JWTBearer is defined
    # in ..auth and is not visible here.
    #
    # On success the new scrypt hash is stored, the account's `temporary`
    # flag is cleared, and {"success": True} is returned.
    #
    # Raises:
    #   HTTPException(400): the two new passwords differ, or the new
    #       password is 5 characters or fewer.
    #   HTTPException(403): the account cannot be found, has no stored hash,
    #       or the current password does not verify.
    if passwordData.newPassword != passwordData.newPasswordAgain:
        raise HTTPException(status_code=400, detail="Passwords don't match")
    if len(passwordData.newPassword) <= 5:
        raise HTTPException(status_code=400, detail="Come on, at least longer than 5 characters")

    username = request.state.jwt["account"].lower()
    invalid_password = HTTPException(status_code=403, detail="Invalid original password")

    with SessionMaker() as session:
        user = session.query(Users).filter(Users.username == username).first()
        if user is None or not user.password:
            # Previously this case only produced a 403 because a bare
            # `except:` swallowed the resulting AttributeError; keep the same
            # response but make the check explicit.
            raise invalid_password

        # scrypt.str() returns bytes, so depending on the column type the
        # stored hash may come back as either str or bytes.
        stored_hash = user.password
        if isinstance(stored_hash, str):
            stored_hash = stored_hash.encode("utf8")

        try:
            nacl.pwhash.scrypt.verify(stored_hash, passwordData.currentPassword.encode("utf8"))
        except nacl.exceptions.CryptoError as err:
            # Covers a wrong password (InvalidkeyError) and a malformed stored
            # hash; unrelated failures are no longer disguised as a 403.
            raise invalid_password from err

        user.password = nacl.pwhash.scrypt.str(passwordData.newPassword.encode("utf8"))
        user.temporary = False
        session.commit()

    return JSONResponse({
        "success": True
    })
@router.post("/account/verify", include_in_schema=False)
async def verify(request: Request):
    """Check a login for a privileged caller, creating the account if new.

    Only requests whose client host is listed in ``privilegedIps`` are
    processed; any other caller receives a bare ``False``.

    The JSON body must contain ``accountName`` and may contain
    ``passphrase``:

    * If the account exists, ``passphrase`` is verified against the stored
      scrypt hash.
    * If it does not exist, ``passphrase`` must be ``<invite code>|<password>``.
      The account is created (flagged ``temporary``) when the invite code
      matches some user's ``invite_code``, or when the Users table is empty
      (the very first account needs no valid code).

    Returns a JSON object ``{"success": bool}`` plus an ``"error"`` message
    on failure.
    """
    client = request.client
    # request.client can be None when the server supplies no peer address;
    # treat that as unprivileged rather than crashing.
    if client is None or client.host not in privilegedIps:
        return False

    body = await request.json()
    # Use the same string form for both the lookup and the insert.
    account_name = str(body["accountName"])
    bPass = body.get("passphrase", "").encode("utf8")

    with SessionMaker() as session:
        existing = session.query(Users).filter(Users.username == account_name).first()

        if existing is not None:
            wrong_password = {
                "success": False,
                "error": "Incorrect password"
            }
            stored_hash = existing.password
            if not stored_hash:
                return JSONResponse(wrong_password)
            # scrypt.str() returns bytes, so the stored hash may come back as
            # either str or bytes depending on the column type.
            if isinstance(stored_hash, str):
                stored_hash = stored_hash.encode("utf8")
            try:
                nacl.pwhash.scrypt.verify(stored_hash, bPass)
            except nacl.exceptions.CryptoError:
                # Wrong password or malformed stored hash.
                return JSONResponse(wrong_password)
            return JSONResponse({
                "success": True,
            })

        # No such account: try to create it from "<invite code>|<password>".
        # Split on the first "|" only, so passwords may themselves contain "|".
        code, separator, password = bPass.partition(b"|")
        if not separator:
            return JSONResponse({
                "success": False,
                "error": "No invite code"
            })

        # The first account ever created is allowed through without a code.
        firstUser = session.query(Users).count() == 0
        inviteFrom = session.query(Users).filter(Users.invite_code == code.decode("utf8")).first()
        if inviteFrom is None and not firstUser:
            return JSONResponse({
                "success": False,
                "error": "Bad invite code"
            })

        # Never write the plaintext password to stdout/logs.
        print("invite code", code)
        account = Users(username=account_name, password=nacl.pwhash.scrypt.str(password), temporary=True)
        session.add(account)
        session.commit()
        return JSONResponse({
            "success": True,
        })