3M/cef_3M/util.py
2025-03-02 23:57:28 +00:00

107 lines
3.2 KiB
Python

import asyncio
import hashlib
import json
import re
import socket
import traceback
import MySQLdb
import config
from minio import Minio
from redis.asyncio import Redis
from pywebpush import WebPusher
from .sql import SessionMaker, AlertEndpoints
from fastapi import UploadFile
from .sql_generated import History
safeName = re.compile(r"[^\w\d\.-]")
# If this gets too out of hand, put an async breakpoint to allow other things to be handled while the hash occurs
async def SHA256(f: UploadFile) -> str:
sha = hashlib.sha256()
while data := await f.read(65535):
sha.update(data)
await f.seek(0)
return sha.hexdigest()
minioClient = Minio(
config.MINIO_INTERNAL_ADDR,
secure=False, # you will probably not have SSL
access_key=config.MINIO_ACCESS_KEY,
secret_key=config.MINIO_SECRET_KEY,
)
redis = Redis(host=config.REDIS_ADDR, port=6379, db=0, protocol=3)
class ErgoClient:
async def init(self):
asyncio.get_running_loop().create_task(self.userSub())
async def handleMention(self, username: str, channel: str, msgid: str):
session = SessionMaker()
for target in session.query(AlertEndpoints).filter(AlertEndpoints.username == username):
pusher = WebPusher({
"endpoint": target.url,
"keys": {
"auth": target.auth,
"p256dh": target.p256dh
}
})
messageQuery: History = session.query(History).filter(History.msgid == int(msgid)).first()
message = json.loads(messageQuery.data.decode("utf8"))
encoded = json.dumps({
"channel": channel,
"from": message["Account"],
"content": message["Message"]["Message"]
}).encode("utf8")
await pusher.send_async(encoded)
session.close()
async def handleUserMessage(self, message):
if message:
print(f"({message['channel']}) Message Received: {message['data']}")
username = message["channel"].split(b".", 1)[1].decode("utf8")
line = message["data"].decode("utf8").split()
if line[0] == "MENTION":
await self.handleMention(username, line[1], line[2])
async def userSub(self):
async with redis.pubsub() as pubsub:
await pubsub.psubscribe("user.*")
while True:
message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=None)
try:
await self.handleUserMessage(message)
except:
print("Error in handling user message")
traceback.print_exc()
async def broadcastAs(self, user, *message):
await redis.publish(f"user.{user}", f"BROADCASTAS {' '.join(message)}")
async def fullyRemoveUser(self, user):
await redis.publish(f"user.{user}", f"FULLYREMOVE")
async def broadcastTo(self, channel, *message):
await redis.publish(f"channel.{channel}", f"BROADCASTTO {' '.join(message)}")
ergo = ErgoClient()
privilegedIps = set()
for host in config.PRIVILEGED_HOSTS:
for addr in [x[-1][0] for x in socket.getaddrinfo(host, 0)]:
privilegedIps.add(addr)