3M/cef_3M/endpoints/alerts.py
CEF Server ba2e896813 add authentication
add built-in cleanup
general refactor
2024-07-29 03:26:14 +00:00

54 lines
1.9 KiB
Python

from fastapi import Request, Depends
from starlette.responses import JSONResponse
from . import router
from .. import JWTBearer
from ..sql import SessionMaker, AlertEndpoints
from pydantic import BaseModel, create_model, HttpUrl
class SubscriptionData(BaseModel):
    """Request body accepted by the ``/alert/register`` endpoint.

    NOTE(review): the field names look like a browser Web Push
    subscription (endpoint URL plus ``auth``/``p256dh`` keys) -- confirm
    against the client code.
    """

    # URL of the subscription endpoint; validated by pydantic as an HTTP(S) URL.
    endpoint: HttpUrl
    # Nested object built dynamically; ``...`` marks both string fields
    # as required.
    keys: create_model(
        "keys",
        auth=(str, ...),
        p256dh=(str, ...),
    )
@router.post("/alert/register", dependencies=[Depends(JWTBearer())])
async def register(request: Request, subscription: SubscriptionData):
    """Store an alert endpoint for the authenticated account.

    The endpoint is looked up by URL only (not by account). If a row with
    that URL already exists nothing is written; otherwise a new
    ``AlertEndpoints`` row is merged and committed. The response is
    ``{"success": True}`` in both cases.

    NOTE(review): ``request.state.jwt`` is presumably populated by the
    ``JWTBearer`` dependency -- confirm.
    NOTE(review): because the lookup ignores the account, a URL already
    stored under a different account is left unchanged and still reports
    success.
    """
    with SessionMaker() as session:
        endpoint_url = str(subscription.endpoint)
        existing_row = (
            session.query(AlertEndpoints)
            .filter(AlertEndpoints.url == endpoint_url)
            .first()
        )
        if existing_row is None:
            # Unknown URL: persist it under the caller's account.
            new_row = AlertEndpoints(
                username=request.state.jwt["account"],
                url=endpoint_url,
                auth=subscription.keys.auth,
                p256dh=subscription.keys.p256dh,
            )
            session.merge(new_row)
            session.commit()
        return JSONResponse({"success": True})
@router.post("/alert/unregister", dependencies=[Depends(JWTBearer())])
async def unregister(request: Request):
    """Delete one alert endpoint belonging to the authenticated account.

    The request body must be a JSON object. Its ``"url"`` value (default
    ``""`` when absent) selects the row to delete; only rows whose
    ``username`` matches the caller's account are affected.

    Returns:
        ``{"success": True}`` once the delete has been committed (whether
        or not a row matched), or ``{"success": False}`` with HTTP 400
        when the body is missing, is not valid JSON, or is not a JSON
        object.

    NOTE(review): ``request.state.jwt`` is presumably populated by the
    ``JWTBearer`` dependency -- confirm.
    """
    # Parse the body before opening a session so that no session is held
    # across the await and bad input never touches the database.
    try:
        payload = await request.json()
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError both subclass
        # ValueError; either means the client sent an unusable body.
        payload = None

    if not isinstance(payload, dict):
        # Previously an empty/invalid body or a non-object JSON value
        # (list, string, number) escaped as an unhandled exception (HTTP 500).
        return JSONResponse({"success": False}, status_code=400)

    target_url = payload.get("url", "")
    account = request.state.jwt["account"]

    with SessionMaker() as session:
        session.query(AlertEndpoints).filter(
            AlertEndpoints.url == target_url,
            AlertEndpoints.username == account,
        ).delete()
        session.commit()
        return JSONResponse({"success": True})
@router.post("/alert/clear", dependencies=[Depends(JWTBearer())])
async def clear(request: Request):
    """Delete every alert endpoint stored for the authenticated account.

    Always responds with ``{"success": True}`` after the delete has been
    committed, regardless of how many rows were removed.

    NOTE(review): ``request.state.jwt`` is presumably populated by the
    ``JWTBearer`` dependency -- confirm.
    """
    with SessionMaker() as session:
        account = request.state.jwt["account"]
        owned_endpoints = session.query(AlertEndpoints).filter(
            AlertEndpoints.username == account
        )
        owned_endpoints.delete()
        session.commit()
        return JSONResponse({"success": True})