107 lines
3.1 KiB
Python
107 lines
3.1 KiB
Python
import asyncio
|
|
import hashlib
|
|
import json
|
|
import re
|
|
|
|
import MySQLdb
|
|
|
|
import config
|
|
from minio import Minio
|
|
from redis.asyncio import Redis
|
|
from pywebpush import WebPusher
|
|
|
|
from .sql import SessionMaker, AlertEndpoints, ergoQueryFetchOne
|
|
|
|
from fastapi import UploadFile
|
|
|
|
|
|
# Matches every character that is NOT a word character, a digit, a dot or a
# hyphen. Presumably used to strip unsafe characters from names; the call
# sites are not visible in this part of the file.
safeName = re.compile(r"[^\w\d\.-]")
|
|
|
|
# If hashing ever takes too long, add an explicit await point between chunks
# so other tasks can be handled while the hash is being computed.
async def SHA256(f: UploadFile) -> str:
    """Return the hex SHA-256 digest of an uploaded file.

    The file is read from its current position in chunks of up to 65535
    bytes until ``read`` returns an empty value, then rewound to offset 0 so
    it can be read again by the caller.

    Args:
        f: The upload to hash; must provide awaitable ``read(size)`` and
            ``seek(offset)``.

    Returns:
        The lowercase hexadecimal digest string.
    """
    hasher = hashlib.sha256()
    while True:
        chunk = await f.read(65535)
        if not chunk:
            break
        hasher.update(chunk)
    await f.seek(0)
    return hasher.hexdigest()
|
|
|
|
# Shared MinIO client, configured from the MINIO_* settings in config.
minioClient = Minio(
    config.MINIO_INTERNAL_ADDR,
    access_key=config.MINIO_ACCESS_KEY,
    secret_key=config.MINIO_SECRET_KEY,
    secure=False,  # you will probably not have SSL
)
|
|
|
|
# Shared async Redis client. Host, port, db and protocol are hard-coded here
# rather than read from config.
redis = Redis(host='localhost', port=6379, db=0, protocol=3)
|
|
|
|
class ErgoClient:
|
|
def __init__(self):
|
|
self.reader = None
|
|
self.writer = None
|
|
asyncio.get_running_loop().create_task(self.init())
|
|
|
|
@staticmethod
|
|
def retry(f):
|
|
async def wrapper(self, *args, **kwargs):
|
|
i = 30
|
|
while i:
|
|
try:
|
|
return await f(self, *args, **kwargs)
|
|
except RuntimeError:
|
|
self.init()
|
|
i -= 1
|
|
print("Couldn't connect")
|
|
return wrapper
|
|
|
|
@retry
|
|
async def init(self):
|
|
self.reader, self.writer = await asyncio.open_connection(config.ERGO_ADDR, config.ERGO_PORT)
|
|
await asyncio.get_running_loop().create_task(self.readEvents())
|
|
|
|
@retry
|
|
async def readEvents(self):
|
|
while 1:
|
|
rawLine = await self.reader.readline()
|
|
if not rawLine: break
|
|
line = rawLine.decode("utf8").strip().split()
|
|
if line[0] == "MENTION":
|
|
await self.handleMention(line[1], line[2], line[3])
|
|
|
|
|
|
async def handleMention(self, username: str, channel: str, msgid: str):
|
|
session = SessionMaker()
|
|
for target in session.query(AlertEndpoints).filter(AlertEndpoints.username == username):
|
|
pusher = WebPusher({
|
|
"endpoint": target.url,
|
|
"keys": {
|
|
"auth": target.auth,
|
|
"p256dh": target.p256dh
|
|
}
|
|
})
|
|
messageQuery = ergoQueryFetchOne("SELECT `data` FROM `history` WHERE `msgid` = :id", id=int(msgid)).data
|
|
message = json.loads(messageQuery.decode("utf8"))
|
|
encoded = json.dumps({
|
|
"channel": channel,
|
|
"from": message["AccountName"],
|
|
"content": message["Message"]["Message"]
|
|
}).encode("utf8")
|
|
|
|
await pusher.send_async(encoded)
|
|
session.close()
|
|
|
|
|
|
@retry
|
|
async def write(self, msg):
|
|
|
|
self.writer.write(msg+b"\n")
|
|
await self.writer.drain()
|
|
|
|
async def broadcastAs(self, user, *message):
|
|
await self.write(f"BROADCASTAS {user} {' '.join(message)}".encode("utf8"))
|
|
|
|
|
|
async def broadcastTo(self, user, *message):
|
|
await self.write(f"BROADCASTTO {user} {' '.join(message)}".encode("utf8"))
|
|
|
|
|
|
# Module-level ErgoClient instance. Its constructor calls
# asyncio.get_running_loop(), so this module has to be imported while an event
# loop is running.
ergo = ErgoClient()
|