3M/cef_3M/util.py
2024-07-17 14:34:01 +00:00

107 lines
3.1 KiB
Python

import asyncio
import hashlib
import json
import re
import MySQLdb
import config
from minio import Minio
from redis.asyncio import Redis
from pywebpush import WebPusher
from .sql import SessionMaker, AlertEndpoints, ergoQueryFetchOne
from fastapi import UploadFile
safeName = re.compile(r"[^\w\d\.-]")
# If this gets too out of hand, put an async breakpoint to allow other things to be handled while the hash occurs
async def SHA256(f: UploadFile) -> str:
sha = hashlib.sha256()
while data := await f.read(65535):
sha.update(data)
await f.seek(0)
return sha.hexdigest()
minioClient = Minio(
config.MINIO_INTERNAL_ADDR,
secure=False, # you will probably not have SSL
access_key=config.MINIO_ACCESS_KEY,
secret_key=config.MINIO_SECRET_KEY,
)
redis = Redis(host='localhost', port=6379, db=0, protocol=3)
class ErgoClient:
def __init__(self):
self.reader = None
self.writer = None
asyncio.get_running_loop().create_task(self.init())
@staticmethod
def retry(f):
async def wrapper(self, *args, **kwargs):
i = 30
while i:
try:
return await f(self, *args, **kwargs)
except RuntimeError:
self.init()
i -= 1
print("Couldn't connect")
return wrapper
@retry
async def init(self):
self.reader, self.writer = await asyncio.open_connection(config.ERGO_ADDR, config.ERGO_PORT)
await asyncio.get_running_loop().create_task(self.readEvents())
@retry
async def readEvents(self):
while 1:
rawLine = await self.reader.readline()
if not rawLine: break
line = rawLine.decode("utf8").strip().split()
if line[0] == "MENTION":
await self.handleMention(line[1], line[2], line[3])
async def handleMention(self, username: str, channel: str, msgid: str):
session = SessionMaker()
for target in session.query(AlertEndpoints).filter(AlertEndpoints.username == username):
pusher = WebPusher({
"endpoint": target.url,
"keys": {
"auth": target.auth,
"p256dh": target.p256dh
}
})
messageQuery = ergoQueryFetchOne("SELECT `data` FROM `history` WHERE `msgid` = :id", id=int(msgid)).data
message = json.loads(messageQuery.decode("utf8"))
encoded = json.dumps({
"channel": channel,
"from": message["AccountName"],
"content": message["Message"]["Message"]
}).encode("utf8")
await pusher.send_async(encoded)
session.close()
@retry
async def write(self, msg):
self.writer.write(msg+b"\n")
await self.writer.drain()
async def broadcastAs(self, user, *message):
await self.write(f"BROADCASTAS {user} {' '.join(message)}".encode("utf8"))
async def broadcastTo(self, user, *message):
await self.write(f"BROADCASTTO {user} {' '.join(message)}".encode("utf8"))
ergo = ErgoClient()