diff --git a/Dockerfile b/Dockerfile index 9ebd232..0df60d7 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,6 @@ FROM docker.io/python:3.12-alpine WORKDIR /3m -RUN apk update && apk add git openssl mariadb-connector-c-dev build-base linux-headers +RUN apk update && apk add git openssl mariadb-connector-c-dev build-base linux-headers ffmpeg COPY ./requirements.txt /app/requirements.txt diff --git a/cef_3M/__init__.py b/cef_3M/__init__.py index 88af95f..10c037f 100644 --- a/cef_3M/__init__.py +++ b/cef_3M/__init__.py @@ -1,3 +1,4 @@ +import asyncio import datetime from fastapi import FastAPI @@ -11,6 +12,7 @@ from .util import minioClient, ergo from fastapi_utils.tasks import repeat_every from sqlalchemy import and_ from contextlib import asynccontextmanager +from .endpoints.mediamtx import mediamtxPoll @repeat_every(seconds=60 * 60) @@ -37,6 +39,9 @@ async def cleanup(): @asynccontextmanager async def onStart(app: FastAPI): await cleanup() + await util.ergo.init() + asyncio.get_running_loop().create_task(mediamtxPoll()) + print("Initialized") yield diff --git a/cef_3M/endpoints/mediamtx.py b/cef_3M/endpoints/mediamtx.py index e378cae..ae8aed5 100644 --- a/cef_3M/endpoints/mediamtx.py +++ b/cef_3M/endpoints/mediamtx.py @@ -1,5 +1,14 @@ -from starlette.responses import JSONResponse +import asyncio +import subprocess +import time +from enum import Enum +import aiohttp +import ffmpeg +import select +from starlette.responses import JSONResponse, Response, FileResponse + +from config import MEDIAMTX_API, MEDIAMTX_RTSP from . import router from fastapi import Request, Depends @@ -18,8 +27,11 @@ def pathParts(path): return None, None, None return channel, user, token +class TargetType(str, Enum): + user = "u" + channel = "c" -@router.get("/mediamtx/streams/{channel}", dependencies=[Depends(JWTBearer())]) +@router.get("/mediamtx/streams/{channel}", dependencies=[Depends(JWTBearer(True))]) async def mediamtxChannelStreams(request: Request, channel: str): inChannel = request.state.jwt.get("channel", "").lower() == "#" + channel.lower() results = [] @@ -32,12 +44,70 @@ async def mediamtxChannelStreams(request: Request, channel: str): }) return JSONResponse(status_code=200, content=results) +@router.get("/mediamtx/thumbnail/{user}/{key}") +@router.get("/mediamtx/thumbnail/{channel}/{user}/{key}") +async def mediamtxThumbnail(user: str, key: str, channel: str = None): + blob = "".join([x for x in (channel, user, key) if x]) + + if "?" in blob: + return Response(status_code=404) + print(f"rtsp://{MEDIAMTX_RTSP}/" + "/".join([x for x in (channel, user, key) if x])) + proc: subprocess.Popen = ffmpeg.input(f"rtsp://{MEDIAMTX_RTSP}/" + "/".join([x for x in (channel, user, key) if x]), + ).output('pipe:', loglevel="quiet", vframes=1, format='image2', vcodec='mjpeg').run_async(pipe_stdout=True) + timeout = time.time() + 8 + data = b"" + + while 1: + if select.select([proc.stdout], [], [], 0)[0]: + data += proc.stdout.read(65535) + if proc.poll() is not None: + break + else: + if timeout < time.time(): + proc.kill() + return Response(status_code=503) + await asyncio.sleep(0.1) + + return Response(content=data, media_type="image/jpeg", headers={ + "Cache-Control": "public, max-age=60, stale-while-revalidate=300" + }) + +@router.get("/mediamtx/public", dependencies=[]) +async def mediamtxAllPublicStreams(): + results = [] + async for result in redis.scan_iter(f"stream *"): + _, channel, user, token = result.decode("utf8").split() + if token == "public": + results.append({ + "target": channel, + "user": user, + "token": token + }) + return JSONResponse(status_code=200, content=results) + +@router.get("/mediamtx/public/{targetType}/{target}", dependencies=[]) +async def mediamtxEntityPublicStreams(targetType: TargetType, target: str): + if targetType == TargetType.channel: + target = "#" + target + results = [] + async for result in redis.scan_iter(f"stream {target} *"): + _, channel, user, token = result.decode("utf8").split() + print(result) + if token == "public": + results.append({ + "user": user, + "token": token + }) + return JSONResponse(status_code=200, content=results) @router.post("/mediamtx/auth", include_in_schema=False) async def mediamtxAuth(request: Request): if request.client.host not in privilegedIps: return False body = await request.json() + # Just never expose RTSP :) + if body["protocol"] == "rtsp": + return JSONResponse(status_code=200, content={"success": True}) jwt = decodeJWT(body["query"][4:]) path = body["path"].split("/") reading = body["action"] == "read" @@ -74,7 +144,7 @@ async def mediamtxAdd(request: Request): body = await request.json() path = body["env"]["MTX_PATH"].split("/") parts = [x for x in pathParts(path) if x] - await redis.set("stream " + " ".join(parts), parts[2]) + await redis.set("stream " + " ".join(parts), parts[2], ex=60) if len(parts) == 3: await ergo.broadcastTo(parts[0], "STREAMSTART", parts[0], parts[1], parts[2]) @@ -89,3 +159,16 @@ async def mediamtxDelete(request: Request): await redis.delete("stream " + " ".join(parts)) if len(parts) == 3: await ergo.broadcastTo(parts[0], "STREAMEND", parts[0], parts[1], parts[2]) + +# Not an endpoint +async def mediamtxPoll(): + while 1: + async with aiohttp.ClientSession() as sess: + req = await sess.get(MEDIAMTX_API +"/v3/paths/list?itemsPerPage=9999999") + streams = await req.json() + for stream in streams["items"]: + if stream["ready"]: + path = stream["name"].split("/") + parts = [x for x in pathParts(path) if x] + await redis.set(f"stream " + " ".join(parts), parts[-1], ex=60) + await asyncio.sleep(30) diff --git a/cef_3M/util.py b/cef_3M/util.py index 66292c1..f7d8122 100644 --- a/cef_3M/util.py +++ b/cef_3M/util.py @@ -39,23 +39,9 @@ redis = Redis(host=config.REDIS_ADDR, port=6379, db=0, protocol=3) class ErgoClient: - def __init__(self): - self.reader = None - self.writer = None - asyncio.get_running_loop().create_task(self.init()) - async def init(self): - self.reader, self.writer = await asyncio.open_connection(config.ERGO_ADDR, config.ERGO_PORT) - await asyncio.create_task(self.readEvents()) + asyncio.get_running_loop().create_task(self.userSub()) - async def readEvents(self): - while 1: - rawLine = await self.reader.readline() - if not rawLine: - break - line = rawLine.decode("utf8").strip().split() - if line[0] == "MENTION": - await self.handleMention(line[1], line[2], line[3]) async def handleMention(self, username: str, channel: str, msgid: str): session = SessionMaker() @@ -78,23 +64,35 @@ class ErgoClient: await pusher.send_async(encoded) session.close() - async def write(self, msg): - if self.writer is None: - for _ in range(30): - await asyncio.sleep(1) - if self.writer: - break - self.writer.write(msg + b"\n") - await self.writer.drain() + async def handleUserMessage(self, message): + if message: + print(f"({message['channel']}) Message Received: {message['data']}") + username = message["channel"].split(b".", 1)[1].decode("utf8") + line = message["data"].decode("utf8").split() + if line[0] == "MENTION": + await self.handleMention(username, line[1], line[2]) + + async def userSub(self): + async with redis.pubsub() as pubsub: + await pubsub.psubscribe("user.*") + while True: + message = await pubsub.get_message(ignore_subscribe_messages=True) + try: + await self.handleUserMessage(message) + except Exception as e: + print("Error in handling user message", e) + + async def broadcastAs(self, user, *message): - await self.write(f"BROADCASTAS {user} {' '.join(message)}".encode("utf8")) + await redis.publish(f"user.{user}", f"BROADCASTAS {' '.join(message)}") async def fullyRemoveUser(self, user): - await self.write(f"FULLYREMOVE {user}".encode("utf8")) + await redis.publish(f"user.{user}", f"FULLYREMOVE") + + async def broadcastTo(self, channel, *message): + await redis.publish(f"channel.{channel}", f"BROADCASTTO {' '.join(message)}") - async def broadcastTo(self, user, *message): - await self.write(f"BROADCASTTO {user} {' '.join(message)}".encode("utf8")) ergo = ErgoClient() diff --git a/config.example.py b/config.example.py index f55e741..dceb358 100644 --- a/config.example.py +++ b/config.example.py @@ -11,6 +11,9 @@ MINIO_ACCESS_KEY = os.getenv("THREEM_MINIO_ACCESS_KEY") or "access-key-goes-here MINIO_SECRET_KEY = os.getenv("THREEM_MINIO_SECRET_KEY") or "secret-key-goes-here" DBURL = os.getenv("THREEM_DBURL") or "mysql+mysqldb://ergo:password@localhost/ergo_ext" REDIS_ADDR = os.getenv("THREEM_REDIS_ADDR") or "localhost" +MEDIAMTX_API = os.getenv("THREEM_MEDIAMTX_API") or "http://localhost:9997" +MEDIAMTX_RTSP = os.getenv("THREEM_MEDIAMTX_RTSP") or "localhost:8554" + MAX_FILE_SIZE = 1024*1024*20 MAX_PFP_SIZE = 1024*1024*1.5