diff --git a/cef_3M/auth.py b/cef_3M/auth.py index 2c66e93..ef174dc 100644 --- a/cef_3M/auth.py +++ b/cef_3M/auth.py @@ -5,6 +5,7 @@ from fastapi.security import HTTPBearer import config from fastapi import Request, HTTPException + JWT_PUBKEY = open(config.SECRETKEY).read() JWT_ALGORITHM = "RS256" @@ -13,7 +14,7 @@ def decodeJWT(token: str) -> dict: try: decoded_token = jwt.decode(token, JWT_PUBKEY, algorithms=[JWT_ALGORITHM]) return decoded_token if decoded_token["exp"] >= time.time() else None - except: + except Exception as e: return {} diff --git a/cef_3M/endpoints/mediamtx.py b/cef_3M/endpoints/mediamtx.py new file mode 100644 index 0000000..fcaea78 --- /dev/null +++ b/cef_3M/endpoints/mediamtx.py @@ -0,0 +1,95 @@ +from starlette.responses import JSONResponse + +from . import router +from fastapi import Request, Depends + +from ..auth import decodeJWT, JWTBearer +from ..util import redis, ergo + + +def pathParts(path): + if len(path) == 2: + channel = None + user, token = path + elif len(path) == 3: + channel, user, token = path + channel = "#" + channel + else: + return None, None, None + return channel, user, token + + +@router.get("/mediamtx/streams/{channel}", dependencies=[Depends(JWTBearer())]) +async def mediamtxChannelStreams(request: Request, channel: str): + inChannel = request.state.jwt.get("channel", "").lower() == "#" + channel.lower() + results = [] + for result in redis.scan_iter(f"stream #{channel} *"): + _, channel, user, token = result.decode("utf8").split() + if inChannel or token == "public": + results.append({ + "user": user, + "token": token + }) + return JSONResponse(status_code=200, content=results) + + +@router.post("/mediamtx/auth", include_in_schema=False) +async def mediamtxAuth(request: Request): + if request.client.host != "127.0.0.1": + return False + body = await request.json() + jwt = decodeJWT(body["query"][4:]) + path = body["path"].split("/") + reading = body["action"] == "read" + + channel, user, token = pathParts(path) + + if user is None: + return JSONResponse(status_code=400, content={"error": "bad path"}) + if " " in token: + return JSONResponse(status_code=400, content={"error": "bad token"}) + + # the only time we don't care about JWT is if someone is watching a public stream + if reading and token == "public": + return JSONResponse(status_code=200, content={"success": True}) + + if len(jwt.keys()) == 0: + return JSONResponse(status_code=403, content={"error": "bad jwt"}) + + # TODO: channel stream permissions + # publishing + if not reading: + if user != jwt["account"]: + return JSONResponse(status_code=403, content={"error": "nuh uh"}) + if channel and jwt["channel"] != channel: + return JSONResponse(status_code=403, content={"error": "nuh uh"}) + + return JSONResponse(status_code=200, content={"success": True}) + + +@router.post("/mediamtx/add", include_in_schema=False) +async def mediamtxAdd(request: Request): + if request.client.host != "127.0.0.1": + return False + body = await request.json() + path = body["env"]["MTX_PATH"].split("/") + parts = [x for x in pathParts(path) if x] + redis.set("stream " + " ".join(parts), parts[2]) + if len(parts) == 3: + await ergo.broadcastTo(parts[0], "STREAMSTART", parts[0], parts[1], parts[2]) + + + +@router.post("/mediamtx/del", include_in_schema=False) +async def mediamtxDelete(request: Request): + if request.client.host != "127.0.0.1": + return False + body = await request.json() + path = body["env"]["MTX_PATH"].split("/") + parts = [x for x in pathParts(path) if x] + redis.delete("stream " + " ".join(parts)) + if len(parts) == 3: + await ergo.broadcastTo(parts[0], "STREAMEND", parts[0], parts[1], parts[2]) + + + diff --git a/cef_3M/endpoints/pfp.py b/cef_3M/endpoints/pfp.py index 4eb86dc..87be743 100644 --- a/cef_3M/endpoints/pfp.py +++ b/cef_3M/endpoints/pfp.py @@ -4,7 +4,7 @@ import time from . import router from fastapi import UploadFile, Request, Depends -from ..util import minioClient +from ..util import minioClient, ergo from ..auth import JWTBearer import config @@ -45,10 +45,11 @@ async def pfpUpload(file: UploadFile, request: Request): mime = mimetypes.guess_type(file.filename) minioClient.put_object("pfp", username, file.file, file.size, content_type=mime[0]) - return {"url": f"https://{config.MINIO_ADDR}/pfp/{username}?{time.time():.0f}"} + await ergo.broadcastAs(username, "CACHEBUST") + return {"url": f"https://{config.MINIO_EXTERNAL_ADDR}/pfp/{username}?{time.time():.0f}"} @router.post("/pfp/uploadIcon", dependencies=[Depends(JWTBearer())]) -async def pfpUpload(file: UploadFile, request: Request): +async def IconUpload(file: UploadFile, request: Request): if file.size > config.MAX_PFP_SIZE: return {"error": "file too big"} whoami = request.state.jwt @@ -62,5 +63,6 @@ async def pfpUpload(file: UploadFile, request: Request): file.file.seek(0) mime = mimetypes.guess_type(file.filename) - minioClient.put_object("pfp", username+"/icon", file.file, file.size, content_type=mime[0]) - return {"url": f"https://{config.MINIO_ADDR}/pfp/{username}/icon?{time.time():.0f}"} \ No newline at end of file + minioClient.put_object("pfp", username+".icon", file.file, file.size, content_type=mime[0]) + await ergo.broadcastAs(username, "CACHEBUST") + return {"url": f"https://{config.MINIO_EXTERNAL_ADDR}/pfp/{username}.icon?{time.time():.0f}"} \ No newline at end of file diff --git a/cef_3M/sql.py b/cef_3M/sql.py index 9f2a676..e9ddba8 100644 --- a/cef_3M/sql.py +++ b/cef_3M/sql.py @@ -7,7 +7,9 @@ alembic = configparser.ConfigParser() alembic.read("alembic.ini") engine = create_engine( - alembic.get("alembic", "sqlalchemy.url") + alembic.get("alembic", "sqlalchemy.url"), + pool_pre_ping=True, + pool_recycle=3600 ) SessionMaker = sessionmaker(autocommit=False, autoflush=False, bind=engine) diff --git a/cef_3M/util.py b/cef_3M/util.py index d55bc69..de939a4 100644 --- a/cef_3M/util.py +++ b/cef_3M/util.py @@ -1,8 +1,9 @@ +import asyncio import hashlib import re import config from minio import Minio - +from redis import Redis from fastapi import UploadFile @@ -18,7 +19,51 @@ async def SHA256(f: UploadFile) -> str: return sha.hexdigest() minioClient = Minio( - config.MINIO_ADDR, + config.MINIO_INTERNAL_ADDR, + secure=False, # you will probably not have SSL access_key=config.MINIO_ACCESS_KEY, secret_key=config.MINIO_SECRET_KEY, ) + +redis = Redis(host='localhost', port=6379, db=0, protocol=3) + +class ErgoClient: + def __init__(self): + self.reader = None + self.writer = None + asyncio.get_running_loop().create_task(self.init()) + + @staticmethod + def retry(f): + async def wrapper(self, *args, **kwargs): + i = 30 + while i: + try: + return await f(self, *args, **kwargs) + except RuntimeError: + self.init() + i -= 1 + print("Couldn't connect") + return wrapper + + @retry + async def init(self): + self.reader, self.writer = await asyncio.open_connection(config.ERGO_ADDR, config.ERGO_PORT) + + + + @retry + async def write(self, msg): + + self.writer.write(msg+b"\n") + await self.writer.drain() + + async def broadcastAs(self, user, *message): + await self.write(f"BROADCASTAS {user} {' '.join(message)}".encode("utf8")) + + + async def broadcastTo(self, user, *message): + await self.write(f"BROADCASTTO {user} {' '.join(message)}".encode("utf8")) + + +ergo = ErgoClient() diff --git a/config.example.py b/config.example.py index 5826303..89abc50 100644 --- a/config.example.py +++ b/config.example.py @@ -1,7 +1,12 @@ import os SECRETKEY = os.path.join("secrets", "pubkey.pem") -MINIO_ADDR = "data.example.xyz" +# CEF-specific port (grumble communicates over it as well) +ERGO_ADDR = "127.0.0.1" +ERGO_PORT = 22843 + +MINIO_INTERNAL_ADDR = "http://127.0.0.1:9000" # dodges nginx +MINIO_EXTERNAL_ADDR = "data.example.xyz" MINIO_ACCESS_KEY = "access-key-goes-here" MINIO_SECRET_KEY = "secret-key-goes-here"