1
0
Fork 0
forked from External/ergo

complete database reivision

This commit is contained in:
CEF Server 2024-11-28 02:58:14 +00:00
parent d26235e9a9
commit d73b6bac86
10 changed files with 203 additions and 353 deletions

View file

@ -6,7 +6,6 @@ package mysql
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"io"
@ -50,9 +49,7 @@ type MySQL struct {
logger *logger.Manager
insertHistory *sql.Stmt
insertSequence *sql.Stmt
insertConversation *sql.Stmt
insertCorrespondent *sql.Stmt
insertAccountMessage *sql.Stmt
stateMutex sync.Mutex
@ -89,39 +86,39 @@ func (mysql *MySQL) getExpireTime() (expireTime time.Duration) {
return
}
func (m *MySQL) Open() (err error) {
func (mysql *MySQL) Open() (err error) {
var address string
if m.config.SocketPath != "" {
address = fmt.Sprintf("unix(%s)", m.config.SocketPath)
} else if m.config.Port != 0 {
address = fmt.Sprintf("tcp(%s:%d)", m.config.Host, m.config.Port)
if mysql.config.SocketPath != "" {
address = fmt.Sprintf("unix(%s)", mysql.config.SocketPath)
} else if mysql.config.Port != 0 {
address = fmt.Sprintf("tcp(%s:%d)", mysql.config.Host, mysql.config.Port)
}
m.db, err = sql.Open("mysql", fmt.Sprintf("%s:%s@%s/%s", m.config.User, m.config.Password, address, m.config.HistoryDatabase))
mysql.db, err = sql.Open("mysql", fmt.Sprintf("%s:%s@%s/%s", mysql.config.User, mysql.config.Password, address, mysql.config.HistoryDatabase))
if err != nil {
return err
}
if m.config.MaxConns != 0 {
m.db.SetMaxOpenConns(m.config.MaxConns)
m.db.SetMaxIdleConns(m.config.MaxConns)
if mysql.config.MaxConns != 0 {
mysql.db.SetMaxOpenConns(mysql.config.MaxConns)
mysql.db.SetMaxIdleConns(mysql.config.MaxConns)
}
if m.config.ConnMaxLifetime != 0 {
m.db.SetConnMaxLifetime(m.config.ConnMaxLifetime)
if mysql.config.ConnMaxLifetime != 0 {
mysql.db.SetConnMaxLifetime(mysql.config.ConnMaxLifetime)
}
err = m.fixSchemas()
err = mysql.fixSchemas()
if err != nil {
return err
}
err = m.prepareStatements()
err = mysql.prepareStatements()
if err != nil {
return err
}
go m.cleanupLoop()
go m.forgetLoop()
go mysql.cleanupLoop()
go mysql.forgetLoop()
return nil
}
@ -166,20 +163,12 @@ func (mysql *MySQL) fixSchemas() (err error) {
if err != nil {
return
}
err = mysql.createCorrespondentsTable()
if err != nil {
return
}
_, err = mysql.db.Exec(`insert into metadata (key_name, value) values (?, ?);`, keySchemaMinorVersion, latestDbMinorVersion)
if err != nil {
return
}
} else if err == nil && minorVersion == "1" {
// upgrade from 2.1 to 2.2: create the correspondents table
err = mysql.createCorrespondentsTable()
if err != nil {
return
}
_, err = mysql.db.Exec(`update metadata set value = ? where key_name = ?;`, latestDbMinorVersion, keySchemaMinorVersion)
if err != nil {
return
@ -192,51 +181,15 @@ func (mysql *MySQL) fixSchemas() (err error) {
}
func (mysql *MySQL) createTables() (err error) {
_, err = mysql.db.Exec(`CREATE TABLE history (
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
_, err = mysql.db.Exec(fmt.Sprintf(`CREATE TABLE history (
msgid BINARY(16) NOT NULL PRIMARY KEY,
data BLOB NOT NULL,
msgid BINARY(16) NOT NULL,
target VARBINARY(%[1]d) NOT NULL,
sender VARBINARY(%[1]d) NOT NULL,
nanotime BIGINT UNSIGNED NOT NULL,
pm boolean as (SUBSTRING(target, 1, 1) != "#") PERSISTENT,
KEY (msgid(4))
) CHARSET=ascii COLLATE=ascii_bin;`)
if err != nil {
return err
}
_, err = mysql.db.Exec(fmt.Sprintf(`CREATE TABLE sequence (
history_id BIGINT UNSIGNED NOT NULL PRIMARY KEY,
target VARBINARY(%[1]d) NOT NULL,
nanotime BIGINT UNSIGNED NOT NULL,
KEY (target, nanotime)
) CHARSET=ascii COLLATE=ascii_bin;`, MaxTargetLength))
if err != nil {
return err
}
/* XXX: this table used to be:
CREATE TABLE sequence (
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
target VARBINARY(%[1]d) NOT NULL,
nanotime BIGINT UNSIGNED NOT NULL,
history_id BIGINT NOT NULL,
KEY (target, nanotime),
KEY (history_id)
) CHARSET=ascii COLLATE=ascii_bin;
Some users may still be using the old schema.
*/
_, err = mysql.db.Exec(fmt.Sprintf(`CREATE TABLE conversations (
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
target VARBINARY(%[1]d) NOT NULL,
correspondent VARBINARY(%[1]d) NOT NULL,
nanotime BIGINT UNSIGNED NOT NULL,
history_id BIGINT NOT NULL,
KEY (target, correspondent, nanotime),
KEY (history_id)
) CHARSET=ascii COLLATE=ascii_bin;`, MaxTargetLength))
if err != nil {
return err
}
err = mysql.createCorrespondentsTable()
) CHARSET=ascii COLLATE=ascii_bin;`, MaxTargetLength, MaxTargetLength))
if err != nil {
return err
}
@ -249,19 +202,6 @@ func (mysql *MySQL) createTables() (err error) {
return nil
}
func (mysql *MySQL) createCorrespondentsTable() (err error) {
_, err = mysql.db.Exec(fmt.Sprintf(`CREATE TABLE correspondents (
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
target VARBINARY(%[1]d) NOT NULL,
correspondent VARBINARY(%[1]d) NOT NULL,
nanotime BIGINT UNSIGNED NOT NULL,
UNIQUE KEY (target, correspondent),
KEY (target, nanotime),
KEY (nanotime)
) CHARSET=ascii COLLATE=ascii_bin;`, MaxTargetLength))
return
}
func (mysql *MySQL) createComplianceTables() (err error) {
_, err = mysql.db.Exec(fmt.Sprintf(`CREATE TABLE account_messages (
history_id BIGINT UNSIGNED NOT NULL PRIMARY KEY,
@ -327,10 +267,6 @@ func (mysql *MySQL) doCleanup(age time.Duration) (count int, err error) {
mysql.logger.Debug("mysql", fmt.Sprintf("deleting %d history rows, max age %s", len(ids), utils.NanoToTimestamp(maxNanotime)))
if maxNanotime != 0 {
mysql.deleteCorrespondents(ctx, maxNanotime)
}
return len(ids), mysql.deleteHistoryIDs(ctx, ids)
}
@ -347,14 +283,6 @@ func (mysql *MySQL) deleteHistoryIDs(ctx context.Context, ids []uint64) (err err
inBuf.WriteRune(')')
inClause := inBuf.String()
_, err = mysql.db.ExecContext(ctx, fmt.Sprintf(`DELETE FROM conversations WHERE history_id in %s;`, inClause))
if err != nil {
return
}
_, err = mysql.db.ExecContext(ctx, fmt.Sprintf(`DELETE FROM sequence WHERE history_id in %s;`, inClause))
if err != nil {
return
}
if mysql.isTrackingAccountMessages() {
_, err = mysql.db.ExecContext(ctx, fmt.Sprintf(`DELETE FROM account_messages WHERE history_id in %s;`, inClause))
if err != nil {
@ -370,57 +298,34 @@ func (mysql *MySQL) deleteHistoryIDs(ctx context.Context, ids []uint64) (err err
}
func (mysql *MySQL) selectCleanupIDs(ctx context.Context, age time.Duration) (ids []uint64, maxNanotime int64, err error) {
before := timestampSnowflake(time.Now().Add(-age))
rows, err := mysql.db.QueryContext(ctx, `
SELECT history.id, sequence.nanotime, conversations.nanotime
SELECT history.msgid
FROM history
LEFT JOIN sequence ON history.id = sequence.history_id
LEFT JOIN conversations on history.id = conversations.history_id
ORDER BY history.id LIMIT ?;`, cleanupRowLimit)
WHERE msgid < ?
ORDER BY history.msgid LIMIT ?;`, before, cleanupRowLimit)
if err != nil {
return
}
defer rows.Close()
idset := make(map[uint64]struct{}, cleanupRowLimit)
threshold := time.Now().Add(-age).UnixNano()
ids = make([]uint64, len(idset))
i := 0
for rows.Next() {
var id uint64
var seqNano, convNano sql.NullInt64
err = rows.Scan(&id, &seqNano, &convNano)
err = rows.Scan(&id)
if err != nil {
return
}
nanotime := extractNanotime(seqNano, convNano)
// returns 0 if not found; in that case the data is inconsistent
// and we should delete the entry
if nanotime < threshold {
idset[id] = struct{}{}
if nanotime > maxNanotime {
maxNanotime = nanotime
}
}
}
ids = make([]uint64, len(idset))
i := 0
for id := range idset {
ids[i] = id
i++
}
return
}
func (mysql *MySQL) deleteCorrespondents(ctx context.Context, threshold int64) {
result, err := mysql.db.ExecContext(ctx, `DELETE FROM correspondents WHERE nanotime <= (?);`, threshold)
if err != nil {
mysql.logError("error deleting correspondents", err)
} else {
count, err := result.RowsAffected()
if !mysql.logError("error deleting correspondents", err) {
mysql.logger.Debug(fmt.Sprintf("deleted %d correspondents entries", count))
}
}
}
// wait for forget queue items and process them one by one
func (mysql *MySQL) forgetLoop() {
defer func() {
@ -526,23 +431,7 @@ func (mysql *MySQL) doForgetIteration(account string) (count int, err error) {
func (mysql *MySQL) prepareStatements() (err error) {
mysql.insertHistory, err = mysql.db.Prepare(`INSERT INTO history
(data, msgid) VALUES (?, ?);`)
if err != nil {
return
}
mysql.insertSequence, err = mysql.db.Prepare(`INSERT INTO sequence
(target, nanotime, history_id) VALUES (?, ?, ?);`)
if err != nil {
return
}
mysql.insertConversation, err = mysql.db.Prepare(`INSERT INTO conversations
(target, correspondent, nanotime, history_id) VALUES (?, ?, ?, ?);`)
if err != nil {
return
}
mysql.insertCorrespondent, err = mysql.db.Prepare(`INSERT INTO correspondents
(target, correspondent, nanotime) VALUES (?, ?, ?)
ON DUPLICATE KEY UPDATE nanotime = GREATEST(nanotime, ?);`)
(data, msgid, target, sender, nanotime) VALUES (?, ?, ?, ?, ?);`)
if err != nil {
return
}
@ -608,11 +497,6 @@ func (mysql *MySQL) AddChannelItem(target string, item history.Item, account str
return
}
err = mysql.insertSequenceEntry(ctx, target, item.Message.Time.UnixNano(), id)
if err != nil {
return
}
err = mysql.insertAccountMessageEntry(ctx, id, account)
if err != nil {
return
@ -621,42 +505,21 @@ func (mysql *MySQL) AddChannelItem(target string, item history.Item, account str
return
}
func (mysql *MySQL) insertSequenceEntry(ctx context.Context, target string, messageTime int64, id int64) (err error) {
_, err = mysql.insertSequence.ExecContext(ctx, target, messageTime, id)
if err != nil {
println(target, messageTime, id, ctx)
}
mysql.logError("could not insert sequence entry", err)
return
}
func (mysql *MySQL) insertConversationEntry(ctx context.Context, target, correspondent string, messageTime int64, id int64) (err error) {
_, err = mysql.insertConversation.ExecContext(ctx, target, correspondent, messageTime, id)
mysql.logError("could not insert conversations entry", err)
return
}
func (mysql *MySQL) insertCorrespondentsEntry(ctx context.Context, target, correspondent string, messageTime int64, historyId int64) (err error) {
_, err = mysql.insertCorrespondent.ExecContext(ctx, target, correspondent, messageTime, messageTime)
mysql.logError("could not insert conversations entry", err)
return
}
func (mysql *MySQL) insertBase(ctx context.Context, item history.Item) (id int64, err error) {
_, err := marshalItem(&item)
var value []byte
value, err = marshalItem(&item)
if mysql.logError("could not marshal item", err) {
return
}
//msgidBytes, err := decodeMsgid(item.Message.Msgid)
/*if mysql.logError("could not decode msgid", err) {
return
}*/
result, err := mysql.insertHistory.ExecContext(ctx, value, item.Message.Msgid)
var account = item.Account
if account == "" {
account = "*"
}
result, err := mysql.insertHistory.ExecContext(ctx, value, item.Message.Msgid, item.Target, account, item.Message.Time.UnixNano())
if mysql.logError("could not insert item", err) {
return
}
id, err = result.LastInsertId()
if mysql.logError("could not insert item", err) {
return
@ -690,36 +553,7 @@ func (mysql *MySQL) AddDirectMessage(sender, senderAccount, recipient, recipient
ctx, cancel := context.WithTimeout(context.Background(), mysql.getTimeout())
defer cancel()
id, err := mysql.insertBase(ctx, item)
if err != nil {
return
}
nanotime := item.Message.Time.UnixNano()
if senderAccount != "" {
err = mysql.insertConversationEntry(ctx, senderAccount, recipient, nanotime, id)
if err != nil {
return
}
err = mysql.insertCorrespondentsEntry(ctx, senderAccount, recipient, nanotime, id)
if err != nil {
return
}
}
if recipientAccount != "" && sender != recipient {
err = mysql.insertConversationEntry(ctx, recipientAccount, sender, nanotime, id)
if err != nil {
return
}
err = mysql.insertCorrespondentsEntry(ctx, recipientAccount, sender, nanotime, id)
if err != nil {
return
}
}
err = mysql.insertAccountMessageEntry(ctx, id, senderAccount)
_, err = mysql.insertBase(ctx, item)
if err != nil {
return
}
@ -728,7 +562,7 @@ func (mysql *MySQL) AddDirectMessage(sender, senderAccount, recipient, recipient
}
// note that accountName is the unfolded name
func (mysql *MySQL) DeleteMsgid(msgid, accountName string) (err error) {
func (mysql *MySQL) DeleteMsgid(msgid, account string) (err error) {
if mysql.db == nil {
return nil
}
@ -741,11 +575,11 @@ func (mysql *MySQL) DeleteMsgid(msgid, accountName string) (err error) {
return
}
if accountName != "*" {
if account != "*" {
var item history.Item
err = unmarshalItem(data, &item)
// delete if the entry is corrupt
if err == nil && item.AccountName != accountName {
if err == nil && item.Account != account {
return ErrDisallowed
}
}
@ -756,7 +590,10 @@ func (mysql *MySQL) DeleteMsgid(msgid, accountName string) (err error) {
}
func (mysql *MySQL) Export(account string, writer io.Writer) {
if mysql.db == nil {
// no eu presence...
// maybe fix this when i know the new schema works
return
/*if mysql.db == nil {
return
}
@ -768,10 +605,8 @@ func (mysql *MySQL) Export(account string, writer io.Writer) {
defer cancel()
rows, rowsErr := mysql.db.QueryContext(ctx, `
SELECT account_messages.history_id, history.data, sequence.target FROM account_messages
INNER JOIN history ON history.id = account_messages.history_id
INNER JOIN sequence ON account_messages.history_id = sequence.history_id
WHERE account_messages.account = ? AND account_messages.history_id > ?
SELECT history.data, msgid, target FROM history
WHERE sender = ? AND account_messages.history_id > ?
LIMIT ?`, account, lastSeen, cleanupRowLimit)
if rowsErr != nil {
err = rowsErr
@ -783,7 +618,7 @@ func (mysql *MySQL) Export(account string, writer io.Writer) {
var blob, jsonBlob []byte
var target string
var item history.Item
err = rows.Scan(&id, &blob, &target)
err = rows.Scan(&blob, &id, &target)
if err != nil {
return
}
@ -791,7 +626,7 @@ func (mysql *MySQL) Export(account string, writer io.Writer) {
if err != nil {
return
}
item.CfCorrespondent = target
item.Target = target
jsonBlob, err = json.Marshal(item)
if err != nil {
return
@ -811,30 +646,28 @@ func (mysql *MySQL) Export(account string, writer io.Writer) {
}
mysql.logError("could not export history", err)
return
return*/
}
func (mysql *MySQL) lookupMsgid(ctx context.Context, msgid string, includeData bool) (result time.Time, id uint64, data []byte, err error) {
if err != nil {
return
}
cols := `sequence.nanotime, conversations.nanotime`
cols := `history.nanotime`
if includeData {
cols = `sequence.nanotime, conversations.nanotime, history.id, history.data`
cols = `history.nanotime, history.id, history.data`
}
// Since CEF uses snowflakes and vanilla ergo uses blobs, we cast as int to make it function.
// May have to adjust it some day
row := mysql.db.QueryRowContext(ctx, fmt.Sprintf(`
SELECT %s FROM history
LEFT JOIN sequence ON history.id = sequence.history_id
LEFT JOIN conversations ON history.id = conversations.history_id
WHERE history.msgid = CAST(? AS INT) LIMIT 1;`, cols), msgid)
var nanoSeq, nanoConv sql.NullInt64
var nanoSeq sql.NullInt64
if !includeData {
err = row.Scan(&nanoSeq, &nanoConv)
err = row.Scan(&nanoSeq)
} else {
err = row.Scan(&nanoSeq, &nanoConv, &id, &data)
err = row.Scan(&nanoSeq, &id, &data)
}
if err != sql.ErrNoRows {
mysql.logError("could not resolve msgid to time", err)
@ -842,7 +675,7 @@ func (mysql *MySQL) lookupMsgid(ctx context.Context, msgid string, includeData b
if err != nil {
return
}
nanotime := extractNanotime(nanoSeq, nanoConv)
nanotime := nanoSeq.Int64
if nanotime == 0 {
err = sql.ErrNoRows
return
@ -851,15 +684,6 @@ func (mysql *MySQL) lookupMsgid(ctx context.Context, msgid string, includeData b
return
}
func extractNanotime(seq, conv sql.NullInt64) (result int64) {
if seq.Valid {
return seq.Int64
} else if conv.Valid {
return conv.Int64
}
return
}
func (mysql *MySQL) selectItems(ctx context.Context, query string, args ...interface{}) (results []history.Item, err error) {
rows, err := mysql.db.QueryContext(ctx, query, args...)
if mysql.logError("could not select history items", err) {
@ -884,12 +708,12 @@ func (mysql *MySQL) selectItems(ctx context.Context, query string, args ...inter
return
}
func timestampSnowflake(t time.Time) uint64 {
var ts = t.Unix() & 0xffffffffffff
return uint64(ts << 16)
}
func (mysql *MySQL) betweenTimestamps(ctx context.Context, target, correspondent string, after, before, cutoff time.Time, limit int) (results []history.Item, err error) {
useSequence := correspondent == ""
table := "sequence"
if !useSequence {
table = "conversations"
}
after, before, ascending := history.MinMaxAsc(after, before, cutoff)
direction := "ASC"
@ -899,26 +723,25 @@ func (mysql *MySQL) betweenTimestamps(ctx context.Context, target, correspondent
var queryBuf strings.Builder
args := make([]interface{}, 0, 6)
fmt.Fprintf(&queryBuf,
"SELECT history.data from history INNER JOIN %[1]s ON history.id = %[1]s.history_id WHERE", table)
if useSequence {
fmt.Fprintf(&queryBuf, " sequence.target = ?")
args := make([]interface{}, 0, 7)
if correspondent == "" {
fmt.Fprintf(&queryBuf, "SELECT history.data from history WHERE target = ? ")
args = append(args, target)
} else {
fmt.Fprintf(&queryBuf, " conversations.target = ? AND conversations.correspondent = ?")
args = append(args, target)
args = append(args, correspondent)
fmt.Fprintf(&queryBuf, "SELECT history.data from history WHERE (target = ? and sender = ?) OR (target = ? and sender = ?)")
args = append(args, target, correspondent, correspondent, target)
}
if !after.IsZero() {
fmt.Fprintf(&queryBuf, " AND %s.nanotime > ?", table)
fmt.Fprintf(&queryBuf, " AND nanotime > ?")
args = append(args, after.UnixNano())
}
if !before.IsZero() {
fmt.Fprintf(&queryBuf, " AND %s.nanotime < ?", table)
fmt.Fprintf(&queryBuf, " AND nanotime < ?")
args = append(args, before.UnixNano())
}
fmt.Fprintf(&queryBuf, " ORDER BY %[1]s.nanotime %[2]s LIMIT ?;", table, direction)
fmt.Fprintf(&queryBuf, " ORDER BY nanotime %[1]s LIMIT ?;", direction)
args = append(args, limit)
results, err = mysql.selectItems(ctx, queryBuf.String(), args...)
@ -936,19 +759,19 @@ func (mysql *MySQL) listCorrespondentsInternal(ctx context.Context, target strin
}
var queryBuf strings.Builder
args := make([]interface{}, 0, 4)
queryBuf.WriteString(`SELECT correspondents.correspondent, correspondents.nanotime from correspondents
WHERE target = ?`)
args = append(args, target)
args := make([]interface{}, 0, 5)
queryBuf.WriteString(`SELECT target, sender, nanotime from history
WHERE target = ? OR (sender = ? and pm = true)`)
args = append(args, target, target)
if !after.IsZero() {
queryBuf.WriteString(" AND correspondents.nanotime > ?")
queryBuf.WriteString(" AND nanotime > ?")
args = append(args, after.UnixNano())
}
if !before.IsZero() {
queryBuf.WriteString(" AND correspondents.nanotime < ?")
queryBuf.WriteString(" AND nanotime < ?")
args = append(args, before.UnixNano())
}
fmt.Fprintf(&queryBuf, " ORDER BY correspondents.nanotime %s LIMIT ?;", direction)
fmt.Fprintf(&queryBuf, " ORDER BY nanotime %s LIMIT ?;", direction)
args = append(args, limit)
query := queryBuf.String()
@ -957,17 +780,26 @@ func (mysql *MySQL) listCorrespondentsInternal(ctx context.Context, target strin
return
}
defer rows.Close()
var correspondent string
var msgTarget string
var msgSender string
var nanotime int64
for rows.Next() {
err = rows.Scan(&correspondent, &nanotime)
err = rows.Scan(&msgTarget, &msgSender, &nanotime)
if err != nil {
return
}
results = append(results, history.TargetListing{
CfName: correspondent,
Time: time.Unix(0, nanotime),
})
if msgTarget == target {
results = append(results, history.TargetListing{
CfName: msgSender,
Time: time.Unix(0, nanotime),
})
} else {
results = append(results, history.TargetListing{
CfName: msgTarget,
Time: time.Unix(0, nanotime),
})
}
}
if !ascending {
@ -1121,8 +953,8 @@ func (mysql *MySQL) GetPMs(casefoldedUser string) (results map[string]int64, err
var queryBuf strings.Builder
args := make([]interface{}, 0)
queryBuf.WriteString(`SELECT max(nanotime), correspondent FROM conversations WHERE target = ? GROUP BY correspondent;`)
args = append(args, casefoldedUser)
queryBuf.WriteString(`SELECT max(nanotime), target, sender FROM history WHERE target = ? OR (sender = ? and pm = true) GROUP BY target, sender;`)
args = append(args, casefoldedUser, casefoldedUser)
rows, err := mysql.db.QueryContext(ctx, queryBuf.String(), args...)
if mysql.logError("could not get pms", err) {
@ -1131,14 +963,18 @@ func (mysql *MySQL) GetPMs(casefoldedUser string) (results map[string]int64, err
defer rows.Close()
var last int64
var correspondent string
var target, sender string
for rows.Next() {
err = rows.Scan(&last, &correspondent)
err = rows.Scan(&last, &target, &sender)
if mysql.logError("could not get pms", err) {
return
}
// We really don't need nanosecond precision
results[correspondent] = last / 1000000
if target != casefoldedUser {
results[target] = last / 1000000
} else {
results[sender] = last / 1000000
}
}
return
}

View file

@ -16,8 +16,3 @@ func marshalItem(item *history.Item) (result []byte, err error) {
func unmarshalItem(data []byte, result *history.Item) (err error) {
return json.Unmarshal(data, result)
}
// TODO: probably should convert the internal mysql column to uint
func decodeMsgid(msgid string) ([]byte, error) {
return []byte(msgid), nil
}