add video streaming endpoints
adjust config for bypassing nginx add communication to ergo add cachebusting + fixing icons
This commit is contained in:
parent
15e1018476
commit
6a69c5a34d
6 changed files with 160 additions and 10 deletions
|
|
@ -5,6 +5,7 @@ from fastapi.security import HTTPBearer
|
||||||
import config
|
import config
|
||||||
from fastapi import Request, HTTPException
|
from fastapi import Request, HTTPException
|
||||||
|
|
||||||
|
|
||||||
JWT_PUBKEY = open(config.SECRETKEY).read()
|
JWT_PUBKEY = open(config.SECRETKEY).read()
|
||||||
JWT_ALGORITHM = "RS256"
|
JWT_ALGORITHM = "RS256"
|
||||||
|
|
||||||
|
|
@ -13,7 +14,7 @@ def decodeJWT(token: str) -> dict:
|
||||||
try:
|
try:
|
||||||
decoded_token = jwt.decode(token, JWT_PUBKEY, algorithms=[JWT_ALGORITHM])
|
decoded_token = jwt.decode(token, JWT_PUBKEY, algorithms=[JWT_ALGORITHM])
|
||||||
return decoded_token if decoded_token["exp"] >= time.time() else None
|
return decoded_token if decoded_token["exp"] >= time.time() else None
|
||||||
except:
|
except Exception as e:
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
95
cef_3M/endpoints/mediamtx.py
Normal file
95
cef_3M/endpoints/mediamtx.py
Normal file
|
|
@ -0,0 +1,95 @@
|
||||||
|
from starlette.responses import JSONResponse
|
||||||
|
|
||||||
|
from . import router
|
||||||
|
from fastapi import Request, Depends
|
||||||
|
|
||||||
|
from ..auth import decodeJWT, JWTBearer
|
||||||
|
from ..util import redis, ergo
|
||||||
|
|
||||||
|
|
||||||
|
def pathParts(path):
|
||||||
|
if len(path) == 2:
|
||||||
|
channel = None
|
||||||
|
user, token = path
|
||||||
|
elif len(path) == 3:
|
||||||
|
channel, user, token = path
|
||||||
|
channel = "#" + channel
|
||||||
|
else:
|
||||||
|
return None, None, None
|
||||||
|
return channel, user, token
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/mediamtx/streams/{channel}", dependencies=[Depends(JWTBearer())])
|
||||||
|
async def mediamtxChannelStreams(request: Request, channel: str):
|
||||||
|
inChannel = request.state.jwt.get("channel", "").lower() == "#" + channel.lower()
|
||||||
|
results = []
|
||||||
|
for result in redis.scan_iter(f"stream #{channel} *"):
|
||||||
|
_, channel, user, token = result.decode("utf8").split()
|
||||||
|
if inChannel or token == "public":
|
||||||
|
results.append({
|
||||||
|
"user": user,
|
||||||
|
"token": token
|
||||||
|
})
|
||||||
|
return JSONResponse(status_code=200, content=results)
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/mediamtx/auth", include_in_schema=False)
|
||||||
|
async def mediamtxAuth(request: Request):
|
||||||
|
if request.client.host != "127.0.0.1":
|
||||||
|
return False
|
||||||
|
body = await request.json()
|
||||||
|
jwt = decodeJWT(body["query"][4:])
|
||||||
|
path = body["path"].split("/")
|
||||||
|
reading = body["action"] == "read"
|
||||||
|
|
||||||
|
channel, user, token = pathParts(path)
|
||||||
|
|
||||||
|
if user is None:
|
||||||
|
return JSONResponse(status_code=400, content={"error": "bad path"})
|
||||||
|
if " " in token:
|
||||||
|
return JSONResponse(status_code=400, content={"error": "bad token"})
|
||||||
|
|
||||||
|
# the only time we don't care about JWT is if someone is watching a public stream
|
||||||
|
if reading and token == "public":
|
||||||
|
return JSONResponse(status_code=200, content={"success": True})
|
||||||
|
|
||||||
|
if len(jwt.keys()) == 0:
|
||||||
|
return JSONResponse(status_code=403, content={"error": "bad jwt"})
|
||||||
|
|
||||||
|
# TODO: channel stream permissions
|
||||||
|
# publishing
|
||||||
|
if not reading:
|
||||||
|
if user != jwt["account"]:
|
||||||
|
return JSONResponse(status_code=403, content={"error": "nuh uh"})
|
||||||
|
if channel and jwt["channel"] != channel:
|
||||||
|
return JSONResponse(status_code=403, content={"error": "nuh uh"})
|
||||||
|
|
||||||
|
return JSONResponse(status_code=200, content={"success": True})
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/mediamtx/add", include_in_schema=False)
|
||||||
|
async def mediamtxAdd(request: Request):
|
||||||
|
if request.client.host != "127.0.0.1":
|
||||||
|
return False
|
||||||
|
body = await request.json()
|
||||||
|
path = body["env"]["MTX_PATH"].split("/")
|
||||||
|
parts = [x for x in pathParts(path) if x]
|
||||||
|
redis.set("stream " + " ".join(parts), parts[2])
|
||||||
|
if len(parts) == 3:
|
||||||
|
await ergo.broadcastTo(parts[0], "STREAMSTART", parts[0], parts[1], parts[2])
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/mediamtx/del", include_in_schema=False)
|
||||||
|
async def mediamtxDelete(request: Request):
|
||||||
|
if request.client.host != "127.0.0.1":
|
||||||
|
return False
|
||||||
|
body = await request.json()
|
||||||
|
path = body["env"]["MTX_PATH"].split("/")
|
||||||
|
parts = [x for x in pathParts(path) if x]
|
||||||
|
redis.delete("stream " + " ".join(parts))
|
||||||
|
if len(parts) == 3:
|
||||||
|
await ergo.broadcastTo(parts[0], "STREAMEND", parts[0], parts[1], parts[2])
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -4,7 +4,7 @@ import time
|
||||||
from . import router
|
from . import router
|
||||||
from fastapi import UploadFile, Request, Depends
|
from fastapi import UploadFile, Request, Depends
|
||||||
|
|
||||||
from ..util import minioClient
|
from ..util import minioClient, ergo
|
||||||
from ..auth import JWTBearer
|
from ..auth import JWTBearer
|
||||||
import config
|
import config
|
||||||
|
|
||||||
|
|
@ -45,10 +45,11 @@ async def pfpUpload(file: UploadFile, request: Request):
|
||||||
|
|
||||||
mime = mimetypes.guess_type(file.filename)
|
mime = mimetypes.guess_type(file.filename)
|
||||||
minioClient.put_object("pfp", username, file.file, file.size, content_type=mime[0])
|
minioClient.put_object("pfp", username, file.file, file.size, content_type=mime[0])
|
||||||
return {"url": f"https://{config.MINIO_ADDR}/pfp/{username}?{time.time():.0f}"}
|
await ergo.broadcastAs(username, "CACHEBUST")
|
||||||
|
return {"url": f"https://{config.MINIO_EXTERNAL_ADDR}/pfp/{username}?{time.time():.0f}"}
|
||||||
|
|
||||||
@router.post("/pfp/uploadIcon", dependencies=[Depends(JWTBearer())])
|
@router.post("/pfp/uploadIcon", dependencies=[Depends(JWTBearer())])
|
||||||
async def pfpUpload(file: UploadFile, request: Request):
|
async def IconUpload(file: UploadFile, request: Request):
|
||||||
if file.size > config.MAX_PFP_SIZE:
|
if file.size > config.MAX_PFP_SIZE:
|
||||||
return {"error": "file too big"}
|
return {"error": "file too big"}
|
||||||
whoami = request.state.jwt
|
whoami = request.state.jwt
|
||||||
|
|
@ -62,5 +63,6 @@ async def pfpUpload(file: UploadFile, request: Request):
|
||||||
file.file.seek(0)
|
file.file.seek(0)
|
||||||
|
|
||||||
mime = mimetypes.guess_type(file.filename)
|
mime = mimetypes.guess_type(file.filename)
|
||||||
minioClient.put_object("pfp", username+"/icon", file.file, file.size, content_type=mime[0])
|
minioClient.put_object("pfp", username+".icon", file.file, file.size, content_type=mime[0])
|
||||||
return {"url": f"https://{config.MINIO_ADDR}/pfp/{username}/icon?{time.time():.0f}"}
|
await ergo.broadcastAs(username, "CACHEBUST")
|
||||||
|
return {"url": f"https://{config.MINIO_EXTERNAL_ADDR}/pfp/{username}.icon?{time.time():.0f}"}
|
||||||
|
|
@ -7,7 +7,9 @@ alembic = configparser.ConfigParser()
|
||||||
alembic.read("alembic.ini")
|
alembic.read("alembic.ini")
|
||||||
|
|
||||||
engine = create_engine(
|
engine = create_engine(
|
||||||
alembic.get("alembic", "sqlalchemy.url")
|
alembic.get("alembic", "sqlalchemy.url"),
|
||||||
|
pool_pre_ping=True,
|
||||||
|
pool_recycle=3600
|
||||||
)
|
)
|
||||||
|
|
||||||
SessionMaker = sessionmaker(autocommit=False, autoflush=False, bind=engine)
|
SessionMaker = sessionmaker(autocommit=False, autoflush=False, bind=engine)
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,9 @@
|
||||||
|
import asyncio
|
||||||
import hashlib
|
import hashlib
|
||||||
import re
|
import re
|
||||||
import config
|
import config
|
||||||
from minio import Minio
|
from minio import Minio
|
||||||
|
from redis import Redis
|
||||||
|
|
||||||
from fastapi import UploadFile
|
from fastapi import UploadFile
|
||||||
|
|
||||||
|
|
@ -18,7 +19,51 @@ async def SHA256(f: UploadFile) -> str:
|
||||||
return sha.hexdigest()
|
return sha.hexdigest()
|
||||||
|
|
||||||
minioClient = Minio(
|
minioClient = Minio(
|
||||||
config.MINIO_ADDR,
|
config.MINIO_INTERNAL_ADDR,
|
||||||
|
secure=False, # you will probably not have SSL
|
||||||
access_key=config.MINIO_ACCESS_KEY,
|
access_key=config.MINIO_ACCESS_KEY,
|
||||||
secret_key=config.MINIO_SECRET_KEY,
|
secret_key=config.MINIO_SECRET_KEY,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
redis = Redis(host='localhost', port=6379, db=0, protocol=3)
|
||||||
|
|
||||||
|
class ErgoClient:
|
||||||
|
def __init__(self):
|
||||||
|
self.reader = None
|
||||||
|
self.writer = None
|
||||||
|
asyncio.get_running_loop().create_task(self.init())
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def retry(f):
|
||||||
|
async def wrapper(self, *args, **kwargs):
|
||||||
|
i = 30
|
||||||
|
while i:
|
||||||
|
try:
|
||||||
|
return await f(self, *args, **kwargs)
|
||||||
|
except RuntimeError:
|
||||||
|
self.init()
|
||||||
|
i -= 1
|
||||||
|
print("Couldn't connect")
|
||||||
|
return wrapper
|
||||||
|
|
||||||
|
@retry
|
||||||
|
async def init(self):
|
||||||
|
self.reader, self.writer = await asyncio.open_connection(config.ERGO_ADDR, config.ERGO_PORT)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@retry
|
||||||
|
async def write(self, msg):
|
||||||
|
|
||||||
|
self.writer.write(msg+b"\n")
|
||||||
|
await self.writer.drain()
|
||||||
|
|
||||||
|
async def broadcastAs(self, user, *message):
|
||||||
|
await self.write(f"BROADCASTAS {user} {' '.join(message)}".encode("utf8"))
|
||||||
|
|
||||||
|
|
||||||
|
async def broadcastTo(self, user, *message):
|
||||||
|
await self.write(f"BROADCASTTO {user} {' '.join(message)}".encode("utf8"))
|
||||||
|
|
||||||
|
|
||||||
|
ergo = ErgoClient()
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,12 @@
|
||||||
import os
|
import os
|
||||||
SECRETKEY = os.path.join("secrets", "pubkey.pem")
|
SECRETKEY = os.path.join("secrets", "pubkey.pem")
|
||||||
|
|
||||||
MINIO_ADDR = "data.example.xyz"
|
# CEF-specific port (grumble communicates over it as well)
|
||||||
|
ERGO_ADDR = "127.0.0.1"
|
||||||
|
ERGO_PORT = 22843
|
||||||
|
|
||||||
|
MINIO_INTERNAL_ADDR = "http://127.0.0.1:9000" # dodges nginx
|
||||||
|
MINIO_EXTERNAL_ADDR = "data.example.xyz"
|
||||||
MINIO_ACCESS_KEY = "access-key-goes-here"
|
MINIO_ACCESS_KEY = "access-key-goes-here"
|
||||||
MINIO_SECRET_KEY = "secret-key-goes-here"
|
MINIO_SECRET_KEY = "secret-key-goes-here"
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue