import asyncio import hashlib import json import re import socket import traceback import MySQLdb import config from minio import Minio from redis.asyncio import Redis from pywebpush import WebPusher from .sql import SessionMaker, AlertEndpoints, ergoQueryFetchOne from fastapi import UploadFile safeName = re.compile(r"[^\w\d\.-]") # If this gets too out of hand, put an async breakpoint to allow other things to be handled while the hash occurs async def SHA256(f: UploadFile) -> str: sha = hashlib.sha256() while data := await f.read(65535): sha.update(data) await f.seek(0) return sha.hexdigest() minioClient = Minio( config.MINIO_INTERNAL_ADDR, secure=False, # you will probably not have SSL access_key=config.MINIO_ACCESS_KEY, secret_key=config.MINIO_SECRET_KEY, ) redis = Redis(host=config.REDIS_ADDR, port=6379, db=0, protocol=3) class ErgoClient: async def init(self): asyncio.get_running_loop().create_task(self.userSub()) async def handleMention(self, username: str, channel: str, msgid: str): session = SessionMaker() for target in session.query(AlertEndpoints).filter(AlertEndpoints.username == username): pusher = WebPusher({ "endpoint": target.url, "keys": { "auth": target.auth, "p256dh": target.p256dh } }) messageQuery = ergoQueryFetchOne("SELECT `data` FROM `history` WHERE `msgid` = :id", id=int(msgid)).data message = json.loads(messageQuery.decode("utf8")) encoded = json.dumps({ "channel": channel, "from": message["AccountName"], "content": message["Message"]["Message"] }).encode("utf8") await pusher.send_async(encoded) session.close() async def handleUserMessage(self, message): if message: print(f"({message['channel']}) Message Received: {message['data']}") username = message["channel"].split(b".", 1)[1].decode("utf8") line = message["data"].decode("utf8").split() if line[0] == "MENTION": await self.handleMention(username, line[1], line[2]) async def userSub(self): async with redis.pubsub() as pubsub: await pubsub.psubscribe("user.*") while True: message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=None) try: await self.handleUserMessage(message) except Exception as e: print("Error in handling user message", e) async def broadcastAs(self, user, *message): await redis.publish(f"user.{user}", f"BROADCASTAS {' '.join(message)}") async def fullyRemoveUser(self, user): await redis.publish(f"user.{user}", f"FULLYREMOVE") async def broadcastTo(self, channel, *message): await redis.publish(f"channel.{channel}", f"BROADCASTTO {' '.join(message)}") ergo = ErgoClient() privilegedIps = set() for host in config.PRIVILEGED_HOSTS: for addr in [x[-1][0] for x in socket.getaddrinfo(host, 0)]: privilegedIps.add(addr)