From d73b6bac86bd1f2443749597a2eed5134be732a1 Mon Sep 17 00:00:00 2001 From: CEF Server Date: Thu, 28 Nov 2024 02:58:14 +0000 Subject: [PATCH] complete database reivision --- irc/caps/defs.go | 2 +- irc/channel.go | 85 +++++---- irc/client.go | 26 +-- irc/handlers.go | 26 ++- irc/history/history.go | 15 +- irc/mysql/history.go | 374 +++++++++++-------------------------- irc/mysql/serialization.go | 5 - irc/nickname.go | 12 +- irc/roleplay.go | 3 + irc/server.go | 8 +- 10 files changed, 203 insertions(+), 353 deletions(-) diff --git a/irc/caps/defs.go b/irc/caps/defs.go index 3a1e25c2..f02a5081 100644 --- a/irc/caps/defs.go +++ b/irc/caps/defs.go @@ -7,7 +7,7 @@ package caps const ( // number of recognized capabilities: - numCapabs = 35 + numCapabs = 36 // length of the uint32 array that represents the bitset: bitsetLen = 2 ) diff --git a/irc/channel.go b/irc/channel.go index 64ed95ec..07ee7be5 100644 --- a/irc/channel.go +++ b/irc/channel.go @@ -836,11 +836,12 @@ func (channel *Channel) Join(client *Client, key string, isSajoin bool, rb *Resp // no history item for fake persistent joins if rb != nil && !respectAuditorium { histItem := history.Item{ - Type: history.Join, - Nick: details.nickMask, - AccountName: details.accountName, - Message: message, - IsBot: isBot, + Type: history.Join, + Nick: details.nickMask, + Account: details.account, + Message: message, + Target: channel.NameCasefolded(), + IsBot: isBot, } histItem.Params[0] = details.realname channel.AddHistoryItem(histItem, details.account) @@ -1051,11 +1052,12 @@ func (channel *Channel) Part(client *Client, message string, rb *ResponseBuffer) if !respectAuditorium { channel.AddHistoryItem(history.Item{ - Type: history.Part, - Nick: details.nickMask, - AccountName: details.accountName, - Message: splitMessage, - IsBot: isBot, + Type: history.Part, + Nick: details.nickMask, + Account: details.account, + Message: splitMessage, + Target: channel.NameCasefolded(), + IsBot: isBot, }, details.account) } @@ -1089,12 +1091,12 @@ func (channel *Channel) replayHistoryItems(rb *ResponseBuffer, items []history.I nick := NUHToNick(item.Nick) switch item.Type { case history.Privmsg: - rb.AddSplitMessageFromClient(item.Nick, item.AccountName, item.IsBot, item.Tags, "PRIVMSG", chname, item.Message) + rb.AddSplitMessageFromClient(item.Nick, item.Account, item.IsBot, item.Tags, "PRIVMSG", chname, item.Message) case history.Notice: - rb.AddSplitMessageFromClient(item.Nick, item.AccountName, item.IsBot, item.Tags, "NOTICE", chname, item.Message) + rb.AddSplitMessageFromClient(item.Nick, item.Account, item.IsBot, item.Tags, "NOTICE", chname, item.Message) case history.Tagmsg: if eventPlayback { - rb.AddSplitMessageFromClient(item.Nick, item.AccountName, item.IsBot, item.Tags, "TAGMSG", chname, item.Message) + rb.AddSplitMessageFromClient(item.Nick, item.Account, item.IsBot, item.Tags, "TAGMSG", chname, item.Message) } else if chathistoryCommand { // #1676, we have to send something here or else it breaks pagination rb.AddFromClient(item.Message.Time, history.HistservMungeMsgid(item.Message.Msgid), histservService.prefix, "*", false, nil, "PRIVMSG", chname, fmt.Sprintf(client.t("%s sent a TAGMSG"), nick)) @@ -1102,25 +1104,25 @@ func (channel *Channel) replayHistoryItems(rb *ResponseBuffer, items []history.I case history.Join: if eventPlayback { if extendedJoin { - rb.AddFromClient(item.Message.Time, item.Message.Msgid, item.Nick, item.AccountName, item.IsBot, nil, "JOIN", chname, item.AccountName, item.Params[0]) + rb.AddFromClient(item.Message.Time, item.Message.Msgid, item.Nick, item.Account, item.IsBot, nil, "JOIN", chname, item.Account, item.Params[0]) } else { - rb.AddFromClient(item.Message.Time, item.Message.Msgid, item.Nick, item.AccountName, item.IsBot, nil, "JOIN", chname) + rb.AddFromClient(item.Message.Time, item.Message.Msgid, item.Nick, item.Account, item.IsBot, nil, "JOIN", chname) } } else { if !playJoinsAsPrivmsg { continue // #474 } var message string - if item.AccountName == "*" { + if item.Account == "*" { message = fmt.Sprintf(client.t("%s joined the channel"), nick) } else { - message = fmt.Sprintf(client.t("%[1]s [account: %[2]s] joined the channel"), nick, item.AccountName) + message = fmt.Sprintf(client.t("%[1]s [account: %[2]s] joined the channel"), nick, item.Account) } rb.AddFromClient(item.Message.Time, history.HistservMungeMsgid(item.Message.Msgid), histservService.prefix, "*", false, nil, "PRIVMSG", chname, message) } case history.Part: if eventPlayback { - rb.AddFromClient(item.Message.Time, item.Message.Msgid, item.Nick, item.AccountName, item.IsBot, nil, "PART", chname, item.Message.Message) + rb.AddFromClient(item.Message.Time, item.Message.Msgid, item.Nick, item.Account, item.IsBot, nil, "PART", chname, item.Message.Message) } else { if !playJoinsAsPrivmsg { continue // #474 @@ -1130,14 +1132,14 @@ func (channel *Channel) replayHistoryItems(rb *ResponseBuffer, items []history.I } case history.Kick: if eventPlayback { - rb.AddFromClient(item.Message.Time, item.Message.Msgid, item.Nick, item.AccountName, item.IsBot, nil, "KICK", chname, item.Params[0], item.Message.Message) + rb.AddFromClient(item.Message.Time, item.Message.Msgid, item.Nick, item.Account, item.IsBot, nil, "KICK", chname, item.Params[0], item.Message.Message) } else { message := fmt.Sprintf(client.t("%[1]s kicked %[2]s (%[3]s)"), nick, item.Params[0], item.Message.Message) rb.AddFromClient(item.Message.Time, history.HistservMungeMsgid(item.Message.Msgid), histservService.prefix, "*", false, nil, "PRIVMSG", chname, message) } case history.Quit: if eventPlayback { - rb.AddFromClient(item.Message.Time, item.Message.Msgid, item.Nick, item.AccountName, item.IsBot, nil, "QUIT", item.Message.Message) + rb.AddFromClient(item.Message.Time, item.Message.Msgid, item.Nick, item.Account, item.IsBot, nil, "QUIT", item.Message.Message) } else { if !playJoinsAsPrivmsg { continue // #474 @@ -1147,14 +1149,14 @@ func (channel *Channel) replayHistoryItems(rb *ResponseBuffer, items []history.I } case history.Nick: if eventPlayback { - rb.AddFromClient(item.Message.Time, item.Message.Msgid, item.Nick, item.AccountName, item.IsBot, nil, "NICK", item.Params[0]) + rb.AddFromClient(item.Message.Time, item.Message.Msgid, item.Nick, item.Account, item.IsBot, nil, "NICK", item.Params[0]) } else { message := fmt.Sprintf(client.t("%[1]s changed nick to %[2]s"), nick, item.Params[0]) rb.AddFromClient(item.Message.Time, history.HistservMungeMsgid(item.Message.Msgid), histservService.prefix, "*", false, nil, "PRIVMSG", chname, message) } case history.Topic: if eventPlayback { - rb.AddFromClient(item.Message.Time, item.Message.Msgid, item.Nick, item.AccountName, item.IsBot, nil, "TOPIC", chname, item.Message.Message) + rb.AddFromClient(item.Message.Time, item.Message.Msgid, item.Nick, item.Account, item.IsBot, nil, "TOPIC", chname, item.Message.Message) } else { message := fmt.Sprintf(client.t("%[1]s set the channel topic to: %[2]s"), nick, item.Message.Message) rb.AddFromClient(item.Message.Time, history.HistservMungeMsgid(item.Message.Msgid), histservService.prefix, "*", false, nil, "PRIVMSG", chname, message) @@ -1166,7 +1168,7 @@ func (channel *Channel) replayHistoryItems(rb *ResponseBuffer, items []history.I params[i+1] = pair.Message } if eventPlayback { - rb.AddFromClient(item.Message.Time, item.Message.Msgid, item.Nick, item.AccountName, item.IsBot, nil, "MODE", params...) + rb.AddFromClient(item.Message.Time, item.Message.Msgid, item.Nick, item.Account, item.IsBot, nil, "MODE", params...) } else { message := fmt.Sprintf(client.t("%[1]s set channel modes: %[2]s"), nick, strings.Join(params[1:], " ")) rb.AddFromClient(item.Message.Time, history.HistservMungeMsgid(item.Message.Msgid), histservService.prefix, "*", false, nil, "PRIVMSG", chname, message) @@ -1236,11 +1238,12 @@ func (channel *Channel) SetTopic(client *Client, topic string, rb *ResponseBuffe } channel.AddHistoryItem(history.Item{ - Type: history.Topic, - Nick: details.nickMask, - AccountName: details.accountName, - Message: message, - IsBot: isBot, + Type: history.Topic, + Nick: details.nickMask, + Account: details.account, + Message: message, + IsBot: isBot, + Target: channel.NameCasefolded(), }, details.account) channel.MarkDirty(IncludeTopic) @@ -1380,12 +1383,13 @@ func (channel *Channel) SendSplitMessage(command string, minPrefixMode modes.Mod // #959: don't save STATUSMSG (or OpModerated) if minPrefixMode == modes.Mode(0) { channel.AddHistoryItem(history.Item{ - Type: histType, - Message: message, - Nick: details.nickMask, - AccountName: details.accountName, - Tags: clientOnlyTags, - IsBot: isBot, + Type: histType, + Message: message, + Nick: details.nickMask, + Account: details.accountName, + Tags: clientOnlyTags, + IsBot: isBot, + Target: channel.NameCasefolded(), }, details.account) } } @@ -1492,11 +1496,12 @@ func (channel *Channel) Kick(client *Client, target *Client, comment string, rb rb.AddFromClient(message.Time, message.Msgid, details.nickMask, details.accountName, isBot, nil, "KICK", chname, targetNick, comment) histItem := history.Item{ - Type: history.Kick, - Nick: details.nickMask, - AccountName: details.accountName, - Message: message, - IsBot: isBot, + Type: history.Kick, + Nick: details.nickMask, + Account: details.account, + Message: message, + IsBot: isBot, + Target: channel.NameCasefolded(), } histItem.Params[0] = targetNick channel.AddHistoryItem(histItem, details.account) @@ -1573,6 +1578,8 @@ func (channel *Channel) Invite(invitee *Client, inviter *Client, rb *ResponseBuf item := history.Item{ Type: history.Invite, Message: message, + Account: inviter.Account(), + Target: invitee.Account(), } for _, member := range channel.Members() { diff --git a/irc/client.go b/irc/client.go index e2af62b3..e8487732 100644 --- a/irc/client.go +++ b/irc/client.go @@ -651,7 +651,7 @@ func (client *Client) run(session *Session) { firstLine := !isReattach - correspondents, _ := client.server.historyDB.GetPMs(client.NickCasefolded()) + correspondents, _ := client.server.historyDB.GetPMs(client.Account()) // For safety, let's keep this within the 4096 character barrier var lineBuilder utils.TokenLineBuilder lineBuilder.Initialize(MaxLineLen, ",") @@ -903,7 +903,7 @@ func (client *Client) replayPrivmsgHistory(rb *ResponseBuffer, items []history.I continue } if hasEventPlayback { - rb.AddFromClient(item.Message.Time, item.Message.Msgid, item.Nick, item.AccountName, item.IsBot, nil, "INVITE", nick, item.Message.Message) + rb.AddFromClient(item.Message.Time, item.Message.Msgid, item.Nick, item.Account, item.IsBot, nil, "INVITE", nick, item.Message.Message) } else { rb.AddFromClient(item.Message.Time, history.HistservMungeMsgid(item.Message.Msgid), histservService.prefix, "*", false, nil, "PRIVMSG", fmt.Sprintf(client.t("%[1]s invited you to channel %[2]s"), NUHToNick(item.Nick), item.Message.Message)) } @@ -931,11 +931,11 @@ func (client *Client) replayPrivmsgHistory(rb *ResponseBuffer, items []history.I tags = item.Tags } if !isSelfMessage(&item) { - rb.AddSplitMessageFromClient(item.Nick, item.AccountName, item.IsBot, tags, command, nick, item.Message) + rb.AddSplitMessageFromClient(item.Nick, item.Account, item.IsBot, tags, command, nick, item.Message) } else { // this message was sent *from* the client to another nick; the target is item.Params[0] // substitute client's current nickmask in case client changed nick - rb.AddSplitMessageFromClient(details.nickMask, item.AccountName, item.IsBot, tags, command, item.Params[0], item.Message) + rb.AddSplitMessageFromClient(details.nickMask, item.Account, item.IsBot, tags, command, item.Params[0], item.Message) } } @@ -1365,11 +1365,11 @@ func (client *Client) destroy(session *Session) { splitQuitMessage := utils.MakeMessage(quitMessage) isBot := client.HasMode(modes.Bot) quitItem = history.Item{ - Type: history.Quit, - Nick: details.nickMask, - AccountName: details.accountName, - Message: splitQuitMessage, - IsBot: isBot, + Type: history.Quit, + Nick: details.nickMask, + Account: details.accountName, + Message: splitQuitMessage, + IsBot: isBot, } var cache MessageCache cache.Initialize(client.server, splitQuitMessage.Time, splitQuitMessage.Msgid, details.nickMask, details.accountName, isBot, nil, "QUIT", quitMessage) @@ -1690,7 +1690,7 @@ func (client *Client) addHistoryItem(target *Client, item history.Item, details, } item.Nick = details.nickMask - item.AccountName = details.accountName + item.Account = details.account targetedItem := item targetedItem.Params[0] = tDetails.nick @@ -1698,15 +1698,15 @@ func (client *Client) addHistoryItem(target *Client, item history.Item, details, tStatus, _ := target.historyStatus(config) // add to ephemeral history if cStatus == HistoryEphemeral { - targetedItem.CfCorrespondent = tDetails.nickCasefolded + targetedItem.Target = tDetails.account client.history.Add(targetedItem) } if tStatus == HistoryEphemeral && client != target { - item.CfCorrespondent = details.nickCasefolded + item.Target = target.Account() target.history.Add(item) } if cStatus == HistoryPersistent || tStatus == HistoryPersistent { - targetedItem.CfCorrespondent = "" + targetedItem.Target = target.account client.server.historyDB.AddDirectMessage(details.nickCasefolded, details.account, tDetails.nickCasefolded, tDetails.account, targetedItem) } return nil diff --git a/irc/handlers.go b/irc/handlers.go index d498628b..984a99b4 100644 --- a/irc/handlers.go +++ b/irc/handlers.go @@ -1926,11 +1926,12 @@ func announceCmodeChanges(channel *Channel, applied modes.ModeChanges, source, a } } channel.AddHistoryItem(history.Item{ - Type: history.Mode, - Nick: source, - AccountName: accountName, - Message: message, - IsBot: isBot, + Type: history.Mode, + Nick: source, + Account: accountName, + Message: message, + Target: channel.NameCasefolded(), + IsBot: isBot, }, account) } } @@ -2443,6 +2444,7 @@ func dispatchMessageToTarget(client *Client, tags map[string]string, histType hi } } } else { + // PMs lowercaseTarget := strings.ToLower(target) service, isService := ErgoServices[lowercaseTarget] _, isZNC := zncHandlers[lowercaseTarget] @@ -2542,6 +2544,8 @@ func dispatchMessageToTarget(client *Client, tags map[string]string, histType hi Type: histType, Message: message, Tags: tags, + Target: user.Account(), + Account: client.Account(), } client.addHistoryItem(user, item, &details, &tDetails, config) @@ -2862,16 +2866,16 @@ func redactHandler(server *Server, client *Client, msg ircmsg.Message, rb *Respo rb.Add(nil, server.name, "FAIL", "REDACT", "REDACT_FORBIDDEN", utils.SafeErrorParam(target), utils.SafeErrorParam(targetmsgid), client.t("You are not authorized to delete messages")) return false } - accountName := "*" + account := "*" if canDelete == canDeleteSelf { - accountName = client.AccountName() - if accountName == "*" { + account = client.account + if account == "*" { rb.Add(nil, server.name, "FAIL", "REDACT", "REDACT_FORBIDDEN", utils.SafeErrorParam(target), utils.SafeErrorParam(targetmsgid), client.t("You are not authorized to delete this message")) return false } } - err := server.DeleteMessage(target, targetmsgid, accountName) + err := server.DeleteMessage(target, targetmsgid, account) if err == errNoop { rb.Add(nil, server.name, "FAIL", "REDACT", "UNKNOWN_MSGID", utils.SafeErrorParam(target), utils.SafeErrorParam(targetmsgid), client.t("This message does not exist or is too old")) return false @@ -2888,7 +2892,7 @@ func redactHandler(server *Server, client *Client, msg ircmsg.Message, rb *Respo if target[0] != '#' { // If this is a PM, we just removed the message from the buffer of the other party; // now we have to remove it from the buffer of the client who sent the REDACT command - err := server.DeleteMessage(client.Nick(), targetmsgid, accountName) + err := server.DeleteMessage(client.Nick(), targetmsgid, account) if err != nil { client.server.logger.Error("internal", fmt.Sprintf("Private message %s is not deletable by %s from their own buffer's even though we just deleted it from %s's. This is a bug, please report it in details.", targetmsgid, client.Nick(), target), client.Nick()) @@ -3214,6 +3218,8 @@ func relaymsgHandler(server *Server, client *Client, msg ircmsg.Message, rb *Res Type: history.Privmsg, Message: message, Nick: nuh, + Target: channel.NameCasefolded(), + Account: "$RELAYMSG", }, "") // 3 possibilities for tags: diff --git a/irc/history/history.go b/irc/history/history.go index ef7e4d9f..7b58337b 100644 --- a/irc/history/history.go +++ b/irc/history/history.go @@ -38,7 +38,8 @@ type Item struct { Nick string // this is the uncasefolded account name, if there's no account it should be set to "*" - AccountName string + // in cef, this is always set, at least in theory. cant wait for bugs toc rop up + Account string // for non-privmsg items, we may stuff some other data in here Message utils.SplitMessage Tags map[string]string @@ -46,8 +47,8 @@ type Item struct { // for a DM, this is the casefolded nickname of the other party (whether this is // an incoming or outgoing message). this lets us emulate the "query buffer" functionality // required by CHATHISTORY: - CfCorrespondent string `json:"CfCorrespondent,omitempty"` - IsBot bool `json:"IsBot,omitempty"` + Target string `json:"Target"` + IsBot bool `json:"IsBot,omitempty"` } // HasMsgid tests whether a message has the message id `msgid`. @@ -213,10 +214,10 @@ func (list *Buffer) allCorrespondents() (results []TargetListing) { stop := list.start for { - if !seen.Has(list.buffer[pos].CfCorrespondent) { - seen.Add(list.buffer[pos].CfCorrespondent) + if !seen.Has(list.buffer[pos].Target) { + seen.Add(list.buffer[pos].Target) results = append(results, TargetListing{ - CfName: list.buffer[pos].CfCorrespondent, + CfName: list.buffer[pos].Target, Time: list.buffer[pos].Message.Time, }) } @@ -281,7 +282,7 @@ func (list *Buffer) MakeSequence(correspondent string, cutoff time.Time) Sequenc var pred Predicate if correspondent != "" { pred = func(item *Item) bool { - return item.CfCorrespondent == correspondent + return item.Target == correspondent } } return &bufferSequence{ diff --git a/irc/mysql/history.go b/irc/mysql/history.go index 6cc1ad21..8f324c04 100644 --- a/irc/mysql/history.go +++ b/irc/mysql/history.go @@ -6,7 +6,6 @@ package mysql import ( "context" "database/sql" - "encoding/json" "errors" "fmt" "io" @@ -50,9 +49,7 @@ type MySQL struct { logger *logger.Manager insertHistory *sql.Stmt - insertSequence *sql.Stmt insertConversation *sql.Stmt - insertCorrespondent *sql.Stmt insertAccountMessage *sql.Stmt stateMutex sync.Mutex @@ -89,39 +86,39 @@ func (mysql *MySQL) getExpireTime() (expireTime time.Duration) { return } -func (m *MySQL) Open() (err error) { +func (mysql *MySQL) Open() (err error) { var address string - if m.config.SocketPath != "" { - address = fmt.Sprintf("unix(%s)", m.config.SocketPath) - } else if m.config.Port != 0 { - address = fmt.Sprintf("tcp(%s:%d)", m.config.Host, m.config.Port) + if mysql.config.SocketPath != "" { + address = fmt.Sprintf("unix(%s)", mysql.config.SocketPath) + } else if mysql.config.Port != 0 { + address = fmt.Sprintf("tcp(%s:%d)", mysql.config.Host, mysql.config.Port) } - m.db, err = sql.Open("mysql", fmt.Sprintf("%s:%s@%s/%s", m.config.User, m.config.Password, address, m.config.HistoryDatabase)) + mysql.db, err = sql.Open("mysql", fmt.Sprintf("%s:%s@%s/%s", mysql.config.User, mysql.config.Password, address, mysql.config.HistoryDatabase)) if err != nil { return err } - if m.config.MaxConns != 0 { - m.db.SetMaxOpenConns(m.config.MaxConns) - m.db.SetMaxIdleConns(m.config.MaxConns) + if mysql.config.MaxConns != 0 { + mysql.db.SetMaxOpenConns(mysql.config.MaxConns) + mysql.db.SetMaxIdleConns(mysql.config.MaxConns) } - if m.config.ConnMaxLifetime != 0 { - m.db.SetConnMaxLifetime(m.config.ConnMaxLifetime) + if mysql.config.ConnMaxLifetime != 0 { + mysql.db.SetConnMaxLifetime(mysql.config.ConnMaxLifetime) } - err = m.fixSchemas() + err = mysql.fixSchemas() if err != nil { return err } - err = m.prepareStatements() + err = mysql.prepareStatements() if err != nil { return err } - go m.cleanupLoop() - go m.forgetLoop() + go mysql.cleanupLoop() + go mysql.forgetLoop() return nil } @@ -166,20 +163,12 @@ func (mysql *MySQL) fixSchemas() (err error) { if err != nil { return } - err = mysql.createCorrespondentsTable() - if err != nil { - return - } _, err = mysql.db.Exec(`insert into metadata (key_name, value) values (?, ?);`, keySchemaMinorVersion, latestDbMinorVersion) if err != nil { return } } else if err == nil && minorVersion == "1" { // upgrade from 2.1 to 2.2: create the correspondents table - err = mysql.createCorrespondentsTable() - if err != nil { - return - } _, err = mysql.db.Exec(`update metadata set value = ? where key_name = ?;`, latestDbMinorVersion, keySchemaMinorVersion) if err != nil { return @@ -192,51 +181,15 @@ func (mysql *MySQL) fixSchemas() (err error) { } func (mysql *MySQL) createTables() (err error) { - _, err = mysql.db.Exec(`CREATE TABLE history ( - id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, + _, err = mysql.db.Exec(fmt.Sprintf(`CREATE TABLE history ( + msgid BINARY(16) NOT NULL PRIMARY KEY, data BLOB NOT NULL, - msgid BINARY(16) NOT NULL, + target VARBINARY(%[1]d) NOT NULL, + sender VARBINARY(%[1]d) NOT NULL, + nanotime BIGINT UNSIGNED NOT NULL, + pm boolean as (SUBSTRING(target, 1, 1) != "#") PERSISTENT, KEY (msgid(4)) - ) CHARSET=ascii COLLATE=ascii_bin;`) - if err != nil { - return err - } - - _, err = mysql.db.Exec(fmt.Sprintf(`CREATE TABLE sequence ( - history_id BIGINT UNSIGNED NOT NULL PRIMARY KEY, - target VARBINARY(%[1]d) NOT NULL, - nanotime BIGINT UNSIGNED NOT NULL, - KEY (target, nanotime) - ) CHARSET=ascii COLLATE=ascii_bin;`, MaxTargetLength)) - if err != nil { - return err - } - /* XXX: this table used to be: - CREATE TABLE sequence ( - id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, - target VARBINARY(%[1]d) NOT NULL, - nanotime BIGINT UNSIGNED NOT NULL, - history_id BIGINT NOT NULL, - KEY (target, nanotime), - KEY (history_id) - ) CHARSET=ascii COLLATE=ascii_bin; - Some users may still be using the old schema. - */ - - _, err = mysql.db.Exec(fmt.Sprintf(`CREATE TABLE conversations ( - id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, - target VARBINARY(%[1]d) NOT NULL, - correspondent VARBINARY(%[1]d) NOT NULL, - nanotime BIGINT UNSIGNED NOT NULL, - history_id BIGINT NOT NULL, - KEY (target, correspondent, nanotime), - KEY (history_id) - ) CHARSET=ascii COLLATE=ascii_bin;`, MaxTargetLength)) - if err != nil { - return err - } - - err = mysql.createCorrespondentsTable() + ) CHARSET=ascii COLLATE=ascii_bin;`, MaxTargetLength, MaxTargetLength)) if err != nil { return err } @@ -249,19 +202,6 @@ func (mysql *MySQL) createTables() (err error) { return nil } -func (mysql *MySQL) createCorrespondentsTable() (err error) { - _, err = mysql.db.Exec(fmt.Sprintf(`CREATE TABLE correspondents ( - id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, - target VARBINARY(%[1]d) NOT NULL, - correspondent VARBINARY(%[1]d) NOT NULL, - nanotime BIGINT UNSIGNED NOT NULL, - UNIQUE KEY (target, correspondent), - KEY (target, nanotime), - KEY (nanotime) - ) CHARSET=ascii COLLATE=ascii_bin;`, MaxTargetLength)) - return -} - func (mysql *MySQL) createComplianceTables() (err error) { _, err = mysql.db.Exec(fmt.Sprintf(`CREATE TABLE account_messages ( history_id BIGINT UNSIGNED NOT NULL PRIMARY KEY, @@ -327,10 +267,6 @@ func (mysql *MySQL) doCleanup(age time.Duration) (count int, err error) { mysql.logger.Debug("mysql", fmt.Sprintf("deleting %d history rows, max age %s", len(ids), utils.NanoToTimestamp(maxNanotime))) - if maxNanotime != 0 { - mysql.deleteCorrespondents(ctx, maxNanotime) - } - return len(ids), mysql.deleteHistoryIDs(ctx, ids) } @@ -347,14 +283,6 @@ func (mysql *MySQL) deleteHistoryIDs(ctx context.Context, ids []uint64) (err err inBuf.WriteRune(')') inClause := inBuf.String() - _, err = mysql.db.ExecContext(ctx, fmt.Sprintf(`DELETE FROM conversations WHERE history_id in %s;`, inClause)) - if err != nil { - return - } - _, err = mysql.db.ExecContext(ctx, fmt.Sprintf(`DELETE FROM sequence WHERE history_id in %s;`, inClause)) - if err != nil { - return - } if mysql.isTrackingAccountMessages() { _, err = mysql.db.ExecContext(ctx, fmt.Sprintf(`DELETE FROM account_messages WHERE history_id in %s;`, inClause)) if err != nil { @@ -370,57 +298,34 @@ func (mysql *MySQL) deleteHistoryIDs(ctx context.Context, ids []uint64) (err err } func (mysql *MySQL) selectCleanupIDs(ctx context.Context, age time.Duration) (ids []uint64, maxNanotime int64, err error) { + before := timestampSnowflake(time.Now().Add(-age)) + rows, err := mysql.db.QueryContext(ctx, ` - SELECT history.id, sequence.nanotime, conversations.nanotime + SELECT history.msgid FROM history - LEFT JOIN sequence ON history.id = sequence.history_id - LEFT JOIN conversations on history.id = conversations.history_id - ORDER BY history.id LIMIT ?;`, cleanupRowLimit) + WHERE msgid < ? + ORDER BY history.msgid LIMIT ?;`, before, cleanupRowLimit) if err != nil { return } defer rows.Close() idset := make(map[uint64]struct{}, cleanupRowLimit) - threshold := time.Now().Add(-age).UnixNano() + ids = make([]uint64, len(idset)) + + i := 0 for rows.Next() { var id uint64 - var seqNano, convNano sql.NullInt64 - err = rows.Scan(&id, &seqNano, &convNano) + err = rows.Scan(&id) if err != nil { return } - nanotime := extractNanotime(seqNano, convNano) - // returns 0 if not found; in that case the data is inconsistent - // and we should delete the entry - if nanotime < threshold { - idset[id] = struct{}{} - if nanotime > maxNanotime { - maxNanotime = nanotime - } - } - } - ids = make([]uint64, len(idset)) - i := 0 - for id := range idset { ids[i] = id i++ } return } -func (mysql *MySQL) deleteCorrespondents(ctx context.Context, threshold int64) { - result, err := mysql.db.ExecContext(ctx, `DELETE FROM correspondents WHERE nanotime <= (?);`, threshold) - if err != nil { - mysql.logError("error deleting correspondents", err) - } else { - count, err := result.RowsAffected() - if !mysql.logError("error deleting correspondents", err) { - mysql.logger.Debug(fmt.Sprintf("deleted %d correspondents entries", count)) - } - } -} - // wait for forget queue items and process them one by one func (mysql *MySQL) forgetLoop() { defer func() { @@ -526,23 +431,7 @@ func (mysql *MySQL) doForgetIteration(account string) (count int, err error) { func (mysql *MySQL) prepareStatements() (err error) { mysql.insertHistory, err = mysql.db.Prepare(`INSERT INTO history - (data, msgid) VALUES (?, ?);`) - if err != nil { - return - } - mysql.insertSequence, err = mysql.db.Prepare(`INSERT INTO sequence - (target, nanotime, history_id) VALUES (?, ?, ?);`) - if err != nil { - return - } - mysql.insertConversation, err = mysql.db.Prepare(`INSERT INTO conversations - (target, correspondent, nanotime, history_id) VALUES (?, ?, ?, ?);`) - if err != nil { - return - } - mysql.insertCorrespondent, err = mysql.db.Prepare(`INSERT INTO correspondents - (target, correspondent, nanotime) VALUES (?, ?, ?) - ON DUPLICATE KEY UPDATE nanotime = GREATEST(nanotime, ?);`) + (data, msgid, target, sender, nanotime) VALUES (?, ?, ?, ?, ?);`) if err != nil { return } @@ -608,11 +497,6 @@ func (mysql *MySQL) AddChannelItem(target string, item history.Item, account str return } - err = mysql.insertSequenceEntry(ctx, target, item.Message.Time.UnixNano(), id) - if err != nil { - return - } - err = mysql.insertAccountMessageEntry(ctx, id, account) if err != nil { return @@ -621,42 +505,21 @@ func (mysql *MySQL) AddChannelItem(target string, item history.Item, account str return } -func (mysql *MySQL) insertSequenceEntry(ctx context.Context, target string, messageTime int64, id int64) (err error) { - _, err = mysql.insertSequence.ExecContext(ctx, target, messageTime, id) - if err != nil { - println(target, messageTime, id, ctx) - } - mysql.logError("could not insert sequence entry", err) - return -} - -func (mysql *MySQL) insertConversationEntry(ctx context.Context, target, correspondent string, messageTime int64, id int64) (err error) { - _, err = mysql.insertConversation.ExecContext(ctx, target, correspondent, messageTime, id) - mysql.logError("could not insert conversations entry", err) - return -} - -func (mysql *MySQL) insertCorrespondentsEntry(ctx context.Context, target, correspondent string, messageTime int64, historyId int64) (err error) { - _, err = mysql.insertCorrespondent.ExecContext(ctx, target, correspondent, messageTime, messageTime) - mysql.logError("could not insert conversations entry", err) - return -} - func (mysql *MySQL) insertBase(ctx context.Context, item history.Item) (id int64, err error) { - _, err := marshalItem(&item) + var value []byte + value, err = marshalItem(&item) if mysql.logError("could not marshal item", err) { return } - - //msgidBytes, err := decodeMsgid(item.Message.Msgid) - /*if mysql.logError("could not decode msgid", err) { - return - }*/ - - result, err := mysql.insertHistory.ExecContext(ctx, value, item.Message.Msgid) + var account = item.Account + if account == "" { + account = "*" + } + result, err := mysql.insertHistory.ExecContext(ctx, value, item.Message.Msgid, item.Target, account, item.Message.Time.UnixNano()) if mysql.logError("could not insert item", err) { return } + id, err = result.LastInsertId() if mysql.logError("could not insert item", err) { return @@ -690,36 +553,7 @@ func (mysql *MySQL) AddDirectMessage(sender, senderAccount, recipient, recipient ctx, cancel := context.WithTimeout(context.Background(), mysql.getTimeout()) defer cancel() - id, err := mysql.insertBase(ctx, item) - if err != nil { - return - } - - nanotime := item.Message.Time.UnixNano() - - if senderAccount != "" { - err = mysql.insertConversationEntry(ctx, senderAccount, recipient, nanotime, id) - if err != nil { - return - } - err = mysql.insertCorrespondentsEntry(ctx, senderAccount, recipient, nanotime, id) - if err != nil { - return - } - } - - if recipientAccount != "" && sender != recipient { - err = mysql.insertConversationEntry(ctx, recipientAccount, sender, nanotime, id) - if err != nil { - return - } - err = mysql.insertCorrespondentsEntry(ctx, recipientAccount, sender, nanotime, id) - if err != nil { - return - } - } - - err = mysql.insertAccountMessageEntry(ctx, id, senderAccount) + _, err = mysql.insertBase(ctx, item) if err != nil { return } @@ -728,7 +562,7 @@ func (mysql *MySQL) AddDirectMessage(sender, senderAccount, recipient, recipient } // note that accountName is the unfolded name -func (mysql *MySQL) DeleteMsgid(msgid, accountName string) (err error) { +func (mysql *MySQL) DeleteMsgid(msgid, account string) (err error) { if mysql.db == nil { return nil } @@ -741,11 +575,11 @@ func (mysql *MySQL) DeleteMsgid(msgid, accountName string) (err error) { return } - if accountName != "*" { + if account != "*" { var item history.Item err = unmarshalItem(data, &item) // delete if the entry is corrupt - if err == nil && item.AccountName != accountName { + if err == nil && item.Account != account { return ErrDisallowed } } @@ -756,7 +590,10 @@ func (mysql *MySQL) DeleteMsgid(msgid, accountName string) (err error) { } func (mysql *MySQL) Export(account string, writer io.Writer) { - if mysql.db == nil { + // no eu presence... + // maybe fix this when i know the new schema works + return + /*if mysql.db == nil { return } @@ -768,10 +605,8 @@ func (mysql *MySQL) Export(account string, writer io.Writer) { defer cancel() rows, rowsErr := mysql.db.QueryContext(ctx, ` - SELECT account_messages.history_id, history.data, sequence.target FROM account_messages - INNER JOIN history ON history.id = account_messages.history_id - INNER JOIN sequence ON account_messages.history_id = sequence.history_id - WHERE account_messages.account = ? AND account_messages.history_id > ? + SELECT history.data, msgid, target FROM history + WHERE sender = ? AND account_messages.history_id > ? LIMIT ?`, account, lastSeen, cleanupRowLimit) if rowsErr != nil { err = rowsErr @@ -783,7 +618,7 @@ func (mysql *MySQL) Export(account string, writer io.Writer) { var blob, jsonBlob []byte var target string var item history.Item - err = rows.Scan(&id, &blob, &target) + err = rows.Scan(&blob, &id, &target) if err != nil { return } @@ -791,7 +626,7 @@ func (mysql *MySQL) Export(account string, writer io.Writer) { if err != nil { return } - item.CfCorrespondent = target + item.Target = target jsonBlob, err = json.Marshal(item) if err != nil { return @@ -811,30 +646,28 @@ func (mysql *MySQL) Export(account string, writer io.Writer) { } mysql.logError("could not export history", err) - return + return*/ } func (mysql *MySQL) lookupMsgid(ctx context.Context, msgid string, includeData bool) (result time.Time, id uint64, data []byte, err error) { if err != nil { return } - cols := `sequence.nanotime, conversations.nanotime` + cols := `history.nanotime` if includeData { - cols = `sequence.nanotime, conversations.nanotime, history.id, history.data` + cols = `history.nanotime, history.id, history.data` } // Since CEF uses snowflakes and vanilla ergo uses blobs, we cast as int to make it function. // May have to adjust it some day row := mysql.db.QueryRowContext(ctx, fmt.Sprintf(` SELECT %s FROM history - LEFT JOIN sequence ON history.id = sequence.history_id - LEFT JOIN conversations ON history.id = conversations.history_id WHERE history.msgid = CAST(? AS INT) LIMIT 1;`, cols), msgid) - var nanoSeq, nanoConv sql.NullInt64 + var nanoSeq sql.NullInt64 if !includeData { - err = row.Scan(&nanoSeq, &nanoConv) + err = row.Scan(&nanoSeq) } else { - err = row.Scan(&nanoSeq, &nanoConv, &id, &data) + err = row.Scan(&nanoSeq, &id, &data) } if err != sql.ErrNoRows { mysql.logError("could not resolve msgid to time", err) @@ -842,7 +675,7 @@ func (mysql *MySQL) lookupMsgid(ctx context.Context, msgid string, includeData b if err != nil { return } - nanotime := extractNanotime(nanoSeq, nanoConv) + nanotime := nanoSeq.Int64 if nanotime == 0 { err = sql.ErrNoRows return @@ -851,15 +684,6 @@ func (mysql *MySQL) lookupMsgid(ctx context.Context, msgid string, includeData b return } -func extractNanotime(seq, conv sql.NullInt64) (result int64) { - if seq.Valid { - return seq.Int64 - } else if conv.Valid { - return conv.Int64 - } - return -} - func (mysql *MySQL) selectItems(ctx context.Context, query string, args ...interface{}) (results []history.Item, err error) { rows, err := mysql.db.QueryContext(ctx, query, args...) if mysql.logError("could not select history items", err) { @@ -884,12 +708,12 @@ func (mysql *MySQL) selectItems(ctx context.Context, query string, args ...inter return } +func timestampSnowflake(t time.Time) uint64 { + var ts = t.Unix() & 0xffffffffffff + return uint64(ts << 16) +} + func (mysql *MySQL) betweenTimestamps(ctx context.Context, target, correspondent string, after, before, cutoff time.Time, limit int) (results []history.Item, err error) { - useSequence := correspondent == "" - table := "sequence" - if !useSequence { - table = "conversations" - } after, before, ascending := history.MinMaxAsc(after, before, cutoff) direction := "ASC" @@ -899,26 +723,25 @@ func (mysql *MySQL) betweenTimestamps(ctx context.Context, target, correspondent var queryBuf strings.Builder - args := make([]interface{}, 0, 6) - fmt.Fprintf(&queryBuf, - "SELECT history.data from history INNER JOIN %[1]s ON history.id = %[1]s.history_id WHERE", table) - if useSequence { - fmt.Fprintf(&queryBuf, " sequence.target = ?") + args := make([]interface{}, 0, 7) + if correspondent == "" { + fmt.Fprintf(&queryBuf, "SELECT history.data from history WHERE target = ? ") args = append(args, target) } else { - fmt.Fprintf(&queryBuf, " conversations.target = ? AND conversations.correspondent = ?") - args = append(args, target) - args = append(args, correspondent) + fmt.Fprintf(&queryBuf, "SELECT history.data from history WHERE (target = ? and sender = ?) OR (target = ? and sender = ?)") + args = append(args, target, correspondent, correspondent, target) } + if !after.IsZero() { - fmt.Fprintf(&queryBuf, " AND %s.nanotime > ?", table) + fmt.Fprintf(&queryBuf, " AND nanotime > ?") args = append(args, after.UnixNano()) } if !before.IsZero() { - fmt.Fprintf(&queryBuf, " AND %s.nanotime < ?", table) + fmt.Fprintf(&queryBuf, " AND nanotime < ?") args = append(args, before.UnixNano()) } - fmt.Fprintf(&queryBuf, " ORDER BY %[1]s.nanotime %[2]s LIMIT ?;", table, direction) + + fmt.Fprintf(&queryBuf, " ORDER BY nanotime %[1]s LIMIT ?;", direction) args = append(args, limit) results, err = mysql.selectItems(ctx, queryBuf.String(), args...) @@ -936,19 +759,19 @@ func (mysql *MySQL) listCorrespondentsInternal(ctx context.Context, target strin } var queryBuf strings.Builder - args := make([]interface{}, 0, 4) - queryBuf.WriteString(`SELECT correspondents.correspondent, correspondents.nanotime from correspondents - WHERE target = ?`) - args = append(args, target) + args := make([]interface{}, 0, 5) + queryBuf.WriteString(`SELECT target, sender, nanotime from history + WHERE target = ? OR (sender = ? and pm = true)`) + args = append(args, target, target) if !after.IsZero() { - queryBuf.WriteString(" AND correspondents.nanotime > ?") + queryBuf.WriteString(" AND nanotime > ?") args = append(args, after.UnixNano()) } if !before.IsZero() { - queryBuf.WriteString(" AND correspondents.nanotime < ?") + queryBuf.WriteString(" AND nanotime < ?") args = append(args, before.UnixNano()) } - fmt.Fprintf(&queryBuf, " ORDER BY correspondents.nanotime %s LIMIT ?;", direction) + fmt.Fprintf(&queryBuf, " ORDER BY nanotime %s LIMIT ?;", direction) args = append(args, limit) query := queryBuf.String() @@ -957,17 +780,26 @@ func (mysql *MySQL) listCorrespondentsInternal(ctx context.Context, target strin return } defer rows.Close() - var correspondent string + var msgTarget string + var msgSender string var nanotime int64 for rows.Next() { - err = rows.Scan(&correspondent, &nanotime) + err = rows.Scan(&msgTarget, &msgSender, &nanotime) if err != nil { return } - results = append(results, history.TargetListing{ - CfName: correspondent, - Time: time.Unix(0, nanotime), - }) + if msgTarget == target { + results = append(results, history.TargetListing{ + CfName: msgSender, + Time: time.Unix(0, nanotime), + }) + } else { + results = append(results, history.TargetListing{ + CfName: msgTarget, + Time: time.Unix(0, nanotime), + }) + } + } if !ascending { @@ -1121,8 +953,8 @@ func (mysql *MySQL) GetPMs(casefoldedUser string) (results map[string]int64, err var queryBuf strings.Builder args := make([]interface{}, 0) - queryBuf.WriteString(`SELECT max(nanotime), correspondent FROM conversations WHERE target = ? GROUP BY correspondent;`) - args = append(args, casefoldedUser) + queryBuf.WriteString(`SELECT max(nanotime), target, sender FROM history WHERE target = ? OR (sender = ? and pm = true) GROUP BY target, sender;`) + args = append(args, casefoldedUser, casefoldedUser) rows, err := mysql.db.QueryContext(ctx, queryBuf.String(), args...) if mysql.logError("could not get pms", err) { @@ -1131,14 +963,18 @@ func (mysql *MySQL) GetPMs(casefoldedUser string) (results map[string]int64, err defer rows.Close() var last int64 - var correspondent string + var target, sender string for rows.Next() { - err = rows.Scan(&last, &correspondent) + err = rows.Scan(&last, &target, &sender) if mysql.logError("could not get pms", err) { return } // We really don't need nanosecond precision - results[correspondent] = last / 1000000 + if target != casefoldedUser { + results[target] = last / 1000000 + } else { + results[sender] = last / 1000000 + } } return } diff --git a/irc/mysql/serialization.go b/irc/mysql/serialization.go index c864495e..6065e0aa 100644 --- a/irc/mysql/serialization.go +++ b/irc/mysql/serialization.go @@ -16,8 +16,3 @@ func marshalItem(item *history.Item) (result []byte, err error) { func unmarshalItem(data []byte, result *history.Item) (err error) { return json.Unmarshal(data, result) } - -// TODO: probably should convert the internal mysql column to uint -func decodeMsgid(msgid string) ([]byte, error) { - return []byte(msgid), nil -} diff --git a/irc/nickname.go b/irc/nickname.go index 0f2f8232..629adc60 100644 --- a/irc/nickname.go +++ b/irc/nickname.go @@ -93,11 +93,11 @@ func performNickChange(server *Server, client *Client, target *Client, session * isBot := !isSanick && client.HasMode(modes.Bot) message := utils.MakeMessage("") histItem := history.Item{ - Type: history.Nick, - Nick: origNickMask, - AccountName: details.accountName, - Message: message, - IsBot: isBot, + Type: history.Nick, + Nick: origNickMask, + Account: details.account, + Message: message, + IsBot: isBot, } histItem.Params[0] = assignedNickname @@ -123,6 +123,8 @@ func performNickChange(server *Server, client *Client, target *Client, session * for _, channel := range target.Channels() { if channel.memberIsVisible(client) { + // I LOVE MUTATING STATE! + histItem.Target = channel.NameCasefolded() channel.AddHistoryItem(histItem, details.account) } } diff --git a/irc/roleplay.go b/irc/roleplay.go index 97625644..d8c9dc8f 100644 --- a/irc/roleplay.go +++ b/irc/roleplay.go @@ -110,6 +110,9 @@ func sendRoleplayMessage(server *Server, client *Client, source string, targetSt Type: history.Privmsg, Message: splitMessage, Nick: sourceMask, + Target: target, + // TODO: does this work? + Account: "$RP", }, client.Account()) } else { target, err := CasefoldName(targetString) diff --git a/irc/server.go b/irc/server.go index ebec4791..04f6c561 100644 --- a/irc/server.go +++ b/irc/server.go @@ -1079,7 +1079,7 @@ func (server *Server) ForgetHistory(accountName string) { return } - predicate := func(item *history.Item) bool { return item.AccountName == accountName } + predicate := func(item *history.Item) bool { return item.Account == accountName } for _, channel := range server.channels.Channels() { channel.history.Delete(predicate) @@ -1093,7 +1093,7 @@ func (server *Server) ForgetHistory(accountName string) { // deletes a message. target is a hint about what buffer it's in (not required for // persistent history, where all the msgids are indexed together). if accountName // is anything other than "*", it must match the recorded AccountName of the message -func (server *Server) DeleteMessage(target, msgid, accountName string) (err error) { +func (server *Server) DeleteMessage(target, msgid, account string) (err error) { config := server.Config() var hist *history.Buffer @@ -1116,10 +1116,10 @@ func (server *Server) DeleteMessage(target, msgid, accountName string) (err erro } if hist == nil { - err = server.historyDB.DeleteMsgid(msgid, accountName) + err = server.historyDB.DeleteMsgid(msgid, account) } else { count := hist.Delete(func(item *history.Item) bool { - return item.Message.Msgid == msgid && (accountName == "*" || item.AccountName == accountName) + return item.Message.Msgid == msgid && (account == "*" || item.Account == account) }) if count == 0 { err = errNoop