forked from External/ergo
fix #1615
Remove the CHATHISTORY * and znc.in/playback *self targets, clean up associated database code, add new mechanisms to play all missed DMs.
This commit is contained in:
parent
d2278faf75
commit
0d05ab4ff4
6 changed files with 87 additions and 64 deletions
|
|
@ -193,16 +193,25 @@ func (mysql *MySQL) createTables() (err error) {
|
|||
}
|
||||
|
||||
_, err = mysql.db.Exec(fmt.Sprintf(`CREATE TABLE sequence (
|
||||
history_id BIGINT UNSIGNED NOT NULL PRIMARY KEY,
|
||||
target VARBINARY(%[1]d) NOT NULL,
|
||||
nanotime BIGINT UNSIGNED NOT NULL,
|
||||
KEY (target, nanotime)
|
||||
) CHARSET=ascii COLLATE=ascii_bin;`, MaxTargetLength))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
/* XXX: this table used to be:
|
||||
CREATE TABLE sequence (
|
||||
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
|
||||
target VARBINARY(%[1]d) NOT NULL,
|
||||
nanotime BIGINT UNSIGNED NOT NULL,
|
||||
history_id BIGINT NOT NULL,
|
||||
KEY (target, nanotime),
|
||||
KEY (history_id)
|
||||
) CHARSET=ascii COLLATE=ascii_bin;`, MaxTargetLength))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
) CHARSET=ascii COLLATE=ascii_bin;
|
||||
Some users may still be using the old schema.
|
||||
*/
|
||||
|
||||
_, err = mysql.db.Exec(fmt.Sprintf(`CREATE TABLE conversations (
|
||||
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
|
||||
|
|
@ -352,31 +361,32 @@ func (mysql *MySQL) deleteHistoryIDs(ctx context.Context, ids []uint64) (err err
|
|||
|
||||
func (mysql *MySQL) selectCleanupIDs(ctx context.Context, age time.Duration) (ids []uint64, maxNanotime int64, err error) {
|
||||
rows, err := mysql.db.QueryContext(ctx, `
|
||||
SELECT history.id, sequence.nanotime
|
||||
SELECT history.id, sequence.nanotime, conversations.nanotime
|
||||
FROM history
|
||||
LEFT JOIN sequence ON history.id = sequence.history_id
|
||||
LEFT JOIN conversations on history.id = conversations.history_id
|
||||
ORDER BY history.id LIMIT ?;`, cleanupRowLimit)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
// a history ID may have 0-2 rows in sequence: 1 for a channel entry,
|
||||
// 2 for a DM, 0 if the data is inconsistent. therefore, deduplicate
|
||||
// and delete anything that doesn't have a sequence entry:
|
||||
idset := make(map[uint64]struct{}, cleanupRowLimit)
|
||||
threshold := time.Now().Add(-age).UnixNano()
|
||||
for rows.Next() {
|
||||
var id uint64
|
||||
var nanotime sql.NullInt64
|
||||
err = rows.Scan(&id, &nanotime)
|
||||
var seqNano, convNano sql.NullInt64
|
||||
err = rows.Scan(&id, &seqNano, &convNano)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if !nanotime.Valid || nanotime.Int64 < threshold {
|
||||
nanotime := extractNanotime(seqNano, convNano)
|
||||
// returns 0 if not found; in that case the data is inconsistent
|
||||
// and we should delete the entry
|
||||
if nanotime < threshold {
|
||||
idset[id] = struct{}{}
|
||||
if nanotime.Valid && nanotime.Int64 > maxNanotime {
|
||||
maxNanotime = nanotime.Int64
|
||||
if nanotime > maxNanotime {
|
||||
maxNanotime = nanotime
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -675,10 +685,6 @@ func (mysql *MySQL) AddDirectMessage(sender, senderAccount, recipient, recipient
|
|||
nanotime := item.Message.Time.UnixNano()
|
||||
|
||||
if senderAccount != "" {
|
||||
err = mysql.insertSequenceEntry(ctx, senderAccount, nanotime, id)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = mysql.insertConversationEntry(ctx, senderAccount, recipient, nanotime, id)
|
||||
if err != nil {
|
||||
return
|
||||
|
|
@ -690,10 +696,6 @@ func (mysql *MySQL) AddDirectMessage(sender, senderAccount, recipient, recipient
|
|||
}
|
||||
|
||||
if recipientAccount != "" && sender != recipient {
|
||||
err = mysql.insertSequenceEntry(ctx, recipientAccount, nanotime, id)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = mysql.insertConversationEntry(ctx, recipientAccount, sender, nanotime, id)
|
||||
if err != nil {
|
||||
return
|
||||
|
|
@ -800,31 +802,24 @@ func (mysql *MySQL) Export(account string, writer io.Writer) {
|
|||
}
|
||||
|
||||
func (mysql *MySQL) lookupMsgid(ctx context.Context, msgid string, includeData bool) (result time.Time, id uint64, data []byte, err error) {
|
||||
// in theory, we could optimize out a roundtrip to the database by using a subquery instead:
|
||||
// sequence.nanotime > (
|
||||
// SELECT sequence.nanotime FROM sequence, history
|
||||
// WHERE sequence.history_id = history.id AND history.msgid = ?
|
||||
// LIMIT 1)
|
||||
// however, this doesn't handle the BETWEEN case with one or two msgids, where we
|
||||
// don't initially know whether the interval is going forwards or backwards. to simplify
|
||||
// the logic, resolve msgids to timestamps "manually" in all cases, using a separate query.
|
||||
decoded, err := decodeMsgid(msgid)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
cols := `sequence.nanotime`
|
||||
cols := `sequence.nanotime, conversations.nanotime`
|
||||
if includeData {
|
||||
cols = `sequence.nanotime, sequence.history_id, history.data`
|
||||
cols = `sequence.nanotime, conversations.nanotime, history.id, history.data`
|
||||
}
|
||||
row := mysql.db.QueryRowContext(ctx, fmt.Sprintf(`
|
||||
SELECT %s FROM sequence
|
||||
INNER JOIN history ON history.id = sequence.history_id
|
||||
SELECT %s FROM history
|
||||
LEFT JOIN sequence ON history.id = sequence.history_id
|
||||
LEFT JOIN conversations ON history.id = conversations.history_id
|
||||
WHERE history.msgid = ? LIMIT 1;`, cols), decoded)
|
||||
var nanotime int64
|
||||
var nanoSeq, nanoConv sql.NullInt64
|
||||
if !includeData {
|
||||
err = row.Scan(&nanotime)
|
||||
err = row.Scan(&nanoSeq, &nanoConv)
|
||||
} else {
|
||||
err = row.Scan(&nanotime, &id, &data)
|
||||
err = row.Scan(&nanoSeq, &nanoConv, &id, &data)
|
||||
}
|
||||
if err != sql.ErrNoRows {
|
||||
mysql.logError("could not resolve msgid to time", err)
|
||||
|
|
@ -832,11 +827,24 @@ func (mysql *MySQL) lookupMsgid(ctx context.Context, msgid string, includeData b
|
|||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
nanotime := extractNanotime(nanoSeq, nanoConv)
|
||||
if nanotime == 0 {
|
||||
err = sql.ErrNoRows
|
||||
return
|
||||
}
|
||||
result = time.Unix(0, nanotime).UTC()
|
||||
return
|
||||
}
|
||||
|
||||
func extractNanotime(seq, conv sql.NullInt64) (result int64) {
|
||||
if seq.Valid {
|
||||
return seq.Int64
|
||||
} else if conv.Valid {
|
||||
return conv.Int64
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (mysql *MySQL) selectItems(ctx context.Context, query string, args ...interface{}) (results []history.Item, err error) {
|
||||
rows, err := mysql.db.QueryContext(ctx, query, args...)
|
||||
if mysql.logError("could not select history items", err) {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue