1
0
Fork 0
forked from External/ergo

channel history modifications

This commit is contained in:
CEF Server 2024-11-18 19:38:49 +00:00
parent f4c03b6765
commit 711af30aa8
5 changed files with 106 additions and 41 deletions

View file

@ -623,6 +623,9 @@ func (mysql *MySQL) AddChannelItem(target string, item history.Item, account str
func (mysql *MySQL) insertSequenceEntry(ctx context.Context, target string, messageTime int64, id int64) (err error) {
_, err = mysql.insertSequence.ExecContext(ctx, target, messageTime, id)
if err != nil {
println(target, messageTime, id, ctx)
}
mysql.logError("could not insert sequence entry", err)
return
}
@ -640,17 +643,17 @@ func (mysql *MySQL) insertCorrespondentsEntry(ctx context.Context, target, corre
}
func (mysql *MySQL) insertBase(ctx context.Context, item history.Item) (id int64, err error) {
value, err := marshalItem(&item)
_, err := marshalItem(&item)
if mysql.logError("could not marshal item", err) {
return
}
msgidBytes, err := decodeMsgid(item.Message.Msgid)
if mysql.logError("could not decode msgid", err) {
//msgidBytes, err := decodeMsgid(item.Message.Msgid)
/*if mysql.logError("could not decode msgid", err) {
return
}
}*/
result, err := mysql.insertHistory.ExecContext(ctx, value, msgidBytes)
result, err := mysql.insertHistory.ExecContext(ctx, value, item.Message.Msgid)
if mysql.logError("could not insert item", err) {
return
}
@ -812,7 +815,6 @@ func (mysql *MySQL) Export(account string, writer io.Writer) {
}
func (mysql *MySQL) lookupMsgid(ctx context.Context, msgid string, includeData bool) (result time.Time, id uint64, data []byte, err error) {
decoded, err := decodeMsgid(msgid)
if err != nil {
return
}
@ -820,11 +822,14 @@ func (mysql *MySQL) lookupMsgid(ctx context.Context, msgid string, includeData b
if includeData {
cols = `sequence.nanotime, conversations.nanotime, history.id, history.data`
}
// Since CEF uses snowflakes and vanilla ergo uses blobs, we cast as int to make it function.
// May have to adjust it some day
row := mysql.db.QueryRowContext(ctx, fmt.Sprintf(`
SELECT %s FROM history
LEFT JOIN sequence ON history.id = sequence.history_id
LEFT JOIN conversations ON history.id = conversations.history_id
WHERE history.msgid = ? LIMIT 1;`, cols), decoded)
WHERE history.msgid = CAST(? AS INT) LIMIT 1;`, cols), msgid)
var nanoSeq, nanoConv sql.NullInt64
if !includeData {
err = row.Scan(&nanoSeq, &nanoConv)
@ -1042,6 +1047,7 @@ func (s *mySQLHistorySequence) Between(start, end history.Selector, limit int) (
defer cancel()
startTime := start.Time
if start.Msgid != "" {
startTime, _, _, err = s.mysql.lookupMsgid(ctx, start.Msgid, false)
if err != nil {
@ -1055,6 +1061,7 @@ func (s *mySQLHistorySequence) Between(start, end history.Selector, limit int) (
endTime := end.Time
if end.Msgid != "" {
endTime, _, _, err = s.mysql.lookupMsgid(ctx, end.Msgid, false)
if err != nil {
if err == sql.ErrNoRows {
return nil, nil
@ -1101,3 +1108,37 @@ func (mysql *MySQL) MakeSequence(target, correspondent string, cutoff time.Time)
cutoff: cutoff,
}
}
func (mysql *MySQL) GetPMs(casefoldedUser string) (results map[string]int64, err error) {
if mysql.db == nil {
return
}
results = make(map[string]int64)
ctx, cancel := context.WithTimeout(context.Background(), mysql.getTimeout())
defer cancel()
var queryBuf strings.Builder
args := make([]interface{}, 0)
queryBuf.WriteString(`SELECT max(nanotime), correspondent FROM conversations WHERE target = ? GROUP BY correspondent;`)
args = append(args, casefoldedUser)
rows, err := mysql.db.QueryContext(ctx, queryBuf.String(), args...)
if mysql.logError("could not get pms", err) {
return
}
defer rows.Close()
var last int64
var correspondent string
for rows.Next() {
err = rows.Scan(&last, &correspondent)
if mysql.logError("could not get pms", err) {
return
}
// We really don't need nanosecond precision
results[correspondent] = last / 1000000
}
return
}