add handling for public streams
new env vars for mediamtx switch to redis for ipc
This commit is contained in:
parent
215ffced7c
commit
6233c96e6a
5 changed files with 120 additions and 31 deletions
|
|
@ -1,6 +1,6 @@
|
||||||
FROM docker.io/python:3.12-alpine
|
FROM docker.io/python:3.12-alpine
|
||||||
WORKDIR /3m
|
WORKDIR /3m
|
||||||
RUN apk update && apk add git openssl mariadb-connector-c-dev build-base linux-headers
|
RUN apk update && apk add git openssl mariadb-connector-c-dev build-base linux-headers ffmpeg
|
||||||
|
|
||||||
COPY ./requirements.txt /app/requirements.txt
|
COPY ./requirements.txt /app/requirements.txt
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,3 +1,4 @@
|
||||||
|
import asyncio
|
||||||
import datetime
|
import datetime
|
||||||
|
|
||||||
from fastapi import FastAPI
|
from fastapi import FastAPI
|
||||||
|
|
@ -11,6 +12,7 @@ from .util import minioClient, ergo
|
||||||
from fastapi_utils.tasks import repeat_every
|
from fastapi_utils.tasks import repeat_every
|
||||||
from sqlalchemy import and_
|
from sqlalchemy import and_
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
|
from .endpoints.mediamtx import mediamtxPoll
|
||||||
|
|
||||||
|
|
||||||
@repeat_every(seconds=60 * 60)
|
@repeat_every(seconds=60 * 60)
|
||||||
|
|
@ -37,6 +39,9 @@ async def cleanup():
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def onStart(app: FastAPI):
|
async def onStart(app: FastAPI):
|
||||||
await cleanup()
|
await cleanup()
|
||||||
|
await util.ergo.init()
|
||||||
|
asyncio.get_running_loop().create_task(mediamtxPoll())
|
||||||
|
print("Initialized")
|
||||||
yield
|
yield
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,14 @@
|
||||||
from starlette.responses import JSONResponse
|
import asyncio
|
||||||
|
import subprocess
|
||||||
|
import time
|
||||||
|
from enum import Enum
|
||||||
|
|
||||||
|
import aiohttp
|
||||||
|
import ffmpeg
|
||||||
|
import select
|
||||||
|
from starlette.responses import JSONResponse, Response, FileResponse
|
||||||
|
|
||||||
|
from config import MEDIAMTX_API, MEDIAMTX_RTSP
|
||||||
from . import router
|
from . import router
|
||||||
from fastapi import Request, Depends
|
from fastapi import Request, Depends
|
||||||
|
|
||||||
|
|
@ -18,8 +27,11 @@ def pathParts(path):
|
||||||
return None, None, None
|
return None, None, None
|
||||||
return channel, user, token
|
return channel, user, token
|
||||||
|
|
||||||
|
class TargetType(str, Enum):
|
||||||
|
user = "u"
|
||||||
|
channel = "c"
|
||||||
|
|
||||||
@router.get("/mediamtx/streams/{channel}", dependencies=[Depends(JWTBearer())])
|
@router.get("/mediamtx/streams/{channel}", dependencies=[Depends(JWTBearer(True))])
|
||||||
async def mediamtxChannelStreams(request: Request, channel: str):
|
async def mediamtxChannelStreams(request: Request, channel: str):
|
||||||
inChannel = request.state.jwt.get("channel", "").lower() == "#" + channel.lower()
|
inChannel = request.state.jwt.get("channel", "").lower() == "#" + channel.lower()
|
||||||
results = []
|
results = []
|
||||||
|
|
@ -32,12 +44,70 @@ async def mediamtxChannelStreams(request: Request, channel: str):
|
||||||
})
|
})
|
||||||
return JSONResponse(status_code=200, content=results)
|
return JSONResponse(status_code=200, content=results)
|
||||||
|
|
||||||
|
@router.get("/mediamtx/thumbnail/{user}/{key}")
|
||||||
|
@router.get("/mediamtx/thumbnail/{channel}/{user}/{key}")
|
||||||
|
async def mediamtxThumbnail(user: str, key: str, channel: str = None):
|
||||||
|
blob = "".join([x for x in (channel, user, key) if x])
|
||||||
|
|
||||||
|
if "?" in blob:
|
||||||
|
return Response(status_code=404)
|
||||||
|
print(f"rtsp://{MEDIAMTX_RTSP}/" + "/".join([x for x in (channel, user, key) if x]))
|
||||||
|
proc: subprocess.Popen = ffmpeg.input(f"rtsp://{MEDIAMTX_RTSP}/" + "/".join([x for x in (channel, user, key) if x]),
|
||||||
|
).output('pipe:', loglevel="quiet", vframes=1, format='image2', vcodec='mjpeg').run_async(pipe_stdout=True)
|
||||||
|
timeout = time.time() + 8
|
||||||
|
data = b""
|
||||||
|
|
||||||
|
while 1:
|
||||||
|
if select.select([proc.stdout], [], [], 0)[0]:
|
||||||
|
data += proc.stdout.read(65535)
|
||||||
|
if proc.poll() is not None:
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
if timeout < time.time():
|
||||||
|
proc.kill()
|
||||||
|
return Response(status_code=503)
|
||||||
|
await asyncio.sleep(0.1)
|
||||||
|
|
||||||
|
return Response(content=data, media_type="image/jpeg", headers={
|
||||||
|
"Cache-Control": "public, max-age=60, stale-while-revalidate=300"
|
||||||
|
})
|
||||||
|
|
||||||
|
@router.get("/mediamtx/public", dependencies=[])
|
||||||
|
async def mediamtxAllPublicStreams():
|
||||||
|
results = []
|
||||||
|
async for result in redis.scan_iter(f"stream *"):
|
||||||
|
_, channel, user, token = result.decode("utf8").split()
|
||||||
|
if token == "public":
|
||||||
|
results.append({
|
||||||
|
"target": channel,
|
||||||
|
"user": user,
|
||||||
|
"token": token
|
||||||
|
})
|
||||||
|
return JSONResponse(status_code=200, content=results)
|
||||||
|
|
||||||
|
@router.get("/mediamtx/public/{targetType}/{target}", dependencies=[])
|
||||||
|
async def mediamtxEntityPublicStreams(targetType: TargetType, target: str):
|
||||||
|
if targetType == TargetType.channel:
|
||||||
|
target = "#" + target
|
||||||
|
results = []
|
||||||
|
async for result in redis.scan_iter(f"stream {target} *"):
|
||||||
|
_, channel, user, token = result.decode("utf8").split()
|
||||||
|
print(result)
|
||||||
|
if token == "public":
|
||||||
|
results.append({
|
||||||
|
"user": user,
|
||||||
|
"token": token
|
||||||
|
})
|
||||||
|
return JSONResponse(status_code=200, content=results)
|
||||||
|
|
||||||
@router.post("/mediamtx/auth", include_in_schema=False)
|
@router.post("/mediamtx/auth", include_in_schema=False)
|
||||||
async def mediamtxAuth(request: Request):
|
async def mediamtxAuth(request: Request):
|
||||||
if request.client.host not in privilegedIps:
|
if request.client.host not in privilegedIps:
|
||||||
return False
|
return False
|
||||||
body = await request.json()
|
body = await request.json()
|
||||||
|
# Just never expose RTSP :)
|
||||||
|
if body["protocol"] == "rtsp":
|
||||||
|
return JSONResponse(status_code=200, content={"success": True})
|
||||||
jwt = decodeJWT(body["query"][4:])
|
jwt = decodeJWT(body["query"][4:])
|
||||||
path = body["path"].split("/")
|
path = body["path"].split("/")
|
||||||
reading = body["action"] == "read"
|
reading = body["action"] == "read"
|
||||||
|
|
@ -74,7 +144,7 @@ async def mediamtxAdd(request: Request):
|
||||||
body = await request.json()
|
body = await request.json()
|
||||||
path = body["env"]["MTX_PATH"].split("/")
|
path = body["env"]["MTX_PATH"].split("/")
|
||||||
parts = [x for x in pathParts(path) if x]
|
parts = [x for x in pathParts(path) if x]
|
||||||
await redis.set("stream " + " ".join(parts), parts[2])
|
await redis.set("stream " + " ".join(parts), parts[2], ex=60)
|
||||||
if len(parts) == 3:
|
if len(parts) == 3:
|
||||||
await ergo.broadcastTo(parts[0], "STREAMSTART", parts[0], parts[1], parts[2])
|
await ergo.broadcastTo(parts[0], "STREAMSTART", parts[0], parts[1], parts[2])
|
||||||
|
|
||||||
|
|
@ -89,3 +159,16 @@ async def mediamtxDelete(request: Request):
|
||||||
await redis.delete("stream " + " ".join(parts))
|
await redis.delete("stream " + " ".join(parts))
|
||||||
if len(parts) == 3:
|
if len(parts) == 3:
|
||||||
await ergo.broadcastTo(parts[0], "STREAMEND", parts[0], parts[1], parts[2])
|
await ergo.broadcastTo(parts[0], "STREAMEND", parts[0], parts[1], parts[2])
|
||||||
|
|
||||||
|
# Not an endpoint
|
||||||
|
async def mediamtxPoll():
|
||||||
|
while 1:
|
||||||
|
async with aiohttp.ClientSession() as sess:
|
||||||
|
req = await sess.get(MEDIAMTX_API +"/v3/paths/list?itemsPerPage=9999999")
|
||||||
|
streams = await req.json()
|
||||||
|
for stream in streams["items"]:
|
||||||
|
if stream["ready"]:
|
||||||
|
path = stream["name"].split("/")
|
||||||
|
parts = [x for x in pathParts(path) if x]
|
||||||
|
await redis.set(f"stream " + " ".join(parts), parts[-1], ex=60)
|
||||||
|
await asyncio.sleep(30)
|
||||||
|
|
|
||||||
|
|
@ -39,23 +39,9 @@ redis = Redis(host=config.REDIS_ADDR, port=6379, db=0, protocol=3)
|
||||||
|
|
||||||
|
|
||||||
class ErgoClient:
|
class ErgoClient:
|
||||||
def __init__(self):
|
|
||||||
self.reader = None
|
|
||||||
self.writer = None
|
|
||||||
asyncio.get_running_loop().create_task(self.init())
|
|
||||||
|
|
||||||
async def init(self):
|
async def init(self):
|
||||||
self.reader, self.writer = await asyncio.open_connection(config.ERGO_ADDR, config.ERGO_PORT)
|
asyncio.get_running_loop().create_task(self.userSub())
|
||||||
await asyncio.create_task(self.readEvents())
|
|
||||||
|
|
||||||
async def readEvents(self):
|
|
||||||
while 1:
|
|
||||||
rawLine = await self.reader.readline()
|
|
||||||
if not rawLine:
|
|
||||||
break
|
|
||||||
line = rawLine.decode("utf8").strip().split()
|
|
||||||
if line[0] == "MENTION":
|
|
||||||
await self.handleMention(line[1], line[2], line[3])
|
|
||||||
|
|
||||||
async def handleMention(self, username: str, channel: str, msgid: str):
|
async def handleMention(self, username: str, channel: str, msgid: str):
|
||||||
session = SessionMaker()
|
session = SessionMaker()
|
||||||
|
|
@ -78,23 +64,35 @@ class ErgoClient:
|
||||||
await pusher.send_async(encoded)
|
await pusher.send_async(encoded)
|
||||||
session.close()
|
session.close()
|
||||||
|
|
||||||
async def write(self, msg):
|
async def handleUserMessage(self, message):
|
||||||
if self.writer is None:
|
if message:
|
||||||
for _ in range(30):
|
print(f"({message['channel']}) Message Received: {message['data']}")
|
||||||
await asyncio.sleep(1)
|
username = message["channel"].split(b".", 1)[1].decode("utf8")
|
||||||
if self.writer:
|
line = message["data"].decode("utf8").split()
|
||||||
break
|
if line[0] == "MENTION":
|
||||||
self.writer.write(msg + b"\n")
|
await self.handleMention(username, line[1], line[2])
|
||||||
await self.writer.drain()
|
|
||||||
|
async def userSub(self):
|
||||||
|
async with redis.pubsub() as pubsub:
|
||||||
|
await pubsub.psubscribe("user.*")
|
||||||
|
while True:
|
||||||
|
message = await pubsub.get_message(ignore_subscribe_messages=True)
|
||||||
|
try:
|
||||||
|
await self.handleUserMessage(message)
|
||||||
|
except Exception as e:
|
||||||
|
print("Error in handling user message", e)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
async def broadcastAs(self, user, *message):
|
async def broadcastAs(self, user, *message):
|
||||||
await self.write(f"BROADCASTAS {user} {' '.join(message)}".encode("utf8"))
|
await redis.publish(f"user.{user}", f"BROADCASTAS {' '.join(message)}")
|
||||||
|
|
||||||
async def fullyRemoveUser(self, user):
|
async def fullyRemoveUser(self, user):
|
||||||
await self.write(f"FULLYREMOVE {user}".encode("utf8"))
|
await redis.publish(f"user.{user}", f"FULLYREMOVE")
|
||||||
|
|
||||||
|
async def broadcastTo(self, channel, *message):
|
||||||
|
await redis.publish(f"channel.{channel}", f"BROADCASTTO {' '.join(message)}")
|
||||||
|
|
||||||
async def broadcastTo(self, user, *message):
|
|
||||||
await self.write(f"BROADCASTTO {user} {' '.join(message)}".encode("utf8"))
|
|
||||||
|
|
||||||
|
|
||||||
ergo = ErgoClient()
|
ergo = ErgoClient()
|
||||||
|
|
|
||||||
|
|
@ -11,6 +11,9 @@ MINIO_ACCESS_KEY = os.getenv("THREEM_MINIO_ACCESS_KEY") or "access-key-goes-here
|
||||||
MINIO_SECRET_KEY = os.getenv("THREEM_MINIO_SECRET_KEY") or "secret-key-goes-here"
|
MINIO_SECRET_KEY = os.getenv("THREEM_MINIO_SECRET_KEY") or "secret-key-goes-here"
|
||||||
DBURL = os.getenv("THREEM_DBURL") or "mysql+mysqldb://ergo:password@localhost/ergo_ext"
|
DBURL = os.getenv("THREEM_DBURL") or "mysql+mysqldb://ergo:password@localhost/ergo_ext"
|
||||||
REDIS_ADDR = os.getenv("THREEM_REDIS_ADDR") or "localhost"
|
REDIS_ADDR = os.getenv("THREEM_REDIS_ADDR") or "localhost"
|
||||||
|
MEDIAMTX_API = os.getenv("THREEM_MEDIAMTX_API") or "http://localhost:9997"
|
||||||
|
MEDIAMTX_RTSP = os.getenv("THREEM_MEDIAMTX_RTSP") or "localhost:8554"
|
||||||
|
|
||||||
|
|
||||||
MAX_FILE_SIZE = 1024*1024*20
|
MAX_FILE_SIZE = 1024*1024*20
|
||||||
MAX_PFP_SIZE = 1024*1024*1.5
|
MAX_PFP_SIZE = 1024*1024*1.5
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue