This commit is contained in:
Shivaram Lingamneni 2020-02-28 05:41:08 -05:00
parent c414ac383c
commit d967129446
4 changed files with 69 additions and 75 deletions

View file

@ -25,7 +25,7 @@ const (
MaxTargetLength = 64
// latest schema of the db
latestDbSchema = "1"
latestDbSchema = "2"
keySchemaVersion = "db.version"
cleanupRowLimit = 50
cleanupPauseTime = 10 * time.Minute
@ -144,11 +144,11 @@ func (mysql *MySQL) createTables() (err error) {
_, err = mysql.db.Exec(fmt.Sprintf(`CREATE TABLE conversations (
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
lower_target VARBINARY(%[1]d) NOT NULL,
upper_target VARBINARY(%[1]d) NOT NULL,
target VARBINARY(%[1]d) NOT NULL,
correspondent VARBINARY(%[1]d) NOT NULL,
nanotime BIGINT UNSIGNED NOT NULL,
history_id BIGINT NOT NULL,
KEY (lower_target, upper_target, nanotime),
KEY (target, correspondent, nanotime),
KEY (history_id)
) CHARSET=ascii COLLATE=ascii_bin;`, MaxTargetLength))
if err != nil {
@ -278,7 +278,7 @@ func (mysql *MySQL) prepareStatements() (err error) {
return
}
mysql.insertConversation, err = mysql.db.Prepare(`INSERT INTO conversations
(lower_target, upper_target, nanotime, history_id) VALUES (?, ?, ?, ?);`)
(target, correspondent, nanotime, history_id) VALUES (?, ?, ?, ?);`)
if err != nil {
return
}
@ -315,19 +315,18 @@ func (mysql *MySQL) AddChannelItem(target string, item history.Item) (err error)
return
}
err = mysql.insertSequenceEntry(ctx, target, item.Message.Time, id)
err = mysql.insertSequenceEntry(ctx, target, item.Message.Time.UnixNano(), id)
return
}
func (mysql *MySQL) insertSequenceEntry(ctx context.Context, target string, messageTime time.Time, id int64) (err error) {
_, err = mysql.insertSequence.ExecContext(ctx, target, messageTime.UnixNano(), id)
func (mysql *MySQL) insertSequenceEntry(ctx context.Context, target string, messageTime int64, id int64) (err error) {
_, err = mysql.insertSequence.ExecContext(ctx, target, messageTime, id)
mysql.logError("could not insert sequence entry", err)
return
}
func (mysql *MySQL) insertConversationEntry(ctx context.Context, sender, recipient string, messageTime time.Time, id int64) (err error) {
lower, higher := stringMinMax(sender, recipient)
_, err = mysql.insertConversation.ExecContext(ctx, lower, higher, messageTime.UnixNano(), id)
func (mysql *MySQL) insertConversationEntry(ctx context.Context, target, correspondent string, messageTime int64, id int64) (err error) {
_, err = mysql.insertConversation.ExecContext(ctx, target, correspondent, messageTime, id)
mysql.logError("could not insert conversations entry", err)
return
}
@ -355,20 +354,12 @@ func (mysql *MySQL) insertBase(ctx context.Context, item history.Item) (id int64
return
}
func stringMinMax(first, second string) (min, max string) {
if first < second {
return first, second
} else {
return second, first
}
}
func (mysql *MySQL) AddDirectMessage(sender, recipient string, senderPersistent, recipientPersistent bool, item history.Item) (err error) {
func (mysql *MySQL) AddDirectMessage(sender, senderAccount, recipient, recipientAccount string, item history.Item) (err error) {
if mysql.db == nil {
return
}
if !(senderPersistent || recipientPersistent) {
if senderAccount == "" && recipientAccount == "" {
return
}
@ -384,22 +375,30 @@ func (mysql *MySQL) AddDirectMessage(sender, recipient string, senderPersistent,
return
}
if senderPersistent {
mysql.insertSequenceEntry(ctx, sender, item.Message.Time, id)
nanotime := item.Message.Time.UnixNano()
if senderAccount != "" {
err = mysql.insertSequenceEntry(ctx, senderAccount, nanotime, id)
if err != nil {
return
}
err = mysql.insertConversationEntry(ctx, senderAccount, recipient, nanotime, id)
if err != nil {
return
}
}
if recipientPersistent && sender != recipient {
err = mysql.insertSequenceEntry(ctx, recipient, item.Message.Time, id)
if recipientAccount != "" && sender != recipient {
err = mysql.insertSequenceEntry(ctx, recipientAccount, nanotime, id)
if err != nil {
return
}
err = mysql.insertConversationEntry(ctx, recipientAccount, sender, nanotime, id)
if err != nil {
return
}
}
err = mysql.insertConversationEntry(ctx, sender, recipient, item.Message.Time, id)
return
}
@ -453,14 +452,8 @@ func (mysql *MySQL) selectItems(ctx context.Context, query string, args ...inter
return
}
func (mysql *MySQL) betweenTimestamps(ctx context.Context, sender, recipient string, after, before, cutoff time.Time, limit int) (results []history.Item, err error) {
useSequence := true
var lowerTarget, upperTarget string
if sender != "" {
lowerTarget, upperTarget = stringMinMax(sender, recipient)
useSequence = false
}
func (mysql *MySQL) betweenTimestamps(ctx context.Context, target, correspondent string, after, before, cutoff time.Time, limit int) (results []history.Item, err error) {
useSequence := correspondent == ""
table := "sequence"
if !useSequence {
table = "conversations"
@ -479,11 +472,11 @@ func (mysql *MySQL) betweenTimestamps(ctx context.Context, sender, recipient str
"SELECT history.data from history INNER JOIN %[1]s ON history.id = %[1]s.history_id WHERE", table)
if useSequence {
fmt.Fprintf(&queryBuf, " sequence.target = ?")
args = append(args, recipient)
args = append(args, target)
} else {
fmt.Fprintf(&queryBuf, " conversations.lower_target = ? AND conversations.upper_target = ?")
args = append(args, lowerTarget)
args = append(args, upperTarget)
fmt.Fprintf(&queryBuf, " conversations.target = ? AND conversations.correspondent = ?")
args = append(args, target)
args = append(args, correspondent)
}
if !after.IsZero() {
fmt.Fprintf(&queryBuf, " AND %s.nanotime > ?", table)
@ -514,10 +507,10 @@ func (mysql *MySQL) Close() {
// implements history.Sequence, emulating a single history buffer (for a channel,
// a single user's DMs, or a DM conversation)
type mySQLHistorySequence struct {
mysql *MySQL
sender string
recipient string
cutoff time.Time
mysql *MySQL
target string
correspondent string
cutoff time.Time
}
func (s *mySQLHistorySequence) Between(start, end history.Selector, limit int) (results []history.Item, complete bool, err error) {
@ -539,7 +532,7 @@ func (s *mySQLHistorySequence) Between(start, end history.Selector, limit int) (
}
}
results, err = s.mysql.betweenTimestamps(ctx, s.sender, s.recipient, startTime, endTime, s.cutoff, limit)
results, err = s.mysql.betweenTimestamps(ctx, s.target, s.correspondent, startTime, endTime, s.cutoff, limit)
return results, (err == nil), err
}
@ -547,11 +540,11 @@ func (s *mySQLHistorySequence) Around(start history.Selector, limit int) (result
return history.GenericAround(s, start, limit)
}
func (mysql *MySQL) MakeSequence(sender, recipient string, cutoff time.Time) history.Sequence {
func (mysql *MySQL) MakeSequence(target, correspondent string, cutoff time.Time) history.Sequence {
return &mySQLHistorySequence{
sender: sender,
recipient: recipient,
mysql: mysql,
cutoff: cutoff,
target: target,
correspondent: correspondent,
mysql: mysql,
cutoff: cutoff,
}
}