1
0
Fork 0
forked from External/ergo

switch to redis pubsub for ipc

adjust commands to utilize channel names
add new config variables
fix mention race condition
This commit is contained in:
CEF Server 2024-08-27 14:49:05 +00:00
parent 64ebb1f480
commit f4c03b6765
12 changed files with 220 additions and 195 deletions

View file

@ -1,134 +1,191 @@
package irc
import (
"bufio"
"context"
"crypto/sha256"
"encoding/json"
"fmt"
"github.com/cshum/imagor/imagorpath"
"github.com/ergochat/ergo/irc/caps"
"io"
"net"
"os"
"github.com/ergochat/irc-go/ircmsg"
"net/http"
"regexp"
"strings"
"time"
)
const LISTEN = "0.0.0.0:22843"
var ctx = context.Background()
type CefConnection struct {
connections map[string]net.Conn
server *Server
}
func (instance *CefConnection) CEFMessage(action string, extra ...string) {
if instance == nil {
func (channel *Channel) RedisBroadcast(message ...string) {
if channel.server.redis == nil {
return
}
str := fmt.Sprintf("%s %s\n", action, strings.Join(extra, " "))
for _, conn := range instance.connections {
conn.Write([]byte(str))
}
}
func (instance *CefConnection) KickBroadcast(channel string, user string) {
instance.CEFMessage("KICK", channel, user)
}
func cefConnection(server *Server) *CefConnection {
// create a tcp listener on the given port
listener, err := net.Listen("tcp", LISTEN)
err := channel.server.redis.Publish(ctx, "channel."+channel.NameCasefolded(), strings.Join(message, " ")).Err()
if err != nil {
fmt.Println("Unable to open CefConnection listener:", err)
os.Exit(1)
fmt.Printf("Channel broadcast error: %v", err)
}
fmt.Printf("CefConnection listener on %s active\n", LISTEN)
instance := CefConnection{connections: make(map[string]net.Conn), server: server}
go cefListener(listener, &instance)
return &instance
}
func cefListener(listener net.Listener, instance *CefConnection) {
// listen for new connections
for {
conn, err := listener.Accept()
if err != nil {
fmt.Println("failed to accept cef connection, err:", err)
func (client *Client) RedisBroadcast(message ...string) {
err := client.server.redis.Publish(ctx, "user."+client.NickCasefolded(), strings.Join(message, " ")).Err()
if err != nil {
fmt.Printf("User broadcast error: %v", err)
}
}
func (channel *Channel) Broadcast(command string, params ...string) {
for _, member := range channel.Members() {
for _, session := range member.Sessions() {
session.Send(nil, member.server.name, command, params...)
}
}
}
// ChannelSub Actions and info centering around channels
func (server *Server) ChannelSub() {
pubsub := server.redis.PSubscribe(ctx, "channel.*")
defer pubsub.Close()
ch := pubsub.Channel()
for msg := range ch {
server.logger.Info("RedisMessage", msg.Channel, msg.Payload)
line := strings.Split(msg.Payload, " ")
channelName := strings.SplitN(msg.Channel, ".", 2)[1]
channel := server.channels.Get(channelName)
if len(line) == 0 {
println("Empty string dumped into ", msg.Channel, " channel")
}
if channel == nil {
server.logger.Warning("RedisMessage", "Unknown channel")
continue
}
instance.connections[conn.RemoteAddr().String()] = conn
// pass an accepted connection to a handler goroutine
go handleConnection(conn, instance)
switch line[0] {
case "VOICEPART":
channel.Broadcast("VOICEPART", channelName, line[1])
case "VOICESTATE":
channel.Broadcast("VOICESTATE", channelName, line[1])
case "BROADCASTTO":
for _, person := range channel.Members() {
person.Send(nil, server.name, line[1], line[2:]...)
}
}
}
}
func handleConnection(conn net.Conn, instance *CefConnection) {
defer delete(instance.connections, conn.RemoteAddr().String())
reader := bufio.NewReader(conn)
println("Connection with CEF service established")
for {
// read client request data
bytes, err := reader.ReadBytes(byte('\n'))
if err != nil {
if err != io.EOF {
fmt.Println("failed to read data, err:", err)
}
return
// UserSub Handles things pertaining to users
func (server *Server) UserSub() {
pubsub := server.redis.PSubscribe(ctx, "user.*")
defer pubsub.Close()
ch := pubsub.Channel()
for msg := range ch {
server.logger.Info("RedisMessage", msg.Channel, msg.Payload)
line := strings.Split(msg.Payload, " ")
userName := strings.SplitN(msg.Channel, ".", 2)[1]
user := server.clients.Get(userName)
if len(line) == 0 {
println("Empty string dumped into ", msg.Channel, " channel")
}
convertedLine := string(bytes[:len(bytes)-1])
line := strings.Split(strings.Trim(convertedLine, " \n"), " ")
fmt.Printf("cef: %+q\n", line)
switch line[0] {
case "PART":
if len(line) == 1 || len(line[1]) == 0 {
println("skipping malformed line")
continue
}
channel := instance.server.channels.Get(line[1])
for _, member := range channel.Members() {
for _, session := range member.Sessions() {
session.Send(nil, member.server.name, "VOICEPART", line[1], line[2])
}
}
break
case "VOICESTATE":
channel := instance.server.channels.Get(line[1])
for _, member := range channel.Members() {
for _, session := range member.Sessions() {
session.Send(nil, member.server.name, "VOICESTATE", line[1], line[2], line[3], line[4])
}
}
break
case "BROADCASTAS":
// TODO: global broadcast
user := instance.server.clients.Get(line[1])
if user != nil {
// I'm not too sure what the capability bit is, I think it's just ones that match
for friend := range user.Friends(caps.ExtendedNames) {
friend.Send(nil, user.NickMaskString(), line[2], line[3:]...)
}
}
break
case "BROADCASTTO":
channel := instance.server.channels.Get(line[1])
if channel != nil {
// I'm not too sure what the capability bit is, I think it's just ones that match
for _, person := range channel.Members() {
person.Send(nil, instance.server.name, line[2], line[3:]...)
}
}
break
case "FULLYREMOVE":
user := instance.server.clients.Get(line[1])
if user != nil {
user.destroy(nil)
err := instance.server.accounts.Unregister(user.Account(), true)
err := server.accounts.Unregister(user.Account(), true)
if err != nil {
return
}
}
break
default:
println("Unknown cef message: ", line[0])
case "BROADCASTAS":
if user != nil {
// I'm not too sure what the capability bit is, I think it's just ones that match
for friend := range user.Friends(caps.ExtendedNames) {
friend.Send(nil, user.NickMaskString(), line[1], line[2:]...)
}
}
break
}
}
}
func startRedis(server *Server) {
go server.ChannelSub()
go server.UserSub()
}
func (server *Server) GenerateImagorSignaturesFromMessage(message *ircmsg.Message) string {
line, err := message.Line()
if err == nil {
return server.GenerateImagorSignatures(line)
}
return ""
}
func (server *Server) GetUrlMime(url string) string {
config := server.Config()
// hacky, should fix
if !strings.Contains(url, "?") {
url += "?"
}
params := imagorpath.Params{
Image: url,
Meta: true,
}
metaPath := imagorpath.Generate(params, imagorpath.NewHMACSigner(sha256.New, 0, config.Cef.Imagor.Secret))
client := http.Client{
Timeout: 5 * time.Second,
}
resp, err := client.Get(config.Cef.Imagor.Url + metaPath)
if err != nil {
println("Failed on the initial get")
println(err.Error())
return ""
}
defer resp.Body.Close()
var meta map[string]interface{}
err = json.NewDecoder(resp.Body).Decode(&meta)
if err != nil {
println("Failed on the JSON decode")
return ""
}
contentType, valid := meta["format"].(string)
fmt.Printf("%+v\n", meta)
if !valid {
println("No content type")
return ""
}
return contentType
}
var urlRegex = regexp.MustCompile("https?:\\/\\/[\\w-]+(\\.[\\w-]+)+([\\w.,@?^=%&:/~+#-]*[\\w@?^=%&/~+#-])?")
// Process a message to add Imagor signatures
func (server *Server) GenerateImagorSignatures(str string) string {
urls := urlRegex.FindAllString(str, -1)
var sigs []string
for _, url := range urls {
params := imagorpath.Params{
Image: url,
FitIn: true,
Width: 600,
Height: 600,
}
path := imagorpath.Generate(params, imagorpath.NewHMACSigner(sha256.New, 0, server.Config().Cef.Imagor.Secret))
signature := path[:strings.IndexByte(path, '/')]
contentType := server.GetUrlMime(url)
if contentType != "" {
sigs = append(sigs, signature+"|"+strings.ReplaceAll(contentType, "/", "_"))
} else {
sigs = append(sigs, signature)
}
}
if len(sigs) > 0 {
return strings.Join(sigs, ",")
}
return ""
}