forked from External/ergo
initial persistent history implementation
This commit is contained in:
parent
0d5a4fd584
commit
33dac4c0ba
34 changed files with 2229 additions and 595 deletions
535
irc/mysql/history.go
Normal file
535
irc/mysql/history.go
Normal file
|
|
@ -0,0 +1,535 @@
|
|||
package mysql
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"runtime/debug"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
_ "github.com/go-sql-driver/mysql"
|
||||
"github.com/oragono/oragono/irc/history"
|
||||
"github.com/oragono/oragono/irc/logger"
|
||||
"github.com/oragono/oragono/irc/utils"
|
||||
)
|
||||
|
||||
// Schema bookkeeping stored in the metadata table.
const (
	// latestDbSchema is the schema version this code writes to, and
	// requires from, the metadata table.
	latestDbSchema = "1"
	// keySchemaVersion is the metadata key under which the schema
	// version is stored.
	keySchemaVersion = "db.version"
)

// Tuning for the background cleanup goroutine.
const (
	// cleanupRowLimit is the LIMIT applied to the query that selects
	// candidate rows for a single cleanup pass.
	cleanupRowLimit = 50
	// cleanupPauseTime is how long the cleanup loop sleeps between rounds
	// (and before restarting after a recovered panic).
	cleanupPauseTime = 10 * time.Minute
)
|
||||
|
||||
type MySQL struct {
|
||||
db *sql.DB
|
||||
logger *logger.Manager
|
||||
|
||||
insertHistory *sql.Stmt
|
||||
insertSequence *sql.Stmt
|
||||
insertConversation *sql.Stmt
|
||||
|
||||
stateMutex sync.Mutex
|
||||
expireTime time.Duration
|
||||
}
|
||||
|
||||
func (mysql *MySQL) Initialize(logger *logger.Manager, expireTime time.Duration) {
|
||||
mysql.logger = logger
|
||||
mysql.expireTime = expireTime
|
||||
}
|
||||
|
||||
func (mysql *MySQL) SetExpireTime(expireTime time.Duration) {
|
||||
mysql.stateMutex.Lock()
|
||||
mysql.expireTime = expireTime
|
||||
mysql.stateMutex.Unlock()
|
||||
}
|
||||
|
||||
func (mysql *MySQL) getExpireTime() (expireTime time.Duration) {
|
||||
mysql.stateMutex.Lock()
|
||||
expireTime = mysql.expireTime
|
||||
mysql.stateMutex.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
func (mysql *MySQL) Open(username, password, host string, port int, database string) (err error) {
|
||||
// TODO: timeouts!
|
||||
var address string
|
||||
if port != 0 {
|
||||
address = fmt.Sprintf("tcp(%s:%d)", host, port)
|
||||
}
|
||||
|
||||
mysql.db, err = sql.Open("mysql", fmt.Sprintf("%s:%s@%s/%s", username, password, address, database))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = mysql.fixSchemas()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = mysql.prepareStatements()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go mysql.cleanupLoop()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mysql *MySQL) fixSchemas() (err error) {
|
||||
_, err = mysql.db.Exec(`CREATE TABLE IF NOT EXISTS metadata (
|
||||
key_name VARCHAR(32) primary key,
|
||||
value VARCHAR(32) NOT NULL
|
||||
) CHARSET=ascii COLLATE=ascii_bin;`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var schema string
|
||||
err = mysql.db.QueryRow(`select value from metadata where key_name = ?;`, keySchemaVersion).Scan(&schema)
|
||||
if err == sql.ErrNoRows {
|
||||
err = mysql.createTables()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
_, err = mysql.db.Exec(`insert into metadata (key_name, value) values (?, ?);`, keySchemaVersion, latestDbSchema)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
} else if err == nil && schema != latestDbSchema {
|
||||
// TODO figure out what to do about schema changes
|
||||
return &utils.IncompatibleSchemaError{CurrentVersion: schema, RequiredVersion: latestDbSchema}
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mysql *MySQL) createTables() (err error) {
|
||||
_, err = mysql.db.Exec(`CREATE TABLE history (
|
||||
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
|
||||
data BLOB NOT NULL,
|
||||
msgid BINARY(16) NOT NULL,
|
||||
KEY (msgid(4))
|
||||
) CHARSET=ascii COLLATE=ascii_bin;`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = mysql.db.Exec(`CREATE TABLE sequence (
|
||||
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
|
||||
target VARBINARY(64) NOT NULL,
|
||||
nanotime BIGINT UNSIGNED NOT NULL,
|
||||
history_id BIGINT NOT NULL,
|
||||
KEY (target, nanotime),
|
||||
KEY (history_id)
|
||||
) CHARSET=ascii COLLATE=ascii_bin;`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = mysql.db.Exec(`CREATE TABLE conversations (
|
||||
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
|
||||
lower_target VARBINARY(64) NOT NULL,
|
||||
upper_target VARBINARY(64) NOT NULL,
|
||||
nanotime BIGINT UNSIGNED NOT NULL,
|
||||
history_id BIGINT NOT NULL,
|
||||
KEY (lower_target, upper_target, nanotime),
|
||||
KEY (history_id)
|
||||
) CHARSET=ascii COLLATE=ascii_bin;`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mysql *MySQL) cleanupLoop() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
mysql.logger.Error("mysql",
|
||||
fmt.Sprintf("Panic in cleanup routine: %v\n%s", r, debug.Stack()))
|
||||
time.Sleep(cleanupPauseTime)
|
||||
go mysql.cleanupLoop()
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
expireTime := mysql.getExpireTime()
|
||||
if expireTime != 0 {
|
||||
for {
|
||||
startTime := time.Now()
|
||||
rowsDeleted, err := mysql.doCleanup(expireTime)
|
||||
elapsed := time.Now().Sub(startTime)
|
||||
mysql.logError("error during row cleanup", err)
|
||||
// keep going as long as we're accomplishing significant work
|
||||
// (don't busy-wait on small numbers of rows expiring):
|
||||
if rowsDeleted < (cleanupRowLimit / 10) {
|
||||
break
|
||||
}
|
||||
// crude backpressure mechanism: if the database is slow,
|
||||
// give it time to process other queries
|
||||
time.Sleep(elapsed)
|
||||
}
|
||||
}
|
||||
time.Sleep(cleanupPauseTime)
|
||||
}
|
||||
}
|
||||
|
||||
func (mysql *MySQL) doCleanup(age time.Duration) (count int, err error) {
|
||||
ids, maxNanotime, err := mysql.selectCleanupIDs(age)
|
||||
if len(ids) == 0 {
|
||||
mysql.logger.Debug("mysql", "found no rows to clean up")
|
||||
return
|
||||
}
|
||||
|
||||
mysql.logger.Debug("mysql", fmt.Sprintf("deleting %d history rows, max age %s", len(ids), utils.NanoToTimestamp(maxNanotime)))
|
||||
|
||||
// can't use ? binding for a variable number of arguments, build the IN clause manually
|
||||
var inBuf bytes.Buffer
|
||||
inBuf.WriteByte('(')
|
||||
for i, id := range ids {
|
||||
if i != 0 {
|
||||
inBuf.WriteRune(',')
|
||||
}
|
||||
inBuf.WriteString(strconv.FormatInt(int64(id), 10))
|
||||
}
|
||||
inBuf.WriteRune(')')
|
||||
|
||||
_, err = mysql.db.Exec(fmt.Sprintf(`DELETE FROM conversations WHERE history_id in %s;`, inBuf.Bytes()))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
_, err = mysql.db.Exec(fmt.Sprintf(`DELETE FROM sequence WHERE history_id in %s;`, inBuf.Bytes()))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
_, err = mysql.db.Exec(fmt.Sprintf(`DELETE FROM history WHERE id in %s;`, inBuf.Bytes()))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
count = len(ids)
|
||||
return
|
||||
}
|
||||
|
||||
func (mysql *MySQL) selectCleanupIDs(age time.Duration) (ids []uint64, maxNanotime int64, err error) {
|
||||
rows, err := mysql.db.Query(`
|
||||
SELECT history.id, sequence.nanotime
|
||||
FROM history
|
||||
LEFT JOIN sequence ON history.id = sequence.history_id
|
||||
ORDER BY history.id LIMIT ?;`, cleanupRowLimit)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
// a history ID may have 0-2 rows in sequence: 1 for a channel entry,
|
||||
// 2 for a DM, 0 if the data is inconsistent. therefore, deduplicate
|
||||
// and delete anything that doesn't have a sequence entry:
|
||||
idset := make(map[uint64]struct{}, cleanupRowLimit)
|
||||
threshold := time.Now().Add(-age).UnixNano()
|
||||
for rows.Next() {
|
||||
var id uint64
|
||||
var nanotime sql.NullInt64
|
||||
err = rows.Scan(&id, &nanotime)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if !nanotime.Valid || nanotime.Int64 < threshold {
|
||||
idset[id] = struct{}{}
|
||||
if nanotime.Valid && nanotime.Int64 > maxNanotime {
|
||||
maxNanotime = nanotime.Int64
|
||||
}
|
||||
}
|
||||
}
|
||||
ids = make([]uint64, len(idset))
|
||||
i := 0
|
||||
for id := range idset {
|
||||
ids[i] = id
|
||||
i++
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (mysql *MySQL) prepareStatements() (err error) {
|
||||
mysql.insertHistory, err = mysql.db.Prepare(`INSERT INTO history
|
||||
(data, msgid) VALUES (?, ?);`)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
mysql.insertSequence, err = mysql.db.Prepare(`INSERT INTO sequence
|
||||
(target, nanotime, history_id) VALUES (?, ?, ?);`)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
mysql.insertConversation, err = mysql.db.Prepare(`INSERT INTO conversations
|
||||
(lower_target, upper_target, nanotime, history_id) VALUES (?, ?, ?, ?);`)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (mysql *MySQL) logError(context string, err error) (quit bool) {
|
||||
if err != nil {
|
||||
mysql.logger.Error("mysql", context, err.Error())
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (mysql *MySQL) AddChannelItem(target string, item history.Item) (err error) {
|
||||
if mysql.db == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if target == "" {
|
||||
return utils.ErrInvalidParams
|
||||
}
|
||||
|
||||
id, err := mysql.insertBase(item)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
err = mysql.insertSequenceEntry(target, item.Message.Time, id)
|
||||
return
|
||||
}
|
||||
|
||||
func (mysql *MySQL) insertSequenceEntry(target string, messageTime time.Time, id int64) (err error) {
|
||||
_, err = mysql.insertSequence.Exec(target, messageTime.UnixNano(), id)
|
||||
mysql.logError("could not insert sequence entry", err)
|
||||
return
|
||||
}
|
||||
|
||||
func (mysql *MySQL) insertConversationEntry(sender, recipient string, messageTime time.Time, id int64) (err error) {
|
||||
lower, higher := stringMinMax(sender, recipient)
|
||||
_, err = mysql.insertConversation.Exec(lower, higher, messageTime.UnixNano(), id)
|
||||
mysql.logError("could not insert conversations entry", err)
|
||||
return
|
||||
}
|
||||
|
||||
func (mysql *MySQL) insertBase(item history.Item) (id int64, err error) {
|
||||
value, err := marshalItem(&item)
|
||||
if mysql.logError("could not marshal item", err) {
|
||||
return
|
||||
}
|
||||
|
||||
msgidBytes, err := decodeMsgid(item.Message.Msgid)
|
||||
if mysql.logError("could not decode msgid", err) {
|
||||
return
|
||||
}
|
||||
|
||||
result, err := mysql.insertHistory.Exec(value, msgidBytes)
|
||||
if mysql.logError("could not insert item", err) {
|
||||
return
|
||||
}
|
||||
id, err = result.LastInsertId()
|
||||
if mysql.logError("could not insert item", err) {
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// stringMinMax returns its two arguments ordered by Go's string
// comparison: the smaller one first, the larger one second. For equal
// strings both results are that string.
func stringMinMax(first, second string) (min, max string) {
	min, max = second, first
	if first < second {
		min, max = first, second
	}
	return min, max
}
|
||||
|
||||
func (mysql *MySQL) AddDirectMessage(sender, recipient string, senderPersistent, recipientPersistent bool, item history.Item) (err error) {
|
||||
if mysql.db == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if !(senderPersistent || recipientPersistent) {
|
||||
return
|
||||
}
|
||||
|
||||
if sender == "" || recipient == "" {
|
||||
return utils.ErrInvalidParams
|
||||
}
|
||||
|
||||
id, err := mysql.insertBase(item)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if senderPersistent {
|
||||
mysql.insertSequenceEntry(sender, item.Message.Time, id)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if recipientPersistent && sender != recipient {
|
||||
err = mysql.insertSequenceEntry(recipient, item.Message.Time, id)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
err = mysql.insertConversationEntry(sender, recipient, item.Message.Time, id)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (mysql *MySQL) msgidToTime(msgid string) (result time.Time, err error) {
|
||||
// in theory, we could optimize out a roundtrip to the database by using a subquery instead:
|
||||
// sequence.nanotime > (
|
||||
// SELECT sequence.nanotime FROM sequence, history
|
||||
// WHERE sequence.history_id = history.id AND history.msgid = ?
|
||||
// LIMIT 1)
|
||||
// however, this doesn't handle the BETWEEN case with one or two msgids, where we
|
||||
// don't initially know whether the interval is going forwards or backwards. to simplify
|
||||
// the logic, resolve msgids to timestamps "manually" in all cases, using a separate query.
|
||||
decoded, err := decodeMsgid(msgid)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
row := mysql.db.QueryRow(`
|
||||
SELECT sequence.nanotime FROM sequence
|
||||
INNER JOIN history ON history.id = sequence.history_id
|
||||
WHERE history.msgid = ? LIMIT 1;`, decoded)
|
||||
var nanotime int64
|
||||
err = row.Scan(&nanotime)
|
||||
if mysql.logError("could not resolve msgid to time", err) {
|
||||
return
|
||||
}
|
||||
result = time.Unix(0, nanotime)
|
||||
return
|
||||
}
|
||||
|
||||
func (mysql *MySQL) selectItems(query string, args ...interface{}) (results []history.Item, err error) {
|
||||
rows, err := mysql.db.Query(query, args...)
|
||||
if mysql.logError("could not select history items", err) {
|
||||
return
|
||||
}
|
||||
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
var blob []byte
|
||||
var item history.Item
|
||||
err = rows.Scan(&blob)
|
||||
if mysql.logError("could not scan history item", err) {
|
||||
return
|
||||
}
|
||||
err = unmarshalItem(blob, &item)
|
||||
if mysql.logError("could not unmarshal history item", err) {
|
||||
return
|
||||
}
|
||||
results = append(results, item)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (mysql *MySQL) BetweenTimestamps(sender, recipient string, after, before, cutoff time.Time, limit int) (results []history.Item, err error) {
|
||||
useSequence := true
|
||||
var lowerTarget, upperTarget string
|
||||
if sender != "" {
|
||||
lowerTarget, upperTarget = stringMinMax(sender, recipient)
|
||||
useSequence = false
|
||||
}
|
||||
|
||||
table := "sequence"
|
||||
if !useSequence {
|
||||
table = "conversations"
|
||||
}
|
||||
|
||||
after, before, ascending := history.MinMaxAsc(after, before, cutoff)
|
||||
direction := "ASC"
|
||||
if !ascending {
|
||||
direction = "DESC"
|
||||
}
|
||||
|
||||
var queryBuf bytes.Buffer
|
||||
|
||||
args := make([]interface{}, 0, 6)
|
||||
fmt.Fprintf(&queryBuf,
|
||||
"SELECT history.data from history INNER JOIN %[1]s ON history.id = %[1]s.history_id WHERE", table)
|
||||
if useSequence {
|
||||
fmt.Fprintf(&queryBuf, " sequence.target = ?")
|
||||
args = append(args, recipient)
|
||||
} else {
|
||||
fmt.Fprintf(&queryBuf, " conversations.lower_target = ? AND conversations.upper_target = ?")
|
||||
args = append(args, lowerTarget)
|
||||
args = append(args, upperTarget)
|
||||
}
|
||||
if !after.IsZero() {
|
||||
fmt.Fprintf(&queryBuf, " AND %s.nanotime > ?", table)
|
||||
args = append(args, after.UnixNano())
|
||||
}
|
||||
if !before.IsZero() {
|
||||
fmt.Fprintf(&queryBuf, " AND %s.nanotime < ?", table)
|
||||
args = append(args, before.UnixNano())
|
||||
}
|
||||
fmt.Fprintf(&queryBuf, " ORDER BY %[1]s.nanotime %[2]s LIMIT ?;", table, direction)
|
||||
args = append(args, limit)
|
||||
|
||||
results, err = mysql.selectItems(queryBuf.String(), args...)
|
||||
if err == nil && !ascending {
|
||||
history.Reverse(results)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (mysql *MySQL) Close() {
|
||||
// closing the database will close our prepared statements as well
|
||||
if mysql.db != nil {
|
||||
mysql.db.Close()
|
||||
}
|
||||
mysql.db = nil
|
||||
}
|
||||
|
||||
// implements history.Sequence, emulating a single history buffer (for a channel,
|
||||
// a single user's DMs, or a DM conversation)
|
||||
type mySQLHistorySequence struct {
|
||||
mysql *MySQL
|
||||
sender string
|
||||
recipient string
|
||||
cutoff time.Time
|
||||
}
|
||||
|
||||
func (s *mySQLHistorySequence) Between(start, end history.Selector, limit int) (results []history.Item, complete bool, err error) {
|
||||
startTime := start.Time
|
||||
if start.Msgid != "" {
|
||||
startTime, err = s.mysql.msgidToTime(start.Msgid)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
}
|
||||
endTime := end.Time
|
||||
if end.Msgid != "" {
|
||||
endTime, err = s.mysql.msgidToTime(end.Msgid)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
}
|
||||
|
||||
results, err = s.mysql.BetweenTimestamps(s.sender, s.recipient, startTime, endTime, s.cutoff, limit)
|
||||
return results, (err == nil), err
|
||||
}
|
||||
|
||||
func (s *mySQLHistorySequence) Around(start history.Selector, limit int) (results []history.Item, err error) {
|
||||
return history.GenericAround(s, start, limit)
|
||||
}
|
||||
|
||||
func (mysql *MySQL) MakeSequence(sender, recipient string, cutoff time.Time) history.Sequence {
|
||||
return &mySQLHistorySequence{
|
||||
sender: sender,
|
||||
recipient: recipient,
|
||||
mysql: mysql,
|
||||
cutoff: cutoff,
|
||||
}
|
||||
}
|
||||
23
irc/mysql/serialization.go
Normal file
23
irc/mysql/serialization.go
Normal file
|
|
@ -0,0 +1,23 @@
|
|||
package mysql
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
"github.com/oragono/oragono/irc/history"
|
||||
"github.com/oragono/oragono/irc/utils"
|
||||
)
|
||||
|
||||
// 123 / '{' is the magic number that means JSON;
|
||||
// if we want to do a binary encoding later, we just have to add different magic version numbers
|
||||
|
||||
func marshalItem(item *history.Item) (result []byte, err error) {
|
||||
return json.Marshal(item)
|
||||
}
|
||||
|
||||
func unmarshalItem(data []byte, result *history.Item) (err error) {
|
||||
return json.Unmarshal(data, result)
|
||||
}
|
||||
|
||||
func decodeMsgid(msgid string) ([]byte, error) {
|
||||
return utils.B32Encoder.DecodeString(msgid)
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue